You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/03/31 16:57:41 UTC
qpid-jms git commit: QPIDJMS-282: add a check to the resource builder
to fire the open handling and send resulting output if the remote open was
already received
Repository: qpid-jms
Updated Branches:
refs/heads/master 0c0342090 -> 74eed4cfc
QPIDJMS-282: add a check to the resource builder to fire the open handling and send resulting output if the remote open was already received
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/74eed4cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/74eed4cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/74eed4cf
Branch: refs/heads/master
Commit: 74eed4cfc95ab7ab5b15dda5fa70d94608c76f66
Parents: 0c03420
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Mar 31 17:52:51 2017 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Mar 31 17:52:51 2017 +0100
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 19 ++++++++++
.../amqp/builders/AmqpResourceBuilder.java | 17 ++++++++-
.../integration/ConnectionIntegrationTest.java | 34 +++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 40 +++++++++++++++++---
4 files changed, 102 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index b571244..068beee 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -821,6 +821,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
+ public void scheduleExecuteAndPump(Runnable task) {
+ serializer.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ try {
+ task.run();
+ } finally {
+ pumpToProtonTransport();
+ }
+ } catch (Throwable t) {
+ LOG.warn("Caught problem during task processing: {}", t.getMessage(), t);
+
+ fireProviderException(t);
+ }
+ }
+ });
+ }
+
/**
* Callback method for the Transport to report that the underlying connection
* has closed. When called this method will queue a new task that will check for
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index 69d5f04..b244cfe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -32,6 +32,7 @@ import org.apache.qpid.jms.provider.amqp.AmqpResourceParent;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,11 +80,13 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
// Create the resource object now
resource = createResource(parent, resourceInfo, endpoint);
+ AmqpProvider provider = parent.getProvider();
+
if (getRequestTimeout() > JmsConnectionInfo.INFINITE) {
// Attempt to schedule a cancellation of the pending open request, can return
// null if there is no configured request timeout.
- requestTimeoutTask = parent.getProvider().scheduleRequestTimeout(new AsyncResult() {
+ requestTimeoutTask = provider.scheduleRequestTimeout(new AsyncResult() {
@Override
public void onSuccess() {
@@ -92,7 +95,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
@Override
public void onFailure(Throwable result) {
- handleClosed(parent.getProvider(), result);
+ handleClosed(provider, result);
}
@Override
@@ -102,6 +105,16 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
}, getRequestTimeout(), this);
}
+
+ // Check it wasn't already opened, if it is then handle it
+ if (endpoint.getRemoteState() != EndpointState.UNINITIALIZED) {
+ provider.scheduleExecuteAndPump(new Runnable() {
+ @Override
+ public void run() {
+ handleOpened(provider);
+ }
+ });
+ }
}
//----- Event handlers ---------------------------------------------------//
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index 76c1443..d8c713f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -732,4 +732,38 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
}
}
}
+
+ @Test(timeout = 20000)
+ public void testConnectionWithPreemptiveServerOpen() throws Exception {
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ // Ensure the Connection awaits a ClientID being set or not, giving time for the preemptive server Open
+ String uri = "amqp://localhost:" + testPeer.getServerPort() + "?jms.awaitClientID=true";
+
+ testPeer.expectSaslAnonymousWithServerAmqpHeaderSentPreemptively();
+ testPeer.sendPreemptiveServerAmqpHeader();
+ testPeer.sendPreemptiveServerOpenFrame();
+ // Then expect the client's header to arrive, but defer responding since the server's was already sent.
+ testPeer.expectHeader(AmqpHeader.HEADER, null);
+
+ ConnectionFactory factory = new JmsConnectionFactory(uri);
+ Connection connection = factory.createConnection();
+
+ // Then expect the client's Open frame to arrive, but defer responding since the server's was already sent
+ // before the client's AMQP connection open is provoked.
+ testPeer.expectOpen(null, null, true);
+ testPeer.expectBegin();
+
+ Thread.sleep(10); // Gives a little more time for the preemptive Open to actually arrive.
+
+ // Use the connection to provoke the Open
+ connection.setClientID("client-id");
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/74eed4cf/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index f6ebc7a..65c6a26 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -363,6 +363,17 @@ public class TestAmqpPeer implements AutoCloseable
_driverRunnable.sendBytes(header);
}
+ public void sendPreemptiveServerAmqpHeader() {
+ // Arrange to send the AMQP header after the previous handler
+ CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+ comp.add(new AmqpPeerRunnable() {
+ @Override
+ public void run() {
+ sendHeader(AmqpHeader.HEADER);
+ }
+ });
+ }
+
public void sendEmptyFrame(boolean deferWrite)
{
sendFrame(FrameType.AMQP, 0, null, null, deferWrite, 0);
@@ -438,7 +449,8 @@ public class TestAmqpPeer implements AutoCloseable
addHandler(new HeaderHandlerImpl(header, response));
}
- private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher, boolean sendSaslHeaderResponse)
+ private void expectSaslAuthentication(Symbol mechanism, Matcher<Binary> initialResponseMatcher, Matcher<?> hostnameMatcher,
+ boolean sendSaslHeaderResponse, boolean amqpHeaderSentPreemptively)
{
SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(mechanism);
byte[] saslHeaderResponse = null;
@@ -477,7 +489,10 @@ public class TestAmqpPeer implements AutoCloseable
addHandler(saslInitMatcher);
- addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
+ if (!amqpHeaderSentPreemptively)
+ {
+ addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER));
+ }
}
public void expectSaslPlain(String username, String password)
@@ -490,7 +505,7 @@ public class TestAmqpPeer implements AutoCloseable
Matcher<Binary> initialResponseMatcher = equalTo(new Binary(data));
- expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true);
+ expectSaslAuthentication(PLAIN, initialResponseMatcher, null, true, false);
}
public void expectSaslExternal()
@@ -500,7 +515,7 @@ public class TestAmqpPeer implements AutoCloseable
throw new IllegalStateException("need-client-cert must be enabled on the test peer");
}
- expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true);
+ expectSaslAuthentication(EXTERNAL, equalTo(new Binary(new byte[0])), null, true, false);
}
public void expectSaslAnonymous()
@@ -510,14 +525,19 @@ public class TestAmqpPeer implements AutoCloseable
public void expectSaslAnonymous(Matcher<?> hostnameMatcher)
{
- expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true);
+ expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), hostnameMatcher, true, false);
}
public void expectSaslAnonymousWithPreEmptiveServerHeader()
{
assertThat("Peer should be created with instruction to send preemptively", _driverRunnable.isSendSaslHeaderPreEmptively(), equalTo(true));
boolean sendSaslHeaderResponse = false; // Must arrange for the server to have already sent it preemptively
- expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse);
+ expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, sendSaslHeaderResponse, false);
+ }
+
+ public void expectSaslAnonymousWithServerAmqpHeaderSentPreemptively()
+ {
+ expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, true, true);
}
public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech)
@@ -598,6 +618,14 @@ public class TestAmqpPeer implements AutoCloseable
expectOpen(desiredCapabilities, serverCapabilities, null, serverProperties, null, null, false);
}
+ public void sendPreemptiveServerOpenFrame() {
+ // Arrange to send the Open frame after the previous handler
+ OpenFrame open = createOpenFrame();
+
+ CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
+ comp.add(new FrameSender(this, FrameType.AMQP, 0, open, null));
+ }
+
public void expectOpen(Symbol[] desiredCapabilities, Symbol[] serverCapabilities,
Matcher<?> clientPropertiesMatcher, Map<Symbol, Object> serverProperties,
Matcher<?> idleTimeoutMatcher, Matcher<?> hostnameMatcher, boolean deferOpened) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org