You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/03/06 17:35:43 UTC
[1/8] qpid-jms git commit: remove commented out ignores
Repository: qpid-jms
Updated Branches:
refs/heads/master fb33675dd -> 4ab807b78
remove commented out ignores
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f3820697
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f3820697
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f3820697
Branch: refs/heads/master
Commit: f38206973b1faa6dd2a951b0ebca26d8c50a839d
Parents: fb33675
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 10:56:54 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 10:56:54 2015 +0000
----------------------------------------------------------------------
.../org/apache/qpid/jms/integration/SessionIntegrationTest.java | 2 --
1 file changed, 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f3820697/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index cde40b3..e7cbaea 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -221,7 +221,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
}
- //@Ignore // TODO: Need to complete implementation and update test peer link handle behaviour
@Test(timeout = 5000)
public void testCreateAndDeleteTemporaryQueue() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -264,7 +263,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
}
}
- //@Ignore // TODO: Need to complete implementation and update test peer link handle behaviour
@Test(timeout = 5000)
public void testCreateAndDeleteTemporaryTopic() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/8] qpid-jms git commit: make sending the End response optional,
so the test peer can do other things before we make it reply
Posted by ro...@apache.org.
make sending the End response optional, so the test peer can do other things before we make it reply
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a7c2a9bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a7c2a9bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a7c2a9bf
Branch: refs/heads/master
Commit: a7c2a9bf695d4871449711f6abcc45e3a2863303
Parents: f382069
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 12:17:12 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 12:17:12 2015 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 29 ++++++++++++--------
1 file changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a7c2a9bf/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 350a5f5..90a9b40 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -479,21 +479,28 @@ public class TestAmqpPeer implements AutoCloseable
public void expectEnd()
{
+ expectEnd(true);
+ }
+
+ public void expectEnd(boolean sendResponse)
+ {
final EndMatcher endMatcher = new EndMatcher();
- final EndFrame endResponse = new EndFrame();
+ if (sendResponse) {
+ final EndFrame endResponse = new EndFrame();
- // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
- final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, endResponse, null);
- frameSender.setValueProvider(new ValueProvider()
- {
- @Override
- public void setValues()
+ // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, endResponse, null);
+ frameSender.setValueProvider(new ValueProvider()
{
- frameSender.setChannel(endMatcher.getActualChannel());
- }
- });
- endMatcher.onSuccess(frameSender);
+ @Override
+ public void setValues()
+ {
+ frameSender.setChannel(endMatcher.getActualChannel());
+ }
+ });
+ endMatcher.onSuccess(frameSender);
+ }
addHandler(endMatcher);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[7/8] qpid-jms git commit: remove unused support for closing all the
links opened on the last session
Posted by ro...@apache.org.
remove unused support for closing all the links opened on the last session
reverts df219312638023ca078d9054db23113a82007dd6
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f51836b5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f51836b5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f51836b5
Branch: refs/heads/master
Commit: f51836b57c40fd2354ab1eb5539be5b37f19d512
Parents: f2c7a7a
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:23:08 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:25:13 2015 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 68 +-------------------
1 file changed, 3 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f51836b5/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index b4197e9..0d04787 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -30,7 +30,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -112,7 +111,6 @@ public class TestAmqpPeer implements AutoCloseable
private byte[] _deferredBytes;
private int _lastInitiatedChannel = -1;
private UnsignedInteger _lastInitiatedLinkHandle = null;
- private Map<Integer, List<UnsignedInteger>> _channelToLinkMap = new ConcurrentHashMap<Integer, List<UnsignedInteger>>();
public TestAmqpPeer() throws IOException
{
@@ -550,9 +548,8 @@ public class TestAmqpPeer implements AutoCloseable
public void setValues()
{
Object receivedHandle = attachMatcher.getReceivedHandle();
- int receivedChannel = attachMatcher.getActualChannel();
- attachResponseSender.setChannel(receivedChannel);
+ attachResponseSender.setChannel(attachMatcher.getActualChannel());
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(attachMatcher.getReceivedSource());
@@ -564,7 +561,6 @@ public class TestAmqpPeer implements AutoCloseable
attachResponse.setTarget(t);
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
- recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
}
});
@@ -630,9 +626,8 @@ public class TestAmqpPeer implements AutoCloseable
public void setValues()
{
Object receivedHandle = attachMatcher.getReceivedHandle();
- int receivedChannel = attachMatcher.getActualChannel();
- attachResponseSender.setChannel(receivedChannel);
+ attachResponseSender.setChannel(attachMatcher.getActualChannel());
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
@@ -643,7 +638,6 @@ public class TestAmqpPeer implements AutoCloseable
}
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
- recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
}
});
@@ -727,9 +721,8 @@ public class TestAmqpPeer implements AutoCloseable
public void setValues()
{
Object receivedHandle = attachMatcher.getReceivedHandle();
- int receivedChannel = attachMatcher.getActualChannel();
- attachResponseSender.setChannel(receivedChannel);
+ attachResponseSender.setChannel(attachMatcher.getActualChannel());
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setTarget(attachMatcher.getReceivedTarget());
@@ -740,7 +733,6 @@ public class TestAmqpPeer implements AutoCloseable
}
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
- recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
}
});
@@ -1246,50 +1238,6 @@ public class TestAmqpPeer implements AutoCloseable
}
}
- /**
- * All links and sessions must have been created before calling this method, unlike
- * {@link #remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean, boolean)}
- */
- public void remotelyDetachLinksOnLastOpenedSession(boolean expectDetachResponse, boolean closed) {
- synchronized (_handlersLock) {
- CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
-
- int channel = _lastInitiatedChannel;
- List<UnsignedInteger> links = _channelToLinkMap.get(channel);
- if(links == null || links.isEmpty())
- {
- throw new IllegalStateException("No links found for channel: " + channel);
- }
-
- for (UnsignedInteger linkHandle : links)
- {
- // Now generate the Detach for the appropriate link on the appropriate session
- final DetachFrame detachFrame = new DetachFrame();
- detachFrame.setClosed(closed);
- detachFrame.setHandle(linkHandle);
- // TODO: add an optional error msg+condition?
-
- final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, channel, detachFrame, null);
- comp.add(frameSender);
-
- if (expectDetachResponse) {
- Matcher<Boolean> closeMatcher = null;
- if (closed) {
- closeMatcher = equalTo(true);
- } else {
- closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
- }
-
- // Expect a response to our Detach.
- final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
- detachMatcher.withHandle(equalTo(linkHandle));
- // TODO: enable matching on the channel number of the response.
- addHandler(detachMatcher);
- }
- }
- }
- }
-
private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable();
Handler h = getLastHandler();
@@ -1301,16 +1249,6 @@ public class TestAmqpPeer implements AutoCloseable
return comp;
}
- private void recordLinkCreation(int channel, UnsignedInteger handle) {
- List<UnsignedInteger> links = _channelToLinkMap.get(channel);
- if(links == null) {
- links = new ArrayList<UnsignedInteger>();
- _channelToLinkMap.put(channel, links);
- }
-
- links.add(handle);
- }
-
public void sendTransferToLastOpenedLinkOnLastOpenedSession(boolean deferWrite) {
synchronized (_handlersLock) {
CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[5/8] qpid-jms git commit: add support for sending a transfer after
an arbitrary handler's success action,
and ability to delay sending end response
Posted by ro...@apache.org.
add support for sending a transfer after an arbitrary handler's success action, and ability to delay sending end response
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8add0dc1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8add0dc1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8add0dc1
Branch: refs/heads/master
Commit: 8add0dc18983b01e9ed5a11ccac4a1e7c93c57b5
Parents: df21931
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:15:43 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:15:43 2015 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 49 ++++++++++++++++++++
1 file changed, 49 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8add0dc1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 8127a8f..b4197e9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -56,6 +56,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame;
import org.apache.qpid.jms.test.testpeer.describedtypes.Source;
import org.apache.qpid.jms.test.testpeer.describedtypes.Target;
import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
@@ -1149,6 +1150,10 @@ public class TestAmqpPeer implements AutoCloseable
}
public void remotelyEndLastOpenedSession(boolean expectEndResponse) {
+ remotelyEndLastOpenedSession(expectEndResponse, 0);
+ }
+
+ public void remotelyEndLastOpenedSession(boolean expectEndResponse, final long delayBeforeSend) {
synchronized (_handlersLock) {
CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
@@ -1162,6 +1167,15 @@ public class TestAmqpPeer implements AutoCloseable
@Override
public void setValues() {
frameSender.setChannel(_lastInitiatedChannel);
+
+ //Insert a delay if requested
+ if (delayBeforeSend > 0) {
+ try {
+ Thread.sleep(delayBeforeSend);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
}
});
comp.add(frameSender);
@@ -1296,4 +1310,39 @@ public class TestAmqpPeer implements AutoCloseable
links.add(handle);
}
+
+ public void sendTransferToLastOpenedLinkOnLastOpenedSession(boolean deferWrite) {
+ synchronized (_handlersLock) {
+ CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+
+ final int nextId = 0; //TODO: shouldn't be hard coded
+
+ String tagString = "theDeliveryTag" + nextId;
+ Binary dtag = new Binary(tagString.getBytes());
+
+ final TransferFrame transferResponse = new TransferFrame()
+ .setDeliveryId(UnsignedInteger.valueOf(nextId))
+ .setDeliveryTag(dtag)
+ .setMessageFormat(UnsignedInteger.ZERO)
+ .setSettled(false);
+
+ Binary payload = prepareTransferPayload(null, null, null, null, new AmqpValueDescribedType("myTextMessage"));
+
+ // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender transferSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);
+ transferSender.setValueProvider(new ValueProvider()
+ {
+ @Override
+ public void setValues()
+ {
+ transferResponse.setHandle(_lastInitiatedLinkHandle);
+ transferSender.setChannel(_lastInitiatedChannel);
+ }
+ });
+
+ transferSender.setDeferWrite(false);
+
+ comp.add(transferSender);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[6/8] qpid-jms git commit: add test which highlights sending of
disposition frames after we end the session
Posted by ro...@apache.org.
add test which highlights sending of disposition frames after we end the session
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f2c7a7a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f2c7a7a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f2c7a7a3
Branch: refs/heads/master
Commit: f2c7a7a31d968366548427df0454d11cf3ab82ed
Parents: 8add0dc
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:21:04 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:25:13 2015 +0000
----------------------------------------------------------------------
.../jms/integration/SessionIntegrationTest.java | 31 ++++++++++++++++++++
1 file changed, 31 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2c7a7a3/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 1ea2493..adf62e7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1278,4 +1278,35 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
session.close();
}
}
+
+ @Ignore // TODO: fails due to PROTON-833. Needs workaround or 0.9 to resolve.
+ @Test(timeout = 5000)
+ public void testCloseSessionWithConsumerThatRemoteDetachesWithUnackedMessages() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin(true);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // Create a consumer, don't give it any messages
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+
+ Queue queue = session.createQueue("myQueue");
+ session.createConsumer(queue);
+
+ //Expect the session close
+ testPeer.expectEnd(false);
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(false);
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true);
+ testPeer.remotelyEndLastOpenedSession(false, 200);
+
+ session.close();
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[8/8] qpid-jms git commit: fix problem in test peer frame parser
relating to reading channel numbers
Posted by ro...@apache.org.
fix problem in test peer frame parser relating to reading channel numbers
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4ab807b7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4ab807b7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4ab807b7
Branch: refs/heads/master
Commit: 4ab807b789712d241f76bfddcf5cb75279b01bcd
Parents: f51836b
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:24:11 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:25:13 2015 +0000
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4ab807b7/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
index a562038..893a399 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
@@ -277,7 +277,7 @@ class TestFrameParser
// type
int type = currentInput.get() & 0xFF;
- int channel = currentInput.getShort() & 0xFF;
+ int channel = currentInput.getShort() & 0xFFFF;
// note that this skips over the extended header if it's present
if(dataOffset!=8)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/8] qpid-jms git commit: add support for closing all the links
opened on the last session
Posted by ro...@apache.org.
add support for closing all the links opened on the last session
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/df219312
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/df219312
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/df219312
Branch: refs/heads/master
Commit: df219312638023ca078d9054db23113a82007dd6
Parents: d509154
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 15:24:16 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 15:24:16 2015 +0000
----------------------------------------------------------------------
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 68 +++++++++++++++++++-
1 file changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df219312/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 90a9b40..8127a8f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -110,6 +111,7 @@ public class TestAmqpPeer implements AutoCloseable
private byte[] _deferredBytes;
private int _lastInitiatedChannel = -1;
private UnsignedInteger _lastInitiatedLinkHandle = null;
+ private Map<Integer, List<UnsignedInteger>> _channelToLinkMap = new ConcurrentHashMap<Integer, List<UnsignedInteger>>();
public TestAmqpPeer() throws IOException
{
@@ -547,8 +549,9 @@ public class TestAmqpPeer implements AutoCloseable
public void setValues()
{
Object receivedHandle = attachMatcher.getReceivedHandle();
+ int receivedChannel = attachMatcher.getActualChannel();
- attachResponseSender.setChannel(attachMatcher.getActualChannel());
+ attachResponseSender.setChannel(receivedChannel);
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(attachMatcher.getReceivedSource());
@@ -560,6 +563,7 @@ public class TestAmqpPeer implements AutoCloseable
attachResponse.setTarget(t);
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+ recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
}
});
@@ -625,8 +629,9 @@ public class TestAmqpPeer implements AutoCloseable
public void setValues()
{
Object receivedHandle = attachMatcher.getReceivedHandle();
+ int receivedChannel = attachMatcher.getActualChannel();
- attachResponseSender.setChannel(attachMatcher.getActualChannel());
+ attachResponseSender.setChannel(receivedChannel);
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
@@ -637,6 +642,7 @@ public class TestAmqpPeer implements AutoCloseable
}
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+ recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
}
});
@@ -720,8 +726,9 @@ public class TestAmqpPeer implements AutoCloseable
public void setValues()
{
Object receivedHandle = attachMatcher.getReceivedHandle();
+ int receivedChannel = attachMatcher.getActualChannel();
- attachResponseSender.setChannel(attachMatcher.getActualChannel());
+ attachResponseSender.setChannel(receivedChannel);
attachResponse.setHandle(receivedHandle);
attachResponse.setName(attachMatcher.getReceivedName());
attachResponse.setTarget(attachMatcher.getReceivedTarget());
@@ -732,6 +739,7 @@ public class TestAmqpPeer implements AutoCloseable
}
_lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+ recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
}
});
@@ -1224,6 +1232,50 @@ public class TestAmqpPeer implements AutoCloseable
}
}
+ /**
+ * All links and sessions must have been created before calling this method, unlike
+ * {@link #remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean, boolean)}
+ */
+ public void remotelyDetachLinksOnLastOpenedSession(boolean expectDetachResponse, boolean closed) {
+ synchronized (_handlersLock) {
+ CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+
+ int channel = _lastInitiatedChannel;
+ List<UnsignedInteger> links = _channelToLinkMap.get(channel);
+ if(links == null || links.isEmpty())
+ {
+ throw new IllegalStateException("No links found for channel: " + channel);
+ }
+
+ for (UnsignedInteger linkHandle : links)
+ {
+ // Now generate the Detach for the appropriate link on the appropriate session
+ final DetachFrame detachFrame = new DetachFrame();
+ detachFrame.setClosed(closed);
+ detachFrame.setHandle(linkHandle);
+ // TODO: add an optional error msg+condition?
+
+ final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, channel, detachFrame, null);
+ comp.add(frameSender);
+
+ if (expectDetachResponse) {
+ Matcher<Boolean> closeMatcher = null;
+ if (closed) {
+ closeMatcher = equalTo(true);
+ } else {
+ closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
+ }
+
+ // Expect a response to our Detach.
+ final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
+ detachMatcher.withHandle(equalTo(linkHandle));
+ // TODO: enable matching on the channel number of the response.
+ addHandler(detachMatcher);
+ }
+ }
+ }
+ }
+
private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable();
Handler h = getLastHandler();
@@ -1234,4 +1286,14 @@ public class TestAmqpPeer implements AutoCloseable
h.onSuccess(comp);
return comp;
}
+
+ private void recordLinkCreation(int channel, UnsignedInteger handle) {
+ List<UnsignedInteger> links = _channelToLinkMap.get(channel);
+ if(links == null) {
+ links = new ArrayList<UnsignedInteger>();
+ _channelToLinkMap.put(channel, links);
+ }
+
+ links.add(handle);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/8] qpid-jms git commit: add test which highlights sending of
detach frames after we end the session
Posted by ro...@apache.org.
add test which highlights sending of detach frames after we end the session
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d509154c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d509154c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d509154c
Branch: refs/heads/master
Commit: d509154c7ad9df4fbe9ff87ff8cbc26cd58e1a6d
Parents: a7c2a9b
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 12:17:58 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 12:44:58 2015 +0000
----------------------------------------------------------------------
.../jms/integration/SessionIntegrationTest.java | 28 ++++++++++++++++++++
1 file changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d509154c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index e7cbaea..1ea2493 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -76,6 +76,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Ignore;
@@ -1250,4 +1251,31 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
consumer.close();
}
}
+
+ @Ignore // TODO: fails due to PROTON-833. Needs workaround or 0.9 to resolve.
+ @Test(timeout = 5000)
+ public void testCloseSessionWithConsumerThatRemoteDetaches() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin(true);
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+ // Create a consumer
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+
+ // Then locally close the session, provoke a remote-detach when the end reaches the
+ // test peer, followed by the session end 'response'. The test peer should not
+ // expect a reply to the detach, as the session was already ended at the client.
+ testPeer.expectEnd(false);
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true);
+ testPeer.remotelyEndLastOpenedSession(false);
+
+ Queue queue = session.createQueue("myQueue");
+ session.createConsumer(queue);
+
+ session.close();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org