You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/03/31 16:57:41 UTC

qpid-jms git commit: QPIDJMS-282: add a check to the resource builder to fire the open handling and send resulting output if the remote open was already received

Repository: qpid-jms
Updated Branches:
  refs/heads/master 0c0342090 -> 74eed4cfc


QPIDJMS-282: add a check to the resource builder to fire the open handling and send resulting output if the remote open was already received


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/74eed4cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/74eed4cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/74eed4cf

Branch: refs/heads/master
Commit: 74eed4cfc95ab7ab5b15dda5fa70d94608c76f66
Parents: 0c03420
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 31 17:52:51 2017 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 31 17:52:51 2017 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 19 ++++++++++
 .../amqp/builders/AmqpResourceBuilder.java      | 17 ++++++++-
 .../integration/ConnectionIntegrationTest.java  | 34 +++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 40 +++++++++++++++++---
 4 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index b571244..068beee 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -821,6 +821,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         }
     }
 
+    public void scheduleExecuteAndPump(Runnable task) {
+        serializer.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    try {
+                        task.run();
+                    } finally {
+                        pumpToProtonTransport();
+                    }
+                } catch (Throwable t) {
+                    LOG.warn("Caught problem during task processing: {}", t.getMessage(), t);
+
+                    fireProviderException(t);
+                }
+            }
+        });
+    }
+
     /**
      * Callback method for the Transport to report that the underlying connection
      * has closed.  When called this method will queue a new task that will check for

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 69d5f04..b244cfe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -32,6 +32,7 @@ import org.apache.qpid.jms.provider.amqp.AmqpResourceParent;
 import org.apache.qpid.jms.provider.amqp.AmqpSupport;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,11 +80,13 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
         // Create the resource object now
         resource = createResource(parent, resourceInfo, endpoint);
 
+        AmqpProvider provider = parent.getProvider();
+
         if (getRequestTimeout() > JmsConnectionInfo.INFINITE) {
 
             // Attempt to schedule a cancellation of the pending open request, can return
             // null if there is no configured request timeout.
-            requestTimeoutTask = parent.getProvider().scheduleRequestTimeout(new AsyncResult() {
+            requestTimeoutTask = provider.scheduleRequestTimeout(new AsyncResult() {
 
                 @Override
                 public void onSuccess() {
@@ -92,7 +95,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
 
                 @Override
                 public void onFailure(Throwable result) {
-                    handleClosed(parent.getProvider(), result);
+                    handleClosed(provider, result);
                 }
 
                 @Override
@@ -102,6 +105,16 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
 
             }, getRequestTimeout(), this);
         }
+
+        // Check whether the remote end has already opened it; if so, handle that now
+        if (endpoint.getRemoteState() != EndpointState.UNINITIALIZED) {
+            provider.scheduleExecuteAndPump(new Runnable() {
+                @Override
+                public void run() {
+                    handleOpened(provider);
+                }
+            });
+        }
     }
 
     //----- Event handlers ---------------------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 76c1443..d8c713f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -732,4 +732,38 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
             }
         }
     }
+
+    @Test(timeout = 20000)
+    public void testConnectionWithPreemptiveServerOpen() throws Exception {
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // Ensure the Connection awaits a ClientID being set or not, giving time for the preemptive server Open
+            String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true";
+
+            testPeer.expectSaslAnonymousWithServerAmqpHeaderSentPreemptively();
+            testPeer.sendPreemptiveServerAmqpHeader();
+            testPeer.sendPreemptiveServerOpenFrame();
+            // Then expect the client's header to arrive, but defer responding since the server's was already sent.
+            testPeer.expectHeader(AmqpHeader.HEADER, null);
+
+            ConnectionFactory factory = new JmsConnectionFactory(uri);
+            Connection connection = factory.createConnection();
+
+            // Then expect the client's Open frame to arrive, but defer responding since the server's was already sent
+            // before the client's AMQP connection open is provoked.
+            testPeer.expectOpen(null, null, true);
+            testPeer.expectBegin();
+
+            Thread.sleep(10); // Gives a little more time for the preemptive Open to actually arrive.
+
+            // Use the connection to provoke the Open
+            connection.setClientID("client-id");
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index f6ebc7a..65c6a26 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -363,6 +363,17 @@ public class TestAmqpPeer implements AutoCloseable
         _driverRunnable.sendBytes(header);
     }
 
+    public void sendPreemptiveServerAmqpHeader() {
+        // Arrange to send the AMQP header after the previous handler
+        CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+        comp.add(new AmqpPeerRunnable() {
+            @Override
+            public void run() {
+                sendHeader(AmqpHeader.HEADER);
+            }
+        });
+    }
+
     public void sendEmptyFrame(boolean deferWrite)
     {
         sendFrame(FrameType.AMQP, 0, null, null, deferWrite, 0);
@@ -438,7 +449,8 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(new HeaderHandlerImpl(header, response));
     }
 
-    private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, boolean sendSaslHeaderResponse)
+    private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher,
+                                          boolean sendSaslHeaderResponse, boolean amqpHeaderSentPreemptively)
     {
         SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
         byte[] saslHeaderResponse = null;
@@ -477,7 +489,10 @@ public class TestAmqpPeer implements AutoCloseable
 
         addHandler(saslInitMatcher);
 
-        addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
+        if (!amqpHeaderSentPreemptively)
+        {
+            addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
+        }
     }
 
     public void expectSaslPlain(String username, String password)
@@ -490,7 +505,7 @@ public class TestAmqpPeer implements AutoCloseable
 
         Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
 
-        expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true);
+        expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true, false);
     }
 
     public void expectSaslExternal()
@@ -500,7 +515,7 @@ public class TestAmqpPeer implements AutoCloseable
             throw new IllegalStateException("need-client-cert must be enabled on the test peer");
         }
 
-        expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true);
+        expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true, false);
     }
 
     public void expectSaslAnonymous()
@@ -510,14 +525,19 @@ public class TestAmqpPeer implements AutoCloseable
 
     public void expectSaslAnonymous(Matcher<?> hostnameMatcher)
     {
-        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true);
+        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true, false);
     }
 
     public void expectSaslAnonymousWithPreEmptiveServerHeader()
     {
         assertThat("Peer should be created with instruction to send preemptively", _driverRunnable.isSendSaslHeaderPreEmptively(), equalTo(true));
         boolean sendSaslHeaderResponse = false; // Must arrange for the server to have already sent it preemptively
-        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse);
+        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse, false);
+    }
+
+    public void expectSaslAnonymousWithServerAmqpHeaderSentPreemptively()
+    {
+        expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, true, true);
     }
 
     public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech)
@@ -598,6 +618,14 @@ public class TestAmqpPeer implements AutoCloseable
         expectOpen(desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false);
     }
 
+    public void sendPreemptiveServerOpenFrame() {
+        // Arrange to send the Open frame after the previous handler
+        OpenFrame open = createOpenFrame();
+
+        CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+        comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null));
+    }
+
     public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
                            Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties,
                            Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org