You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/09/15 16:38:04 UTC
svn commit: r1625043 - in /qpid/proton/trunk/proton-j/src:
main/java/org/apache/qpid/proton/engine/impl/
test/java/org/apache/qpid/proton/systemtests/
Author: robbie
Date: Mon Sep 15 14:38:03 2014
New Revision: 1625043
URL: http://svn.apache.org/r1625043
Log:
PROTON-685: iterate on a copy of the values to prevent CME after the child free() calls modifies the map
Added:
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java
Modified:
qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
Modified: qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java?rev=1625043&r1=1625042&r2=1625043&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java (original)
+++ qpid/proton/trunk/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java Mon Sep 15 14:38:03 2014
@@ -103,11 +103,14 @@ public class SessionImpl extends Endpoin
_connection.removeSessionEndpoint(_node);
_node = null;
- for(SenderImpl sender : _senders.values()) {
+ List<SenderImpl> senders = new ArrayList<SenderImpl>(_senders.values());
+ for(SenderImpl sender : senders) {
sender.free();
}
_senders.clear();
- for(ReceiverImpl receiver : _receivers.values()) {
+
+ List<ReceiverImpl> receivers = new ArrayList<ReceiverImpl>(_receivers.values());
+ for(ReceiverImpl receiver : receivers) {
receiver.free();
}
_receivers.clear();
Added: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java?rev=1625043&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java (added)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java Mon Sep 15 14:38:03 2014
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.systemtests;
+
+import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertTrue;
+
+import java.nio.ByteBuffer;
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+
+public abstract class EngineTestBase
+{
+ private static final Logger LOGGER = Logger.getLogger(EngineTestBase.class.getName());
+
+ private final TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER);
+ private final ProtonContainer _client = new ProtonContainer("clientContainer");
+ private final ProtonContainer _server = new ProtonContainer("serverContainer");
+
+ protected TestLoggingHelper getTestLoggingHelper()
+ {
+ return _testLoggingHelper;
+ }
+
+ protected ProtonContainer getClient()
+ {
+ return _client;
+ }
+
+ protected ProtonContainer getServer()
+ {
+ return _server;
+ }
+
+ protected void assertClientHasNothingToOutput()
+ {
+ assertEquals(0, getClient().transport.getOutputBuffer().remaining());
+ getClient().transport.outputConsumed();
+ }
+
+ protected void pumpServerToClient()
+ {
+ ByteBuffer serverBuffer = getServer().transport.getOutputBuffer();
+
+ getTestLoggingHelper().prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
+ assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
+
+ ByteBuffer clientBuffer = getClient().transport.getInputBuffer();
+
+ clientBuffer.put(serverBuffer);
+
+ assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining());
+
+ getClient().transport.processInput().checkIsOk();
+ getServer().transport.outputConsumed();
+ }
+
+ protected void pumpClientToServer()
+ {
+ ByteBuffer clientBuffer = getClient().transport.getOutputBuffer();
+
+ getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer);
+ assertTrue("Client expected to produce some output", clientBuffer.hasRemaining());
+
+ ByteBuffer serverBuffer = getServer().transport.getInputBuffer();
+
+ serverBuffer.put(clientBuffer);
+
+ assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining());
+
+ getClient().transport.outputConsumed();
+ getServer().transport.processInput().checkIsOk();
+ }
+
+ protected void doOutputInputCycle() throws Exception
+ {
+ pumpClientToServer();
+
+ pumpServerToClient();
+ }
+
+ protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState)
+ {
+ assertEquals(localState, endpoint.getLocalState());
+ assertEquals(remoteState, endpoint.getRemoteState());
+ }
+
+ protected void assertTerminusEquals(org.apache.qpid.proton.amqp.transport.Target expectedTarget, org.apache.qpid.proton.amqp.transport.Target actualTarget)
+ {
+ assertEquals(
+ ((Target)expectedTarget).getAddress(),
+ ((Target)actualTarget).getAddress());
+ }
+}
Added: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java?rev=1625043&view=auto
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java (added)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java Mon Sep 15 14:38:03 2014
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.proton.systemtests;
+
+import static java.util.EnumSet.of;
+import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
+import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
+import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+
+import java.util.logging.Logger;
+
+import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.messaging.Source;
+import org.apache.qpid.proton.amqp.messaging.Target;
+import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
+import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Receiver;
+import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.engine.Session;
+import org.junit.Test;
+
+public class FreeTest extends EngineTestBase
+{
+ private static final Logger LOGGER = Logger.getLogger(FreeTest.class.getName());
+
+ @Test
+ public void testFreeConnectionWithMultipleSessionsAndSendersAndReceiversDoesNotThrowCME() throws Exception
+ {
+ LOGGER.fine(bold("======== About to create transports"));
+
+ getClient().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+ getServer().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
+
+ getClient().connection = Proton.connection();
+ getClient().transport.bind(getClient().connection);
+
+ getServer().connection = Proton.connection();
+ getServer().transport.bind(getServer().connection);
+
+
+
+ LOGGER.fine(bold("======== About to open connections"));
+ getClient().connection.open();
+ getServer().connection.open();
+
+ doOutputInputCycle();
+
+
+
+ LOGGER.fine(bold("======== About to open sessions"));
+ getClient().session = getClient().connection.session();
+ getClient().session.open();
+
+ Session clientSession2 = getClient().connection.session();
+ clientSession2.open();
+
+ pumpClientToServer();
+
+ getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+ getServer().session.open();
+ assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+ Session serverSession2 = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ assertNotNull("Engine did not return expected second server session", serverSession2);
+ assertNotSame("Engine did not return expected second server session", serverSession2, getServer().session);
+ serverSession2.open();
+
+ pumpServerToClient();
+ assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+ assertEndpointState(clientSession2, ACTIVE, ACTIVE);
+
+
+
+ LOGGER.fine(bold("======== About to create client senders"));
+
+ getClient().source = new Source();
+ getClient().source.setAddress(null);
+
+ getClient().target = new Target();
+ getClient().target.setAddress("myQueue");
+
+ getClient().sender = getClient().session.sender("sender1");
+ getClient().sender.setTarget(getClient().target);
+ getClient().sender.setSource(getClient().source);
+
+ getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
+
+ getClient().sender.open();
+ assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
+
+
+ Sender clientSender2 = getClient().session.sender("sender2");
+ clientSender2.setTarget(getClient().target);
+ clientSender2.setSource(getClient().source);
+
+ clientSender2.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ clientSender2.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ assertEndpointState(clientSender2, UNINITIALIZED, UNINITIALIZED);
+
+ clientSender2.open();
+ assertEndpointState(clientSender2, ACTIVE, UNINITIALIZED);
+
+ pumpClientToServer();
+
+
+ LOGGER.fine(bold("======== About to set up server receivers"));
+
+ getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+ // Accept the settlement modes suggested by the client
+ getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
+ getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
+
+ org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
+ assertTerminusEquals(getClient().target, serverRemoteTarget);
+
+ getServer().receiver.setTarget(serverRemoteTarget);
+
+ assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
+ getServer().receiver.open();
+
+ assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
+
+ Receiver serverReceiver2 = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+ serverReceiver2.open();
+ assertEndpointState(serverReceiver2, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+ assertEndpointState(getClient().sender, ACTIVE, ACTIVE);
+ assertEndpointState(clientSender2, ACTIVE, ACTIVE);
+
+
+
+ LOGGER.fine(bold("======== About to create client receivers"));
+
+ Source src = new Source();
+ src.setAddress("myQueue");
+
+ Target tgt1 = new Target();
+ tgt1.setAddress("receiver1");
+
+ getClient().receiver = getClient().session.receiver("receiver1");
+ getClient().receiver.setSource(src);
+ getClient().receiver.setTarget(tgt1);
+
+ getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+ getClient().receiver.open();
+ assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+
+ Target tgt2 = new Target();
+ tgt1.setAddress("receiver2");
+
+ Receiver clientReceiver2 = getClient().session.receiver("receiver2");
+ clientReceiver2.setSource(src);
+ clientReceiver2.setTarget(tgt2);
+
+ clientReceiver2.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ clientReceiver2.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+
+ assertEndpointState(clientReceiver2, UNINITIALIZED, UNINITIALIZED);
+
+ clientReceiver2.open();
+ assertEndpointState(clientReceiver2, ACTIVE, UNINITIALIZED);
+
+ pumpClientToServer();
+
+
+
+ LOGGER.fine(bold("======== About to set up server senders"));
+
+ getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+ // Accept the settlement modes suggested by the client
+ getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+ getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+
+ org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget2 = getServer().sender.getRemoteTarget();
+ assertTerminusEquals(tgt1, serverRemoteTarget2);
+
+ getServer().sender.setTarget(serverRemoteTarget2);
+
+ assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+ getServer().sender.open();
+ assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+ Sender serverSender2 = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+ serverRemoteTarget2 = serverSender2.getRemoteTarget();
+ assertTerminusEquals(tgt2, serverRemoteTarget2);
+ serverSender2.setTarget(serverRemoteTarget2);
+ serverSender2.open();
+ assertEndpointState(serverSender2, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+ assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+ assertEndpointState(clientReceiver2, ACTIVE, ACTIVE);
+
+
+
+ LOGGER.fine(bold("======== About to close and free client's connection"));
+
+ getClient().connection.close();
+ getClient().connection.free();
+ }
+
+}
Modified: qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java
URL: http://svn.apache.org/viewvc/qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java?rev=1625043&r1=1625042&r2=1625043&view=diff
==============================================================================
--- qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java (original)
+++ qpid/proton/trunk/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java Mon Sep 15 14:38:03 2014
@@ -29,7 +29,6 @@ import static org.apache.qpid.proton.sys
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
-import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.logging.Logger;
@@ -42,8 +41,6 @@ import org.apache.qpid.proton.amqp.messa
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.message.Message;
import org.junit.Test;
@@ -64,84 +61,79 @@ import org.junit.Test;
*
* Does not illustrate use of the Messenger API.
*/
-public class ProtonEngineExampleTest
+public class ProtonEngineExampleTest extends EngineTestBase
{
private static final Logger LOGGER = Logger.getLogger(ProtonEngineExampleTest.class.getName());
private static final int BUFFER_SIZE = 4096;
- private TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER);
-
- private final ProtonContainer _client = new ProtonContainer("clientContainer");
- private final ProtonContainer _server = new ProtonContainer("serverContainer");
-
- private final String _targetAddress = _server.containerId + "-link1-target";
+ private final String _targetAddress = getServer().containerId + "-link1-target";
@Test
public void test() throws Exception
{
LOGGER.fine(bold("======== About to create transports"));
- _client.transport = Proton.transport();
- ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX);
+ getClient().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
- _server.transport = Proton.transport();
- ProtocolTracerEnabler.setProtocolTracer(_server.transport, " " + TestLoggingHelper.SERVER_PREFIX);
+ getServer().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
doOutputInputCycle();
- _client.connection = Proton.connection();
- _client.transport.bind(_client.connection);
+ getClient().connection = Proton.connection();
+ getClient().transport.bind(getClient().connection);
- _server.connection = Proton.connection();
- _server.transport.bind(_server.connection);
+ getServer().connection = Proton.connection();
+ getServer().transport.bind(getServer().connection);
LOGGER.fine(bold("======== About to open connections"));
- _client.connection.open();
- _server.connection.open();
+ getClient().connection.open();
+ getServer().connection.open();
doOutputInputCycle();
LOGGER.fine(bold("======== About to open sessions"));
- _client.session = _client.connection.session();
- _client.session.open();
+ getClient().session = getClient().connection.session();
+ getClient().session.open();
pumpClientToServer();
- _server.session = _server.connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
- assertEndpointState(_server.session, UNINITIALIZED, ACTIVE);
+ getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
- _server.session.open();
- assertEndpointState(_server.session, ACTIVE, ACTIVE);
+ getServer().session.open();
+ assertEndpointState(getServer().session, ACTIVE, ACTIVE);
pumpServerToClient();
- assertEndpointState(_client.session, ACTIVE, ACTIVE);
+ assertEndpointState(getClient().session, ACTIVE, ACTIVE);
LOGGER.fine(bold("======== About to create sender"));
- _client.source = new Source();
- _client.source.setAddress(null);
+ getClient().source = new Source();
+ getClient().source.setAddress(null);
- _client.target = new Target();
- _client.target.setAddress(_targetAddress);
+ getClient().target = new Target();
+ getClient().target.setAddress(_targetAddress);
- _client.sender = _client.session.sender("link1");
- _client.sender.setTarget(_client.target);
- _client.sender.setSource(_client.source);
+ getClient().sender = getClient().session.sender("link1");
+ getClient().sender.setTarget(getClient().target);
+ getClient().sender.setSource(getClient().source);
// Exactly once delivery semantics
- _client.sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
- _client.sender.setReceiverSettleMode(ReceiverSettleMode.SECOND);
+ getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+ getClient().sender.setReceiverSettleMode(ReceiverSettleMode.SECOND);
- assertEndpointState(_client.sender, UNINITIALIZED, UNINITIALIZED);
+ assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED);
- _client.sender.open();
- assertEndpointState(_client.sender, ACTIVE, UNINITIALIZED);
+ getClient().sender.open();
+ assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED);
pumpClientToServer();
@@ -152,46 +144,46 @@ public class ProtonEngineExampleTest
// A real application would be interested in more states than simply ACTIVE, as there
// exists the possibility that the link could have moved to another state already e.g. CLOSED.
// (See pipelining).
- _server.receiver = (Receiver) _server.connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+ getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
// Accept the settlement modes suggested by the client
- _server.receiver.setSenderSettleMode(_server.receiver.getRemoteSenderSettleMode());
- _server.receiver.setReceiverSettleMode(_server.receiver.getRemoteReceiverSettleMode());
+ getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode());
+ getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode());
- org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = _server.receiver.getRemoteTarget();
- assertTerminusEquals(_client.target, serverRemoteTarget);
+ org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget();
+ assertTerminusEquals(getClient().target, serverRemoteTarget);
- _server.receiver.setTarget(applicationDeriveTarget(serverRemoteTarget));
+ getServer().receiver.setTarget(applicationDeriveTarget(serverRemoteTarget));
- assertEndpointState(_server.receiver, UNINITIALIZED, ACTIVE);
- _server.receiver.open();
+ assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE);
+ getServer().receiver.open();
- assertEndpointState(_server.receiver, ACTIVE, ACTIVE);
+ assertEndpointState(getServer().receiver, ACTIVE, ACTIVE);
pumpServerToClient();
- assertEndpointState(_client.sender, ACTIVE, ACTIVE);
+ assertEndpointState(getClient().sender, ACTIVE, ACTIVE);
- _server.receiver.flow(1);
+ getServer().receiver.flow(1);
pumpServerToClient();
LOGGER.fine(bold("======== About to create a message and send it to the server"));
- _client.message = Proton.message();
+ getClient().message = Proton.message();
Section messageBody = new AmqpValue("Hello");
- _client.message.setBody(messageBody);
- _client.messageData = new byte[BUFFER_SIZE];
- int lengthOfEncodedMessage = _client.message.encode(_client.messageData, 0, BUFFER_SIZE);
- _testLoggingHelper.prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(_client.messageData, lengthOfEncodedMessage));
+ getClient().message.setBody(messageBody);
+ getClient().messageData = new byte[BUFFER_SIZE];
+ int lengthOfEncodedMessage = getClient().message.encode(getClient().messageData, 0, BUFFER_SIZE);
+ getTestLoggingHelper().prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(getClient().messageData, lengthOfEncodedMessage));
byte[] deliveryTag = "delivery1".getBytes();
- _client.delivery = _client.sender.delivery(deliveryTag);
- int numberOfBytesAcceptedBySender = _client.sender.send(_client.messageData, 0, lengthOfEncodedMessage);
+ getClient().delivery = getClient().sender.delivery(deliveryTag);
+ int numberOfBytesAcceptedBySender = getClient().sender.send(getClient().messageData, 0, lengthOfEncodedMessage);
assertEquals("For simplicity, assume the sender can accept all the data",
lengthOfEncodedMessage, numberOfBytesAcceptedBySender);
- assertNull(_client.delivery.getLocalState());
+ assertNull(getClient().delivery.getLocalState());
- boolean senderAdvanced = _client.sender.advance();
+ boolean senderAdvanced = getClient().sender.advance();
assertTrue("sender has not advanced", senderAdvanced);
pumpClientToServer();
@@ -199,106 +191,106 @@ public class ProtonEngineExampleTest
LOGGER.fine(bold("======== About to process the message on the server"));
- _server.delivery = _server.connection.getWorkHead();
+ getServer().delivery = getServer().connection.getWorkHead();
assertEquals("The received delivery should be on our receiver",
- _server.receiver, _server.delivery.getLink());
- assertNull(_server.delivery.getLocalState());
- assertNull(_server.delivery.getRemoteState());
+ getServer().receiver, getServer().delivery.getLink());
+ assertNull(getServer().delivery.getLocalState());
+ assertNull(getServer().delivery.getRemoteState());
- assertFalse(_server.delivery.isPartial());
- assertTrue(_server.delivery.isReadable());
+ assertFalse(getServer().delivery.isPartial());
+ assertTrue(getServer().delivery.isReadable());
- _server.messageData = new byte[BUFFER_SIZE];
- int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData, 0, BUFFER_SIZE);
+ getServer().messageData = new byte[BUFFER_SIZE];
+ int numberOfBytesProducedByReceiver = getServer().receiver.recv(getServer().messageData, 0, BUFFER_SIZE);
assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver);
- _server.message = Proton.message();
- _server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver);
+ getServer().message = Proton.message();
+ getServer().message.decode(getServer().messageData, 0, numberOfBytesProducedByReceiver);
- boolean messageProcessed = applicationProcessMessage(_server.message);
+ boolean messageProcessed = applicationProcessMessage(getServer().message);
assertTrue(messageProcessed);
- _server.delivery.disposition(Accepted.getInstance());
- assertEquals(Accepted.getInstance(), _server.delivery.getLocalState());
+ getServer().delivery.disposition(Accepted.getInstance());
+ assertEquals(Accepted.getInstance(), getServer().delivery.getLocalState());
pumpServerToClient();
- assertEquals(Accepted.getInstance(), _client.delivery.getRemoteState());
+ assertEquals(Accepted.getInstance(), getClient().delivery.getRemoteState());
LOGGER.fine(bold("======== About to accept and settle the message on the client"));
- Delivery clientDelivery = _client.connection.getWorkHead();
- assertEquals(_client.delivery, clientDelivery);
+ Delivery clientDelivery = getClient().connection.getWorkHead();
+ assertEquals(getClient().delivery, clientDelivery);
assertTrue(clientDelivery.isUpdated());
- assertEquals(_client.sender, clientDelivery.getLink());
+ assertEquals(getClient().sender, clientDelivery.getLink());
clientDelivery.disposition(clientDelivery.getRemoteState());
- assertEquals(Accepted.getInstance(), _client.delivery.getLocalState());
+ assertEquals(Accepted.getInstance(), getClient().delivery.getLocalState());
clientDelivery.settle();
- assertNull("Now we've settled, the delivery should no longer be in the work list", _client.connection.getWorkHead());
+ assertNull("Now we've settled, the delivery should no longer be in the work list", getClient().connection.getWorkHead());
pumpClientToServer();
LOGGER.fine(bold("======== About to settle the message on the server"));
- assertEquals(Accepted.getInstance(), _server.delivery.getRemoteState());
- Delivery serverDelivery = _server.connection.getWorkHead();
- assertEquals(_server.delivery, serverDelivery);
+ assertEquals(Accepted.getInstance(), getServer().delivery.getRemoteState());
+ Delivery serverDelivery = getServer().connection.getWorkHead();
+ assertEquals(getServer().delivery, serverDelivery);
assertTrue(serverDelivery.isUpdated());
assertTrue("Client should have already settled", serverDelivery.remotelySettled());
serverDelivery.settle();
assertTrue(serverDelivery.isSettled());
- assertNull("Now we've settled, the delivery should no longer be in the work list", _server.connection.getWorkHead());
+ assertNull("Now we've settled, the delivery should no longer be in the work list", getServer().connection.getWorkHead());
// Increment the receiver's credit so its ready for another message.
// When using proton-c, this call is required in order to generate a Flow frame
// (proton-j sends one even without it to eagerly restore the session incoming window).
- _server.receiver.flow(1);
+ getServer().receiver.flow(1);
pumpServerToClient();
LOGGER.fine(bold("======== About to close client's sender"));
- _client.sender.close();
+ getClient().sender.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's link closure"));
- assertSame(_server.receiver, _server.connection.linkHead(of(ACTIVE), of(CLOSED)));
- _server.receiver.close();
+ assertSame(getServer().receiver, getServer().connection.linkHead(of(ACTIVE), of(CLOSED)));
+ getServer().receiver.close();
pumpServerToClient();
LOGGER.fine(bold("======== About to close client's session"));
- _client.session.close();
+ getClient().session.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's session closure"));
- assertSame(_server.session, _server.connection.sessionHead(of(ACTIVE), of(CLOSED)));
- _server.session.close();
+ assertSame(getServer().session, getServer().connection.sessionHead(of(ACTIVE), of(CLOSED)));
+ getServer().session.close();
pumpServerToClient();
LOGGER.fine(bold("======== About to close client's connection"));
- _client.connection.close();
+ getClient().connection.close();
pumpClientToServer();
LOGGER.fine(bold("======== Server about to process client's connection closure"));
- assertEquals(CLOSED, _server.connection.getRemoteState());
- _server.connection.close();
+ assertEquals(CLOSED, getServer().connection.getRemoteState());
+ getServer().connection.close();
pumpServerToClient();
@@ -331,66 +323,4 @@ public class ProtonEngineExampleTest
Object messageBody = ((AmqpValue)message.getBody()).getValue();
return "Hello".equals(messageBody);
}
-
- private void assertTerminusEquals(
- org.apache.qpid.proton.amqp.transport.Target expectedTarget,
- org.apache.qpid.proton.amqp.transport.Target actualTarget)
- {
- assertEquals(
- ((Target)expectedTarget).getAddress(),
- ((Target)actualTarget).getAddress());
- }
-
- private void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState)
- {
- assertEquals(localState, endpoint.getLocalState());
- assertEquals(remoteState, endpoint.getRemoteState());
- }
-
- private void doOutputInputCycle() throws Exception
- {
- pumpClientToServer();
-
- pumpServerToClient();
- }
-
- private void pumpClientToServer()
- {
- ByteBuffer clientBuffer = _client.transport.getOutputBuffer();
-
- _testLoggingHelper.prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer);
- assertTrue("Client expected to produce some output", clientBuffer.hasRemaining());
-
- ByteBuffer serverBuffer = _server.transport.getInputBuffer();
-
- serverBuffer.put(clientBuffer);
-
- assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining());
-
- _client.transport.outputConsumed();
- _server.transport.processInput().checkIsOk();
- }
-
- private void pumpServerToClient()
- {
- ByteBuffer serverBuffer = _server.transport.getOutputBuffer();
-
- _testLoggingHelper.prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer);
- assertTrue("Server expected to produce some output", serverBuffer.hasRemaining());
-
- ByteBuffer clientBuffer = _client.transport.getInputBuffer();
-
- clientBuffer.put(serverBuffer);
-
- assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining());
-
- _client.transport.processInput().checkIsOk();
- _server.transport.outputConsumed();
- }
-
- private void assertClientHasNothingToOutput()
- {
- assertEquals(0, _client.transport.getOutputBuffer().remaining());
- _client.transport.outputConsumed();
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org