You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/08/21 19:57:56 UTC
[4/5] qpid-jms git commit: QPIDJMS-95: if a [transport] failure is
reported while we are trying to open then fail the attempt to open the new
connection
QPIDJMS-95: if a [transport] failure is reported while we are trying to open then fail the attempt to open the new connection
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/9aabc27a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/9aabc27a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/9aabc27a
Branch: refs/heads/master
Commit: 9aabc27a4de515377cd149c41b6983435a241e3d
Parents: e46354b
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Aug 21 17:29:13 2015 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Aug 21 18:45:13 2015 +0100
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 18 ++-
.../failover/FailoverIntegrationTest.java | 122 +++++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 11 ++
.../jms/test/testpeer/TestAmqpPeerRunner.java | 12 ++
4 files changed, 159 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3a2e709..7383352 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -126,6 +126,7 @@ public class AmqpProvider implements Provider, TransportListener {
private final Transport protonTransport = Transport.Factory.create();
private final Collector protonCollector = new CollectorImpl();
+ private AsyncResult connectionOpenRequest;
private ScheduledFuture<?> nextIdleTimeoutCheck;
/**
@@ -240,7 +241,6 @@ public class AmqpProvider implements Provider, TransportListener {
try {
checkClosed();
resource.visit(new JmsResourceVistor() {
-
@Override
public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
AmqpSession session = connection.createSession(sessionInfo);
@@ -296,8 +296,7 @@ public class AmqpProvider implements Provider, TransportListener {
}
connection = new AmqpConnection(AmqpProvider.this, protonConnection, authenticator, connectionInfo);
- connection.open(new AsyncResult() {
-
+ AsyncResult wrappedOpenRequest = new AsyncResult() {
@Override
public void onSuccess() {
fireConnectionEstablished();
@@ -313,7 +312,11 @@ public class AmqpProvider implements Provider, TransportListener {
public boolean isComplete() {
return request.isComplete();
}
- });
+ };
+
+ connectionOpenRequest = wrappedOpenRequest;
+
+ connection.open(wrappedOpenRequest);
}
@Override
@@ -838,6 +841,8 @@ public class AmqpProvider implements Provider, TransportListener {
}
void fireConnectionEstablished() {
+ //The request onSuccess calls this method
+ connectionOpenRequest = null;
long now = System.currentTimeMillis();
long deadline = protonTransport.tick(now);
@@ -854,6 +859,11 @@ public class AmqpProvider implements Provider, TransportListener {
}
void fireProviderException(Throwable ex) {
+ if(connectionOpenRequest != null) {
+ connectionOpenRequest.onFailure(ex);
+ connectionOpenRequest = null;
+ }
+
ProviderListener listener = this.listener;
if (listener != null) {
listener.onConnectionFailure(IOExceptionSupport.create(ex));
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
new file mode 100644
index 0000000..6a086b3
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.failover;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsConnectionFactory;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverIntegrationTest extends QpidJmsTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class);
+
+ @Test(timeout = 20000)
+ public void testFailoverHandlesImmediateTransportDropAfterConnect() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer rejectingPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, one to fail to reconnect to, and a final one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String rejectingURI = createPeerURI(rejectingPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Rejecting peer is at: {}", rejectingURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ // Connect to the first
+ originalPeer.expectSaslAnonymousConnect();
+ originalPeer.expectBegin();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalURI.equals(remoteURI.toString())) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalURI.equals(remoteURI.toString())) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+ assertEquals("should not yet have connected to final peer", 1L, finalConnected.getCount());
+
+ // Set expectations on rejecting and final peer
+ rejectingPeer.expectSaslHeaderThenDrop();
+
+ finalPeer.expectSaslAnonymousConnect();
+ finalPeer.expectBegin();
+
+ // Close the original peer and wait for things to shake out.
+ originalPeer.close();
+
+ rejectingPeer.waitForAllHandlersToComplete(2000);
+
+ assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+ //Shut it down
+ finalPeer.expectClose();
+ connection.close();
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ private JmsConnection establishAnonymousConnecton(TestAmqpPeer origPeer, TestAmqpPeer rejectingPeer, TestAmqpPeer finalPeer) throws JMSException {
+ final String remoteURI = "failover:(" + createPeerURI(origPeer) + ","
+ + createPeerURI(rejectingPeer) + ","
+ + createPeerURI(finalPeer) + ")";
+
+ ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+ Connection connection = factory.createConnection();
+
+ return (JmsConnection) connection;
+ }
+
+ private String createPeerURI(TestAmqpPeer peer) {
+ return "amqp://localhost:" + peer.getServerPort();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 6f34ea5..62694d7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1562,4 +1562,15 @@ public class TestAmqpPeer implements AutoCloseable
_firstAssertionError = ae;
}
}
+
+ public void expectSaslHeaderThenDrop() {
+ AmqpPeerRunnable exitAfterHeader = new AmqpPeerRunnable() {
+ @Override
+ public void run() {
+ _driverRunnable.exitReadLoopEarly();
+ }
+ };
+
+ addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, exitAfterHeader));
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/9aabc27a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
index 17ec2f6..854e64c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java
@@ -48,6 +48,7 @@ class TestAmqpPeerRunner implements Runnable
private final Object _inputHandlingLock = new Object();
private final TestFrameParser _testFrameParser;
private volatile boolean _suppressReadExceptionOnClose;
+ private volatile boolean _exitReadLoopEarly;
private volatile Throwable _throwable;
@@ -108,6 +109,13 @@ class TestAmqpPeerRunner implements Runnable
_testFrameParser.input(networkInputByteBuffer);
}
+
+ if(_exitReadLoopEarly)
+ {
+ LOGGER.trace("Exiting read loop early");
+ break;
+ }
+
LOGGER.trace("Attempting read");
attemptingRead = true;
}
@@ -217,4 +225,8 @@ class TestAmqpPeerRunner implements Runnable
public boolean isNeedClientCert() {
return needClientCert;
}
+
+ public void exitReadLoopEarly() {
+ _exitReadLoopEarly = true;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org