You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/03/06 17:35:43 UTC

[1/8] qpid-jms git commit: remove commented out ignores

Repository: qpid-jms
Updated Branches:
  refs/heads/master fb33675dd -> 4ab807b78


remove commented out ignores


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f3820697
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f3820697
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f3820697

Branch: refs/heads/master
Commit: f38206973b1faa6dd2a951b0ebca26d8c50a839d
Parents: fb33675
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 10:56:54 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 10:56:54 2015 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/jms/integration/SessionIntegrationTest.java    | 2 --
 1 file changed, 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f3820697/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index cde40b3..e7cbaea 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -221,7 +221,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    //@Ignore // TODO: Need to complete implementation and update test peer link handle behaviour
     @Test(timeout = 5000)
     public void testCreateAndDeleteTemporaryQueue() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
@@ -264,7 +263,6 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    //@Ignore // TODO: Need to complete implementation and update test peer link handle behaviour
     @Test(timeout = 5000)
     public void testCreateAndDeleteTemporaryTopic() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/8] qpid-jms git commit: make sending the End response optional, so the test peer can do other things before we make it reply

Posted by ro...@apache.org.
make sending the End response optional, so the test peer can do other things before we make it reply


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a7c2a9bf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a7c2a9bf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a7c2a9bf

Branch: refs/heads/master
Commit: a7c2a9bf695d4871449711f6abcc45e3a2863303
Parents: f382069
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 12:17:12 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 12:17:12 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 29 ++++++++++++--------
 1 file changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a7c2a9bf/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 350a5f5..90a9b40 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -479,21 +479,28 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectEnd()
     {
+        expectEnd(true);
+    }
+
+    public void expectEnd(boolean sendResponse)
+    {
         final EndMatcher endMatcher = new EndMatcher();
 
-        final EndFrame endResponse = new EndFrame();
+        if (sendResponse) {
+            final EndFrame endResponse = new EndFrame();
 
-        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
-        final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, endResponse, null);
-        frameSender.setValueProvider(new ValueProvider()
-        {
-            @Override
-            public void setValues()
+            // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, -1, endResponse, null);
+            frameSender.setValueProvider(new ValueProvider()
             {
-                frameSender.setChannel(endMatcher.getActualChannel());
-            }
-        });
-        endMatcher.onSuccess(frameSender);
+                @Override
+                public void setValues()
+                {
+                    frameSender.setChannel(endMatcher.getActualChannel());
+                }
+            });
+            endMatcher.onSuccess(frameSender);
+        }
 
         addHandler(endMatcher);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[7/8] qpid-jms git commit: remove unused support for closing all the links opened on the last session

Posted by ro...@apache.org.
remove unused support for closing all the links opened on the last session

reverts df219312638023ca078d9054db23113a82007dd6


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f51836b5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f51836b5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f51836b5

Branch: refs/heads/master
Commit: f51836b57c40fd2354ab1eb5539be5b37f19d512
Parents: f2c7a7a
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:23:08 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:25:13 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 68 +-------------------
 1 file changed, 3 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f51836b5/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index b4197e9..0d04787 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -30,7 +30,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -112,7 +111,6 @@ public class TestAmqpPeer implements AutoCloseable
     private byte[] _deferredBytes;
     private int _lastInitiatedChannel = -1;
     private UnsignedInteger _lastInitiatedLinkHandle = null;
-    private Map<Integer, List<UnsignedInteger>> _channelToLinkMap = new ConcurrentHashMap<Integer, List<UnsignedInteger>>();
 
     public TestAmqpPeer() throws IOException
     {
@@ -550,9 +548,8 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
-                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(receivedChannel);
+                attachResponseSender.setChannel(attachMatcher.getActualChannel());
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setSource(attachMatcher.getReceivedSource());
@@ -564,7 +561,6 @@ public class TestAmqpPeer implements AutoCloseable
                 attachResponse.setTarget(t);
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
-                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -630,9 +626,8 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
-                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(receivedChannel);
+                attachResponseSender.setChannel(attachMatcher.getActualChannel());
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
@@ -643,7 +638,6 @@ public class TestAmqpPeer implements AutoCloseable
                 }
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
-                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -727,9 +721,8 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
-                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(receivedChannel);
+                attachResponseSender.setChannel(attachMatcher.getActualChannel());
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setTarget(attachMatcher.getReceivedTarget());
@@ -740,7 +733,6 @@ public class TestAmqpPeer implements AutoCloseable
                 }
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
-                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -1246,50 +1238,6 @@ public class TestAmqpPeer implements AutoCloseable
         }
     }
 
-    /**
-     * All links and sessions must have been created before calling this method, unlike
-     * {@link #remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean, boolean)}
-     */
-    public void remotelyDetachLinksOnLastOpenedSession(boolean expectDetachResponse, boolean closed) {
-        synchronized (_handlersLock) {
-            CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
-
-            int channel = _lastInitiatedChannel;
-            List<UnsignedInteger> links = _channelToLinkMap.get(channel);
-            if(links == null || links.isEmpty())
-            {
-                throw new IllegalStateException("No links found for channel: " + channel);
-            }
-
-            for (UnsignedInteger linkHandle : links)
-            {
-                // Now generate the Detach for the appropriate link on the appropriate session
-                final DetachFrame detachFrame = new DetachFrame();
-                detachFrame.setClosed(closed);
-                detachFrame.setHandle(linkHandle);
-                // TODO: add an optional error msg+condition?
-
-                final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, channel, detachFrame, null);
-                comp.add(frameSender);
-
-                if (expectDetachResponse) {
-                    Matcher<Boolean> closeMatcher = null;
-                    if (closed) {
-                        closeMatcher = equalTo(true);
-                    } else {
-                        closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
-                    }
-
-                    // Expect a response to our Detach.
-                    final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
-                    detachMatcher.withHandle(equalTo(linkHandle));
-                    // TODO: enable matching on the channel number of the response.
-                    addHandler(detachMatcher);
-                }
-            }
-        }
-    }
-
     private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
         CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable();
         Handler h = getLastHandler();
@@ -1301,16 +1249,6 @@ public class TestAmqpPeer implements AutoCloseable
         return comp;
     }
 
-    private void recordLinkCreation(int channel, UnsignedInteger handle) {
-        List<UnsignedInteger> links = _channelToLinkMap.get(channel);
-        if(links == null) {
-            links = new ArrayList<UnsignedInteger>();
-            _channelToLinkMap.put(channel, links);
-        }
-
-        links.add(handle);
-    }
-
     public void sendTransferToLastOpenedLinkOnLastOpenedSession(boolean deferWrite) {
         synchronized (_handlersLock) {
             CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/8] qpid-jms git commit: add support for sending a transfer after an arbitrary handler's success action, and ability to delay sending end response

Posted by ro...@apache.org.
add support for sending a transfer after an arbitrary handler's success action, and ability to delay sending end response


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8add0dc1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8add0dc1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8add0dc1

Branch: refs/heads/master
Commit: 8add0dc18983b01e9ed5a11ccac4a1e7c93c57b5
Parents: df21931
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:15:43 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:15:43 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 49 ++++++++++++++++++++
 1 file changed, 49 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8add0dc1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 8127a8f..b4197e9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -56,6 +56,7 @@ import org.apache.qpid.jms.test.testpeer.describedtypes.SaslOutcomeFrame;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Source;
 import org.apache.qpid.jms.test.testpeer.describedtypes.Target;
 import org.apache.qpid.jms.test.testpeer.describedtypes.TransferFrame;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.ApplicationPropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
@@ -1149,6 +1150,10 @@ public class TestAmqpPeer implements AutoCloseable
     }
 
     public void remotelyEndLastOpenedSession(boolean expectEndResponse) {
+        remotelyEndLastOpenedSession(expectEndResponse, 0);
+    }
+
+    public void remotelyEndLastOpenedSession(boolean expectEndResponse, final long delayBeforeSend) {
         synchronized (_handlersLock) {
             CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
 
@@ -1162,6 +1167,15 @@ public class TestAmqpPeer implements AutoCloseable
                 @Override
                 public void setValues() {
                     frameSender.setChannel(_lastInitiatedChannel);
+
+                    //Insert a delay if requested
+                    if (delayBeforeSend > 0) {
+                        try {
+                            Thread.sleep(delayBeforeSend);
+                        } catch (InterruptedException e) {
+                            // Ignore
+                        }
+                    }
                 }
             });
             comp.add(frameSender);
@@ -1296,4 +1310,39 @@ public class TestAmqpPeer implements AutoCloseable
 
         links.add(handle);
     }
+
+    public void sendTransferToLastOpenedLinkOnLastOpenedSession(boolean deferWrite) {
+        synchronized (_handlersLock) {
+            CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+
+            final int nextId = 0; //TODO: shouldn't be hard coded
+
+            String tagString = "theDeliveryTag" + nextId;
+            Binary dtag = new Binary(tagString.getBytes());
+
+            final TransferFrame transferResponse = new TransferFrame()
+            .setDeliveryId(UnsignedInteger.valueOf(nextId))
+            .setDeliveryTag(dtag)
+            .setMessageFormat(UnsignedInteger.ZERO)
+            .setSettled(false);
+
+            Binary payload = prepareTransferPayload(null, null, null, null, new AmqpValueDescribedType("myTextMessage"));
+
+            // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender transferSender = new FrameSender(this, FrameType.AMQP, -1, transferResponse, payload);
+            transferSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    transferResponse.setHandle(_lastInitiatedLinkHandle);
+                    transferSender.setChannel(_lastInitiatedChannel);
+                }
+            });
+
+            transferSender.setDeferWrite(false);
+
+            comp.add(transferSender);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[6/8] qpid-jms git commit: add test which highlights sending of disposition frames after we end the session

Posted by ro...@apache.org.
add test which highlights sending of disposition frames after we end the session


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f2c7a7a3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f2c7a7a3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f2c7a7a3

Branch: refs/heads/master
Commit: f2c7a7a31d968366548427df0454d11cf3ab82ed
Parents: 8add0dc
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:21:04 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:25:13 2015 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 31 ++++++++++++++++++++
 1 file changed, 31 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f2c7a7a3/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index 1ea2493..adf62e7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -1278,4 +1278,35 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             session.close();
         }
     }
+
+    @Ignore // TODO: fails due to PROTON-833. Needs workaround or 0.9 to resolve.
+    @Test(timeout = 5000)
+    public void testCloseSessionWithConsumerThatRemoteDetachesWithUnackedMessages() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            // Create a consumer, don't give it any messages
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            Queue queue = session.createQueue("myQueue");
+            session.createConsumer(queue);
+
+            //Expect the session close
+            testPeer.expectEnd(false);
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(false);
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true);
+            testPeer.remotelyEndLastOpenedSession(false, 200);
+
+            session.close();
+
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[8/8] qpid-jms git commit: fix problem in test peer frame parser relating to reading channel numbers

Posted by ro...@apache.org.
fix problem in test peer frame parser relating to reading channel numbers


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4ab807b7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4ab807b7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4ab807b7

Branch: refs/heads/master
Commit: 4ab807b789712d241f76bfddcf5cb75279b01bcd
Parents: f51836b
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 16:24:11 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 16:25:13 2015 +0000

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4ab807b7/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
index a562038..893a399 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestFrameParser.java
@@ -277,7 +277,7 @@ class TestFrameParser
                     // type
 
                     int type = currentInput.get() & 0xFF;
-                    int channel = currentInput.getShort() & 0xFF;
+                    int channel = currentInput.getShort() & 0xFFFF;
 
                     // note that this skips over the extended header if it's present
                     if(dataOffset!=8)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/8] qpid-jms git commit: add support for closing all the links opened on the last session

Posted by ro...@apache.org.
add support for closing all the links opened on the last session


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/df219312
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/df219312
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/df219312

Branch: refs/heads/master
Commit: df219312638023ca078d9054db23113a82007dd6
Parents: d509154
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 15:24:16 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 15:24:16 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 68 +++++++++++++++++++-
 1 file changed, 65 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/df219312/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 90a9b40..8127a8f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -110,6 +111,7 @@ public class TestAmqpPeer implements AutoCloseable
     private byte[] _deferredBytes;
     private int _lastInitiatedChannel = -1;
     private UnsignedInteger _lastInitiatedLinkHandle = null;
+    private Map<Integer, List<UnsignedInteger>> _channelToLinkMap = new ConcurrentHashMap<Integer, List<UnsignedInteger>>();
 
     public TestAmqpPeer() throws IOException
     {
@@ -547,8 +549,9 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
+                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponseSender.setChannel(receivedChannel);
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setSource(attachMatcher.getReceivedSource());
@@ -560,6 +563,7 @@ public class TestAmqpPeer implements AutoCloseable
                 attachResponse.setTarget(t);
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -625,8 +629,9 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
+                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponseSender.setChannel(receivedChannel);
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setSource(trimSourceOutcomesCapabilities(createSourceObjectFromDescribedType(attachMatcher.getReceivedSource())));
@@ -637,6 +642,7 @@ public class TestAmqpPeer implements AutoCloseable
                 }
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -720,8 +726,9 @@ public class TestAmqpPeer implements AutoCloseable
             public void setValues()
             {
                 Object receivedHandle = attachMatcher.getReceivedHandle();
+                int receivedChannel = attachMatcher.getActualChannel();
 
-                attachResponseSender.setChannel(attachMatcher.getActualChannel());
+                attachResponseSender.setChannel(receivedChannel);
                 attachResponse.setHandle(receivedHandle);
                 attachResponse.setName(attachMatcher.getReceivedName());
                 attachResponse.setTarget(attachMatcher.getReceivedTarget());
@@ -732,6 +739,7 @@ public class TestAmqpPeer implements AutoCloseable
                 }
 
                 _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle;
+                recordLinkCreation(receivedChannel, (UnsignedInteger) receivedHandle);
             }
         });
 
@@ -1224,6 +1232,50 @@ public class TestAmqpPeer implements AutoCloseable
         }
     }
 
+    /**
+     * All links and sessions must have been created before calling this method, unlike
+     * {@link #remotelyDetachLastOpenedLinkOnLastOpenedSession(boolean, boolean)}
+     */
+    public void remotelyDetachLinksOnLastOpenedSession(boolean expectDetachResponse, boolean closed) {
+        synchronized (_handlersLock) {
+            CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+
+            int channel = _lastInitiatedChannel;
+            List<UnsignedInteger> links = _channelToLinkMap.get(channel);
+            if(links == null || links.isEmpty())
+            {
+                throw new IllegalStateException("No links found for channel: " + channel);
+            }
+
+            for (UnsignedInteger linkHandle : links)
+            {
+                // Now generate the Detach for the appropriate link on the appropriate session
+                final DetachFrame detachFrame = new DetachFrame();
+                detachFrame.setClosed(closed);
+                detachFrame.setHandle(linkHandle);
+                // TODO: add an optional error msg+condition?
+
+                final FrameSender frameSender = new FrameSender(this, FrameType.AMQP, channel, detachFrame, null);
+                comp.add(frameSender);
+
+                if (expectDetachResponse) {
+                    Matcher<Boolean> closeMatcher = null;
+                    if (closed) {
+                        closeMatcher = equalTo(true);
+                    } else {
+                        closeMatcher = Matchers.anyOf(equalTo(false), nullValue());
+                    }
+
+                    // Expect a response to our Detach.
+                    final DetachMatcher detachMatcher = new DetachMatcher().withClosed(closeMatcher);
+                    detachMatcher.withHandle(equalTo(linkHandle));
+                    // TODO: enable matching on the channel number of the response.
+                    addHandler(detachMatcher);
+                }
+            }
+        }
+    }
+
     private CompositeAmqpPeerRunnable insertCompsiteActionForLastHandler() {
         CompositeAmqpPeerRunnable comp = new CompositeAmqpPeerRunnable();
         Handler h = getLastHandler();
@@ -1234,4 +1286,14 @@ public class TestAmqpPeer implements AutoCloseable
         h.onSuccess(comp);
         return comp;
     }
+
+    /** Appends the link handle to the list kept for the channel, creating the list on first use. */
+    private void recordLinkCreation(int channel, UnsignedInteger handle) {
+        List<UnsignedInteger> channelHandles = _channelToLinkMap.get(channel);
+        if (channelHandles == null) {
+            channelHandles = new ArrayList<UnsignedInteger>();
+            _channelToLinkMap.put(channel, channelHandles);
+        }
+        channelHandles.add(handle);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/8] qpid-jms git commit: add test which highlights sending of detach frames after we end the session

Posted by ro...@apache.org.
add test which highlights sending of detach frames after we end the session


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d509154c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d509154c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d509154c

Branch: refs/heads/master
Commit: d509154c7ad9df4fbe9ff87ff8cbc26cd58e1a6d
Parents: a7c2a9b
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 6 12:17:58 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 6 12:44:58 2015 +0000

----------------------------------------------------------------------
 .../jms/integration/SessionIntegrationTest.java | 28 ++++++++++++++++++++
 1 file changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d509154c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
index e7cbaea..1ea2493 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java
@@ -76,6 +76,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Ignore;
@@ -1250,4 +1251,31 @@ public class SessionIntegrationTest extends QpidJmsTestCase {
             consumer.close();
         }
     }
+
+    // TODO: fails due to PROTON-833. Needs workaround or 0.9 to resolve.
+    @Ignore
+    @Test(timeout = 5000)
+    public void testCloseSessionWithConsumerThatRemoteDetaches() throws Exception {
+        try (TestAmqpPeer peer = new TestAmqpPeer()) {
+            Connection conn = testFixture.establishConnecton(peer);
+
+            peer.expectBegin(true);
+            Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+            // Expectations for the consumer creation performed below
+            peer.expectReceiverAttach();
+            peer.expectLinkFlow();
+
+            // Script the close: once the client's End reaches the peer, the peer sends a Detach
+            // for the link and then its own End. No reply to that Detach is expected, because
+            // the client had already ended the session by then.
+            peer.expectEnd(false);
+            peer.remotelyDetachLastOpenedLinkOnLastOpenedSession(false, true);
+            peer.remotelyEndLastOpenedSession(false);
+
+            sess.createConsumer(sess.createQueue("myQueue"));
+
+            sess.close();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org