You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/09 20:25:42 UTC
[1/2] qpid-jms git commit: QPIDJMS-416 Run all AmqpProvider work on
netty event loop thread
Repository: qpid-jms
Updated Branches:
refs/heads/master 4b8739b75 -> 4314482de
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
index f463971..469be6e 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyTcpTransportTest.java
@@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
@@ -29,6 +30,7 @@ import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -36,8 +38,12 @@ import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.transports.Transport;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.QpidJMSTestRunner;
+import org.apache.qpid.jms.util.QpidJMSThreadFactory;
+import org.apache.qpid.jms.util.Repeat;
import org.junit.Ignore;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +59,7 @@ import io.netty.util.ResourceLeakDetector.Level;
/**
* Test basic functionality of the Netty based TCP transport.
*/
+@RunWith(QpidJMSTestRunner.class)
public class NettyTcpTransportTest extends QpidJmsTestCase {
private static final Logger LOG = LoggerFactory.getLogger(NettyTcpTransportTest.class);
@@ -91,6 +98,42 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
}
}
+ /**
+  * Connects with a caller-supplied thread factory and checks that the connected transport
+  * reports that same factory. It then tries to replace the factory after connect, which
+  * this test treats as an illegal state, and finally verifies that closing the transport
+  * leaves the listener state untouched.
+  */
+ @Test(timeout = 60000)
+ public void testConnectWithCustomThreadFactoryConfigured() throws Exception {
+     try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+         server.start();
+
+         int port = server.getServerPort();
+         URI serverLocation = new URI("tcp://localhost:" + port);
+         QpidJMSThreadFactory factory = new QpidJMSThreadFactory("NettyTransportTest", true);
+
+         Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+         transport.setThreadFactory(factory);
+
+         // The echo server is running, so an exception here is a test failure, not the
+         // expected outcome.
+         try {
+             transport.connect(null, null);
+             LOG.info("Connected to server:{} as expected.", serverLocation);
+         } catch (Exception e) {
+             fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+         }
+
+         assertTrue(transport.isConnected());
+         assertSame(factory, transport.getThreadFactory());
+
+         // The catch clause names the IllegalStateException as the expected result, so
+         // the absence of that exception has to fail the test rather than pass silently.
+         try {
+             transport.setThreadFactory(factory);
+             fail("Should not be able to set the thread factory once connected");
+         } catch (IllegalStateException expected) {
+             LOG.trace("Caught expected state exception");
+         }
+
+         transport.close();
+     }
+
+     assertFalse(transportClosed); // Normal shutdown does not trigger the event.
+     assertTrue(exceptions.isEmpty());
+     assertTrue(data.isEmpty());
+ }
+
@Test(timeout = 60 * 1000)
public void testConnectWithoutRunningServer() throws Exception {
try (NettyEchoServer server = createEchoServer(createServerOptions())) {
@@ -103,7 +146,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
fail("Should have failed to connect to the server: " + serverLocation);
} catch (Exception e) {
LOG.info("Failed to connect to: {} as expected.", serverLocation);
@@ -129,7 +172,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
Transport transport = createTransport(serverLocation, null, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
fail("Should have failed to connect to the server: " + serverLocation);
} catch (Exception e) {
LOG.info("Failed to connect to: {} as expected.", serverLocation);
@@ -155,7 +198,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
assertNotNull(transport.getTransportListener());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should not have failed to connect to the server at " + serverLocation + " but got exception: " + e);
@@ -177,7 +220,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -212,7 +255,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
for (int i = 0; i < CONNECTION_COUNT; ++i) {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
assertTrue(transport.isConnected());
LOG.info("Connected to server:{} as expected.", serverLocation);
transports.add(transport);
@@ -252,7 +295,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
for (int i = 0; i < CONNECTION_COUNT; ++i) {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
transport.writeAndFlush(sendBuffer.copy());
transports.add(transport);
} catch (Exception e) {
@@ -288,7 +331,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -326,7 +369,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -354,7 +397,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -405,7 +448,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -448,7 +491,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -467,6 +510,63 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
}
}
+ /**
+  * Checks that the initialization routine handed to connect has been run by the time a
+  * successful connect call returns.
+  */
+ @Test(timeout = 60000)
+ public void testConnectRunsInitializationMethod() throws Exception {
+     try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+         server.start();
+
+         final URI serverLocation = new URI("tcp://localhost:" + server.getServerPort());
+         final AtomicBoolean initRoutineRan = new AtomicBoolean();
+         final Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+
+         try {
+             transport.connect(() -> initRoutineRan.set(true), null);
+             LOG.info("Connected to server:{} as expected.", serverLocation);
+         } catch (Exception e) {
+             fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
+         }
+
+         assertTrue(transport.isConnected());
+         assertEquals(serverLocation, transport.getRemoteLocation());
+         assertTrue(initRoutineRan.get());
+
+         transport.close();
+     }
+
+     // A normal close must leave the closed flag unset and record neither errors nor data.
+     assertFalse(transportClosed);
+     assertTrue(exceptions.isEmpty());
+     assertTrue(data.isEmpty());
+ }
+
+ /**
+  * Checks that an exception thrown by the initialization routine passed to connect makes
+  * the connect call itself fail and leaves the transport unconnected.
+  */
+ @Test(timeout = 60000)
+ @Repeat(repetitions = 1)
+ public void testFailureInInitializationRoutineFailsConnect() throws Exception {
+     try (NettyEchoServer server = createEchoServer(createServerOptions())) {
+         server.start();
+
+         final URI serverLocation = new URI("tcp://localhost:" + server.getServerPort());
+         final Transport transport = createTransport(serverLocation, testListener, createClientOptions());
+
+         boolean connectFailed = false;
+         try {
+             transport.connect(() -> { throw new RuntimeException(); }, null);
+         } catch (Exception e) {
+             connectFailed = true;
+             LOG.info("Failed to connect to server:{} as expected", serverLocation);
+         }
+
+         // Raised outside the try so the failure can never be mistaken for the expected exception.
+         if (!connectFailed) {
+             fail("Should not have connected to the server at " + serverLocation);
+         }
+
+         assertFalse("Should not be connected", transport.isConnected());
+         assertEquals("Server location is incorrect", serverLocation, transport.getRemoteLocation());
+
+         transport.close();
+     }
+
+     // Unlike a normal shutdown, this test expects the closed flag to be set after the failed connect.
+     assertTrue(transportClosed);
+     assertTrue(exceptions.isEmpty());
+     assertTrue(data.isEmpty());
+ }
+
@Ignore("Used for checking for transport level leaks, my be unstable on CI.")
@Test(timeout = 60 * 1000)
public void testSendToClosedTransportFailsButDoesNotLeak() throws Exception {
@@ -483,7 +583,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
for (int i = 0; i < 256; ++i) {
transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -531,7 +631,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
options.setUseKQueue(false);
Transport transport = createTransport(serverLocation, testListener, options);
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -601,7 +701,7 @@ public class NettyTcpTransportTest extends QpidJmsTestCase {
options.setUseEpoll(false);
Transport transport = createTransport(serverLocation, testListener, options);
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
index ead740c..2ea873d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyWsTransportTest.java
@@ -75,7 +75,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -110,7 +110,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, createClientOptions());
try {
- transport.connect(null);
+ transport.connect(null, null);
fail("Should have failed to connect to the server: " + serverLocation);
} catch (Exception e) {
LOG.info("Failed to connect to: {} as expected.", serverLocation);
@@ -149,7 +149,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
try {
// The transport should allow for the size of data we sent.
transport.setMaxFrameSize(FRAME_SIZE);
- transport.connect(null);
+ transport.connect(null, null);
transports.add(transport);
transport.writeAndFlush(sendBuffer.copy());
} catch (Exception e) {
@@ -200,7 +200,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, wsListener, createClientOptions);
try {
transport.setMaxFrameSize(FRAME_SIZE);
- transport.connect(null);
+ transport.connect(null, null);
transports.add(transport);
transport.writeAndFlush(sendBuffer.copy());
} catch (Exception e) {
@@ -261,7 +261,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
// Transport can't receive anything bigger so it should fail the connection
// when data arrives that is larger than this value.
transport.setMaxFrameSize(FRAME_SIZE / 2);
- transport.connect(null);
+ transport.connect(null, null);
transports.add(transport);
transport.writeAndFlush(sendBuffer.copy());
} catch (Exception e) {
@@ -297,7 +297,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
try {
// Transport allows bigger frames in so that server is the one causing the failure.
transport.setMaxFrameSize(FRAME_SIZE);
- transport.connect(null);
+ transport.connect(null, null);
transports.add(transport);
transport.writeAndFlush(sendBuffer.copy());
} catch (Exception e) {
@@ -358,7 +358,7 @@ public class NettyWsTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, clientOptions);
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-416 Run all AmqpProvider work on
netty event loop thread
Posted by ta...@apache.org.
QPIDJMS-416 Run all AmqpProvider work on netty event loop thread
Move from a separate provider executor to using the netty event loop for
AMQP protocol handling.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/4314482d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/4314482d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/4314482d
Branch: refs/heads/master
Commit: 4314482de1e8cb7e58d8331ca0e459f32d78ab4e
Parents: 4b8739b
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Oct 9 16:25:23 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Oct 9 16:25:23 2018 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 969 +++++++++----------
.../apache/qpid/jms/transports/Transport.java | 26 +-
.../jms/transports/netty/NettyTcpTransport.java | 85 +-
.../jms/provider/amqp/AmqpProviderTest.java | 104 +-
.../netty/NettyOpenSslTransportTest.java | 4 +-
.../transports/netty/NettySslTransportTest.java | 12 +-
.../transports/netty/NettyTcpTransportTest.java | 128 ++-
.../transports/netty/NettyWsTransportTest.java | 14 +-
8 files changed, 788 insertions(+), 554 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index cc3915d..dbdc977 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.security.ProviderException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -26,9 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -72,7 +74,6 @@ import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
-import org.apache.qpid.jms.util.ThreadPoolUtils;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Delivery;
@@ -140,7 +141,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private final URI remoteURI;
private final AtomicBoolean closed = new AtomicBoolean();
private volatile Throwable failureCause;
- private ScheduledThreadPoolExecutor serializer;
+ private ScheduledExecutorService serializer;
private final org.apache.qpid.proton.engine.Transport protonTransport =
org.apache.qpid.proton.engine.Transport.Factory.create();
private final Collector protonCollector = new CollectorImpl();
@@ -164,127 +165,130 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
this.remoteURI = remoteURI;
this.transport = transport;
this.futureFactory = futureFactory;
-
- serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory(
- "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
- remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "]", true));
-
- serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
- serializer.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
}
@Override
public void connect(final JmsConnectionInfo connectionInfo) throws IOException {
checkClosedOrFailed();
+ if (serializer != null) {
+ throw new IllegalStateException("Connect cannot be called more than once");
+ }
+
final ProviderFuture connectRequest = futureFactory.createFuture();
- serializer.execute(new Runnable() {
+ // Configure Transport prior to initialization at which point configuration is set and
+ // cannot be updated. All further interaction should take place on the serializer for
+ // thread safety.
- @Override
- public void run() {
+ ThreadFactory transportThreadFactory = new QpidJMSThreadFactory(
+ "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
+ remoteURI.getScheme() + "://" + remoteURI.getHost() + ":" + remoteURI.getPort() + "]", true);
- connectionRequest = connectRequest;
- AmqpProvider.this.connectionInfo = connectionInfo;
+ transport.setThreadFactory(transportThreadFactory);
+ transport.setTransportListener(AmqpProvider.this);
+ transport.setMaxFrameSize(maxFrameSize);
- try {
- protonTransport.setEmitFlowEventOnSend(false);
-
- try {
- ((TransportInternal) protonTransport).setUseReadOnlyOutputBuffer(false);
- } catch (NoSuchMethodError nsme) {
- // using a version at runtime where the optimisation isn't available, ignore
- LOG.trace("Proton output buffer optimisation unavailable");
- }
+ final SSLContext sslContextOverride;
+ if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.SSL_CONTEXT)) {
+ sslContextOverride =
+ (SSLContext) connectionInfo.getExtensionMap().get(
+ JmsConnectionExtensions.SSL_CONTEXT).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
+ } else {
+ sslContextOverride = null;
+ }
- if (getMaxFrameSize() > 0) {
- protonTransport.setMaxFrameSize(getMaxFrameSize());
- protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize());
- }
+ if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE)) {
+ @SuppressWarnings({ "unchecked" })
+ Map<String, String> headers = (Map<String, String>)
+ connectionInfo.getExtensionMap().get(
+ JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
+ if (headers != null) {
+ transport.getTransportOptions().getHttpHeaders().putAll(headers);
+ }
+ }
- protonTransport.setChannelMax(getChannelMax());
- protonTransport.setIdleTimeout(idleTimeout);
- protonTransport.bind(protonConnection);
- protonConnection.collect(protonCollector);
-
- final SSLContext sslContextOverride;
- if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.SSL_CONTEXT)) {
- sslContextOverride =
- (SSLContext) connectionInfo.getExtensionMap().get(
- JmsConnectionExtensions.SSL_CONTEXT).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
- } else {
- sslContextOverride = null;
- }
+ try {
+ serializer = transport.connect(() -> {
+ this.connectionInfo = connectionInfo;
+ this.connectionRequest = connectRequest;
- transport.setTransportListener(AmqpProvider.this);
- transport.setMaxFrameSize(maxFrameSize);
+ protonTransport.setEmitFlowEventOnSend(false);
- if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE)) {
- @SuppressWarnings({ "unchecked" })
- Map<String, String> headers = (Map<String, String>)
- connectionInfo.getExtensionMap().get(
- JmsConnectionExtensions.HTTP_HEADERS_OVERRIDE).apply(connectionInfo.getConnection(), transport.getRemoteLocation());
- if (headers != null) {
- transport.getTransportOptions().getHttpHeaders().putAll(headers);
- }
- }
+ try {
+ ((TransportInternal) protonTransport).setUseReadOnlyOutputBuffer(false);
+ } catch (NoSuchMethodError nsme) {
+ // using a version at runtime where the optimisation isn't available, ignore
+ LOG.trace("Proton output buffer optimisation unavailable");
+ }
- transport.connect(sslContextOverride);
+ if (getMaxFrameSize() > 0) {
+ protonTransport.setMaxFrameSize(getMaxFrameSize());
+ protonTransport.setOutboundFrameSizeLimit(getMaxFrameSize());
+ }
- if (saslLayer) {
- Sasl sasl = protonTransport.sasl();
- sasl.client();
+ protonTransport.setChannelMax(getChannelMax());
+ protonTransport.setIdleTimeout(idleTimeout);
+ protonTransport.bind(protonConnection);
+ protonConnection.collect(protonCollector);
- String hostname = getVhost();
- if (hostname == null) {
- hostname = remoteURI.getHost();
- } else if (hostname.isEmpty()) {
- hostname = null;
- }
+ if (saslLayer) {
+ Sasl sasl = protonTransport.sasl();
+ sasl.client();
- sasl.setRemoteHostname(hostname);
- sasl.setListener(new SaslListener() {
+ String hostname = getVhost();
+ if (hostname == null) {
+ hostname = remoteURI.getHost();
+ } else if (hostname.isEmpty()) {
+ hostname = null;
+ }
- @Override
- public void onSaslMechanisms(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
- authenticator.handleSaslMechanisms(sasl, transport);
- checkSaslAuthenticationState();
- }
+ sasl.setRemoteHostname(hostname);
+ sasl.setListener(new SaslListener() {
- @Override
- public void onSaslChallenge(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
- authenticator.handleSaslChallenge(sasl, transport);
- checkSaslAuthenticationState();
- }
+ @Override
+ public void onSaslMechanisms(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+ authenticator.handleSaslMechanisms(sasl, transport);
+ checkSaslAuthenticationState();
+ }
- @Override
- public void onSaslOutcome(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
- authenticator.handleSaslOutcome(sasl, transport);
- checkSaslAuthenticationState();
- }
+ @Override
+ public void onSaslChallenge(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+ authenticator.handleSaslChallenge(sasl, transport);
+ checkSaslAuthenticationState();
+ }
- @Override
- public void onSaslInit(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
- // Server only event
- }
+ @Override
+ public void onSaslOutcome(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+ authenticator.handleSaslOutcome(sasl, transport);
+ checkSaslAuthenticationState();
+ }
- @Override
- public void onSaslResponse(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
- // Server only event
- }
- });
+ @Override
+ public void onSaslInit(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+ // Server only event
+ }
- authenticator = new AmqpSaslAuthenticator((remoteMechanisms) -> findSaslMechanism(remoteMechanisms));
+ @Override
+ public void onSaslResponse(Sasl sasl, org.apache.qpid.proton.engine.Transport transport) {
+ // Server only event
+ }
+ });
- pumpToProtonTransport();
- } else {
- connectRequest.onSuccess();
- }
- } catch (Throwable t) {
- connectionRequest.onFailure(IOExceptionSupport.create(t));
+ authenticator = new AmqpSaslAuthenticator((remoteMechanisms) -> findSaslMechanism(remoteMechanisms));
}
+ }, sslContextOverride);
+
+ // Once connected pump the transport to write the header and respond to any
+ // data that arrived at connect such as pipelined Header etc
+ serializer.execute(() -> pumpToProtonTransport());
+
+ if (!saslLayer) {
+ connectRequest.onSuccess();
}
- });
+ } catch (Throwable t) {
+ connectRequest.onFailure(IOExceptionSupport.create(t));
+ }
if (connectionInfo.getConnectTimeout() != JmsConnectionInfo.INFINITE) {
if (!connectRequest.sync(connectionInfo.getConnectTimeout(), TimeUnit.MILLISECONDS)) {
@@ -309,52 +313,61 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
if (closed.compareAndSet(false, true)) {
final ProviderFuture request = futureFactory.createUnfailableFuture();
- serializer.execute(new Runnable() {
-
- @Override
- public void run() {
- try {
- // If we are not connected then there is nothing we can do now
- // just signal success.
- if (transport == null || !transport.isConnected()) {
- request.onSuccess();
- return;
- }
-
- if (connection != null) {
- connection.close(request);
- } else {
- // If the SASL authentication occurred but failed then we don't
- // need to do an open / close
- if (authenticator != null && (!authenticator.isComplete() || !authenticator.wasSuccessful())) {
+ // Possible that the connect call failed before calling transport connect or the connect
+ // call failed and shutdown the event loop in which case we have no work to do other than
+ // to clean up the transport by closing it down.
+ if (serializer != null && !serializer.isShutdown()) {
+ try {
+ serializer.execute(() -> {
+ try {
+ // If we are not connected then there is nothing we can do now
+ // just signal success.
+ if (transport == null || !transport.isConnected()) {
request.onSuccess();
return;
}
- // Connection attempt might have been tried and failed so only perform
- // an open / close cycle if one hasn't been done already.
- if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
- AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
- builder.buildResource(request);
-
- protonConnection.setContext(builder);
+ if (connection != null) {
+ connection.close(request);
} else {
- request.onSuccess();
+ // If the SASL authentication occurred but failed then we don't
+ // need to do an open / close
+ if (authenticator != null && (!authenticator.isComplete() || !authenticator.wasSuccessful())) {
+ request.onSuccess();
+ return;
+ }
+
+ // Connection attempt might have been tried and failed so only perform
+ // an open / close cycle if one hasn't been done already.
+ if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED) {
+ AmqpClosedConnectionBuilder builder = new AmqpClosedConnectionBuilder(getProvider(), connectionInfo);
+ builder.buildResource(request);
+ protonConnection.setContext(builder);
+ } else {
+ request.onSuccess();
+ }
}
- }
- pumpToProtonTransport(request);
- } catch (Exception e) {
- LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
- } finally {
- if (nextIdleTimeoutCheck != null) {
- LOG.trace("Cancelling scheduled IdleTimeoutCheck");
- nextIdleTimeoutCheck.cancel(false);
- nextIdleTimeoutCheck = null;
+ pumpToProtonTransport(request);
+ } catch (Exception e) {
+ LOG.debug("Caught exception while closing proton connection: {}", e.getMessage());
+ } finally {
+ if (nextIdleTimeoutCheck != null) {
+ LOG.trace("Cancelling scheduled IdleTimeoutCheck");
+ nextIdleTimeoutCheck.cancel(false);
+ nextIdleTimeoutCheck = null;
+ }
}
- }
+ });
+ } catch (RejectedExecutionException rje) {
+ // Transport likely encountered some critical error on connect and the executor
+ // resource is not initialized now, in which case just ignore and continue on.
+ LOG.trace("Close of provider resources was rejected from Transport IO thread: ", rje);
+ request.onSuccess();
}
- });
+ } else {
+ request.onSuccess();
+ }
try {
if (getCloseTimeout() < 0) {
@@ -365,16 +378,12 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
} catch (IOException e) {
LOG.warn("Error caught while closing Provider: {}", e.getMessage() != null ? e.getMessage() : "<Unknown Error>");
} finally {
- try {
- if (transport != null) {
- try {
- transport.close();
- } catch (Exception e) {
- LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
- }
+ if (transport != null) {
+ try {
+ transport.close();
+ } catch (Exception e) {
+ LOG.debug("Caught exception while closing down Transport: {}", e.getMessage());
}
- } finally {
- ThreadPoolUtils.shutdownGraceful(serializer);
}
}
}
@@ -383,83 +392,82 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void create(final JmsResource resource, final AsyncResult request) throws IOException, JMSException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- resource.visit(new JmsResourceVistor() {
- @Override
- public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
- connection.createSession(sessionInfo, request);
- }
+ serializer.execute(() -> {
- @Override
- public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
- AmqpSession session = connection.getSession(producerInfo.getParentId());
- session.createProducer(producerInfo, request);
- }
+ try {
+ checkClosedOrFailed();
+ resource.visit(new JmsResourceVistor() {
+ @Override
+ public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
+ connection.createSession(sessionInfo, request);
+ }
- @Override
- public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
- final AmqpSession session;
+ @Override
+ public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
+ AmqpSession session = connection.getSession(producerInfo.getParentId());
+ session.createProducer(producerInfo, request);
+ }
- if (consumerInfo.isConnectionConsumer()) {
- session = connection.getConnectionSession();
- } else {
- session = connection.getSession(consumerInfo.getParentId());
- }
+ @Override
+ public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+ final AmqpSession session;
- session.createConsumer(consumerInfo, request);
+ if (consumerInfo.isConnectionConsumer()) {
+ session = connection.getConnectionSession();
+ } else {
+ session = connection.getSession(consumerInfo.getParentId());
}
- @Override
- public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
- AmqpProvider.this.connectionInfo = connectionInfo;
-
- AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
- connectionRequest = new AsyncResult() {
- @Override
- public void onSuccess() {
- fireConnectionEstablished();
- request.onSuccess();
- }
+ session.createConsumer(consumerInfo, request);
+ }
- @Override
- public void onFailure(Throwable result) {
- request.onFailure(result);
- }
+ @Override
+ public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
+ AmqpProvider.this.connectionInfo = connectionInfo;
- @Override
- public boolean isComplete() {
- return request.isComplete();
- }
- };
+ AmqpConnectionBuilder builder = new AmqpConnectionBuilder(AmqpProvider.this, connectionInfo);
+ connectionRequest = new AsyncResult() {
+ @Override
+ public void onSuccess() {
+ fireConnectionEstablished();
+ request.onSuccess();
+ }
- builder.buildResource(connectionRequest);
- }
+ @Override
+ public void onFailure(Throwable result) {
+ request.onFailure(result);
+ }
- @Override
- public void processDestination(JmsTemporaryDestination destination) throws Exception {
- if (destination.isTemporary()) {
- connection.createTemporaryDestination(destination, request);
- } else {
- request.onSuccess();
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
}
- }
+ };
- @Override
- public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
- AmqpSession session = connection.getSession(transactionInfo.getSessionId());
- session.begin(transactionInfo.getId(), request);
+ builder.buildResource(connectionRequest);
+ }
+
+ @Override
+ public void processDestination(JmsTemporaryDestination destination) throws Exception {
+ if (destination.isTemporary()) {
+ connection.createTemporaryDestination(destination, request);
+ } else {
+ request.onSuccess();
}
- });
+ }
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ @Override
+ public void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception {
+ AmqpSession session = connection.getSession(transactionInfo.getSessionId());
+ session.begin(transactionInfo.getId(), request);
+ }
+ });
+
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -467,26 +475,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void start(final JmsResource resource, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- resource.visit(new JmsDefaultResourceVisitor() {
+ serializer.execute(() -> {
- @Override
- public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
- AmqpSession session = connection.getSession(consumerInfo.getParentId());
- AmqpConsumer consumer = session.getConsumer(consumerInfo);
- consumer.start(request);
- }
- });
+ try {
+ checkClosedOrFailed();
+ resource.visit(new JmsDefaultResourceVisitor() {
+
+ @Override
+ public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+ AmqpSession session = connection.getSession(consumerInfo.getParentId());
+ AmqpConsumer consumer = session.getConsumer(consumerInfo);
+ consumer.start(request);
+ }
+ });
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -494,26 +501,25 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void stop(final JmsResource resource, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- resource.visit(new JmsDefaultResourceVisitor() {
+ serializer.execute(() -> {
- @Override
- public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
- AmqpSession session = connection.getSession(consumerInfo.getParentId());
- AmqpConsumer consumer = session.getConsumer(consumerInfo);
- consumer.stop(request);
- }
- });
+ try {
+ checkClosedOrFailed();
+ resource.visit(new JmsDefaultResourceVisitor() {
+
+ @Override
+ public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
+ AmqpSession session = connection.getSession(consumerInfo.getParentId());
+ AmqpConsumer consumer = session.getConsumer(consumerInfo);
+ consumer.stop(request);
+ }
+ });
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -521,101 +527,100 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void destroy(final JmsResource resource, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- resource.visit(new JmsDefaultResourceVisitor() {
+ serializer.execute(() -> {
- @Override
- public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
- final AmqpSession session = connection.getSession(sessionInfo.getId());
- session.close(new AsyncResult() {
- // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
- @Override
- public void onSuccess() {
- onComplete();
- request.onSuccess();
- }
+ try {
+ checkClosedOrFailed();
+ resource.visit(new JmsDefaultResourceVisitor() {
+
+ @Override
+ public void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception {
+ final AmqpSession session = connection.getSession(sessionInfo.getId());
+ session.close(new AsyncResult() {
+ // TODO: bit of a hack, but works. Similarly below for locally initiated consumer close.
+ @Override
+ public void onSuccess() {
+ onComplete();
+ request.onSuccess();
+ }
- @Override
- public void onFailure(Throwable result) {
- onComplete();
- request.onFailure(result);
- }
+ @Override
+ public void onFailure(Throwable result) {
+ onComplete();
+ request.onFailure(result);
+ }
- @Override
- public boolean isComplete() {
- return request.isComplete();
- }
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
- void onComplete() {
- // Mark the sessions resources closed, which in turn calls
- // the subscription cleanup.
- session.handleResourceClosure(AmqpProvider.this, null);
- }
- });
- }
+ void onComplete() {
+ // Mark the sessions resources closed, which in turn calls
+ // the subscription cleanup.
+ session.handleResourceClosure(AmqpProvider.this, null);
+ }
+ });
+ }
- @Override
- public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
- AmqpSession session = connection.getSession(producerInfo.getParentId());
- AmqpProducer producer = session.getProducer(producerInfo);
- producer.close(request);
- }
+ @Override
+ public void processProducerInfo(JmsProducerInfo producerInfo) throws Exception {
+ AmqpSession session = connection.getSession(producerInfo.getParentId());
+ AmqpProducer producer = session.getProducer(producerInfo);
+ producer.close(request);
+ }
- @Override
- public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
- AmqpSession session = connection.getSession(consumerInfo.getParentId());
- final AmqpConsumer consumer = session.getConsumer(consumerInfo);
- consumer.close(new AsyncResult() {
- // TODO: bit of a hack, but works. Similarly above for locally initiated session close.
- @Override
- public void onSuccess() {
- onComplete();
- request.onSuccess();
- }
+ @Override
+ public void processConsumerInfo(final JmsConsumerInfo consumerInfo) throws Exception {
+ AmqpSession session = connection.getSession(consumerInfo.getParentId());
+ final AmqpConsumer consumer = session.getConsumer(consumerInfo);
+ consumer.close(new AsyncResult() {
+ // TODO: bit of a hack, but works. Similarly above for locally initiated session close.
+ @Override
+ public void onSuccess() {
+ onComplete();
+ request.onSuccess();
+ }
- @Override
- public void onFailure(Throwable result) {
- onComplete();
- request.onFailure(result);
- }
+ @Override
+ public void onFailure(Throwable result) {
+ onComplete();
+ request.onFailure(result);
+ }
- @Override
- public boolean isComplete() {
- return request.isComplete();
- }
+ @Override
+ public boolean isComplete() {
+ return request.isComplete();
+ }
- void onComplete() {
- connection.getSubTracker().consumerRemoved(consumerInfo);
- }
- });
- }
+ void onComplete() {
+ connection.getSubTracker().consumerRemoved(consumerInfo);
+ }
+ });
+ }
- @Override
- public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
- connection.close(request);
- }
+ @Override
+ public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception {
+ connection.close(request);
+ }
- @Override
- public void processDestination(JmsTemporaryDestination destination) throws Exception {
- AmqpTemporaryDestination temporary = connection.getTemporaryDestination(destination);
- if (temporary != null) {
- temporary.close(request);
- } else {
- LOG.debug("Could not find temporary destination {} to delete.", destination);
- request.onSuccess();
- }
+ @Override
+ public void processDestination(JmsTemporaryDestination destination) throws Exception {
+ AmqpTemporaryDestination temporary = connection.getTemporaryDestination(destination);
+ if (temporary != null) {
+ temporary.close(request);
+ } else {
+ LOG.debug("Could not find temporary destination {} to delete.", destination);
+ request.onSuccess();
}
- });
+ }
+ });
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -623,18 +628,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void send(final JmsOutboundMessageDispatch envelope, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- JmsProducerId producerId = envelope.getProducerId();
- AmqpProducer producer = (AmqpProducer) producerId.getProviderHint();
- producer.send(envelope, request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ JmsProducerId producerId = envelope.getProducerId();
+ AmqpProducer producer = (AmqpProducer) producerId.getProviderHint();
+ producer.send(envelope, request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -642,19 +646,18 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void acknowledge(final JmsSessionId sessionId, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- AmqpSession amqpSession = connection.getSession(sessionId);
- amqpSession.acknowledge(ackType);
- pumpToProtonTransport(request);
- request.onSuccess();
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ AmqpSession amqpSession = connection.getSession(sessionId);
+ amqpSession.acknowledge(ackType);
+ pumpToProtonTransport(request);
+ request.onSuccess();
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -662,29 +665,28 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void acknowledge(final JmsInboundMessageDispatch envelope, final ACK_TYPE ackType, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
+ serializer.execute(() -> {
- JmsConsumerId consumerId = envelope.getConsumerId();
- AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
+ try {
+ checkClosedOrFailed();
- consumer.acknowledge(envelope, ackType);
+ JmsConsumerId consumerId = envelope.getConsumerId();
+ AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
- if (consumer.getSession().isAsyncAck()) {
- request.onSuccess();
- pumpToProtonTransport(request);
- } else {
- pumpToProtonTransport(request, false);
- request.onSuccess();
- transport.flush();
- }
- } catch (Throwable t) {
- request.onFailure(t);
+ consumer.acknowledge(envelope, ackType);
+
+ if (consumer.getSession().isAsyncAck()) {
+ request.onSuccess();
+ pumpToProtonTransport(request);
+ } else {
+ pumpToProtonTransport(request, false);
+ request.onSuccess();
+ transport.flush();
}
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -692,18 +694,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void commit(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- AmqpSession session = connection.getSession(transactionInfo.getSessionId());
- session.commit(transactionInfo, nextTransactionId, request);
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ AmqpSession session = connection.getSession(transactionInfo.getSessionId());
+ session.commit(transactionInfo, nextTransactionId, request);
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -711,18 +712,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void rollback(final JmsTransactionInfo transactionInfo, JmsTransactionInfo nextTransactionId, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- AmqpSession session = connection.getSession(transactionInfo.getSessionId());
- session.rollback(transactionInfo, nextTransactionId, request);
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ AmqpSession session = connection.getSession(transactionInfo.getSessionId());
+ session.rollback(transactionInfo, nextTransactionId, request);
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -730,19 +730,18 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void recover(final JmsSessionId sessionId, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- AmqpSession session = connection.getSession(sessionId);
- session.recover();
- pumpToProtonTransport(request);
- request.onSuccess();
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ AmqpSession session = connection.getSession(sessionId);
+ session.recover();
+ pumpToProtonTransport(request);
+ request.onSuccess();
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -750,17 +749,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void unsubscribe(final String subscription, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- connection.unsubscribe(subscription, request);
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ connection.unsubscribe(subscription, request);
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -768,18 +766,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void pull(final JmsConsumerId consumerId, final long timeout, final AsyncResult request) throws IOException {
checkClosedOrFailed();
- serializer.execute(new Runnable() {
+ checkConnected();
- @Override
- public void run() {
- try {
- checkClosedOrFailed();
- AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
- consumer.pull(timeout, request);
- pumpToProtonTransport(request);
- } catch (Throwable t) {
- request.onFailure(t);
- }
+ serializer.execute(() -> {
+
+ try {
+ checkClosedOrFailed();
+ AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint();
+ consumer.pull(timeout, request);
+ pumpToProtonTransport(request);
+ } catch (Throwable t) {
+ request.onFailure(t);
}
});
}
@@ -792,44 +789,46 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
- @Override
- public void onData(final ByteBuf input) {
-
- // We need to retain until the serializer gets around to processing it.
- input.retain();
-
- serializer.execute(new Runnable() {
-
- @Override
- public void run() {
+ public void scheduleExecuteAndPump(Runnable task) {
+ serializer.execute(() -> {
+ try {
try {
- if (isTraceBytes()) {
- TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
- }
-
- do {
- ByteBuffer buffer = protonTransport.tail();
- int chunkSize = Math.min(buffer.remaining(), input.readableBytes());
- buffer.limit(buffer.position() + chunkSize);
- input.readBytes(buffer);
- protonTransport.process();
- } while (input.isReadable());
-
- // Free for pooled memory to be put back now.
- input.release();
-
- // Process the state changes from the latest data and then answer back
- // any pending updates to the Broker.
- processUpdates();
+ task.run();
+ } finally {
pumpToProtonTransport();
- } catch (Throwable t) {
- LOG.warn("Caught problem during data processing: {}", t.getMessage(), t);
- fireProviderException(t);
}
+ } catch (Throwable t) {
+ LOG.warn("Caught problem during task processing: {}", t.getMessage(), t);
+ fireProviderException(t);
}
});
}
+ @Override
+ public void onData(final ByteBuf input) {
+ try {
+ if (isTraceBytes()) {
+ TRACE_BYTES.info("Received: {}", ByteBufUtil.hexDump(input));
+ }
+
+ do {
+ ByteBuffer buffer = protonTransport.tail();
+ int chunkSize = Math.min(buffer.remaining(), input.readableBytes());
+ buffer.limit(buffer.position() + chunkSize);
+ input.readBytes(buffer);
+ protonTransport.process();
+ } while (input.isReadable());
+
+ // Process the state changes from the latest data and then answer back
+ // any pending updates to the Broker.
+ processUpdates();
+ pumpToProtonTransport();
+ } catch (Throwable t) {
+ LOG.warn("Caught problem during data processing: {}", t.getMessage(), t);
+ fireProviderException(t);
+ }
+ }
+
/**
* Callback method for the Transport to report connection errors. When called
* the method will queue a new task to fire the failure error back to the listener.
@@ -840,39 +839,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void onTransportError(final Throwable error) {
if (!serializer.isShutdown()) {
- serializer.execute(new Runnable() {
- @Override
- public void run() {
- LOG.info("Transport failed: {}", error.getMessage());
- if (!closed.get()) {
- // We can't send any more output, so close the transport
- protonTransport.close_head();
- fireProviderException(error);
- }
+ serializer.execute(() -> {
+ LOG.info("Transport failed: {}", error.getMessage());
+ if (!closed.get()) {
+ // We can't send any more output, so close the transport
+ protonTransport.close_head();
+ fireProviderException(error);
}
});
}
}
- public void scheduleExecuteAndPump(Runnable task) {
- serializer.execute(new Runnable() {
- @Override
- public void run() {
- try {
- try {
- task.run();
- } finally {
- pumpToProtonTransport();
- }
- } catch (Throwable t) {
- LOG.warn("Caught problem during task processing: {}", t.getMessage(), t);
-
- fireProviderException(t);
- }
- }
- });
- }
-
/**
* Callback method for the Transport to report that the underlying connection
* has closed. When called this method will queue a new task that will check for
@@ -882,15 +859,12 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void onTransportClosed() {
if (!serializer.isShutdown()) {
- serializer.execute(new Runnable() {
- @Override
- public void run() {
- LOG.debug("Transport connection remotely closed");
- if (!closed.get()) {
- // We can't send any more output, so close the transport
- protonTransport.close_head();
- fireProviderException(new IOException("Transport connection remotely closed."));
- }
+ serializer.execute(() -> {
+ LOG.debug("Transport connection remotely closed");
+ if (!closed.get()) {
+ // We can't send any more output, so close the transport
+ protonTransport.close_head();
+ fireProviderException(new IOException("Transport connection remotely closed."));
}
});
}
@@ -1412,14 +1386,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
*/
public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final Exception error) {
if (timeout != JmsConnectionInfo.INFINITE) {
- return serializer.schedule(new Runnable() {
-
- @Override
- public void run() {
- request.onFailure(error);
- pumpToProtonTransport();
- }
-
+ return serializer.schedule(() -> {
+ request.onFailure(error);
+ pumpToProtonTransport();
}, timeout, TimeUnit.MILLISECONDS);
}
@@ -1442,14 +1411,9 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
*/
public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, long timeout, final AmqpExceptionBuilder builder) {
if (timeout != JmsConnectionInfo.INFINITE) {
- return serializer.schedule(new Runnable() {
-
- @Override
- public void run() {
- request.onFailure(builder.createException());
- pumpToProtonTransport();
- }
-
+ return serializer.schedule(() -> {
+ request.onFailure(builder.createException());
+ pumpToProtonTransport();
}, timeout, TimeUnit.MILLISECONDS);
}
@@ -1468,8 +1432,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
- private Mechanism findSaslMechanism(String[] remoteMechanisms) throws JMSSecurityRuntimeException {
+ private void checkConnected() throws ProviderClosedException, ProviderFailedException {
+ if (serializer == null) {
+ throw new ProviderException("Transport has not been properly connected.");
+ }
+ }
+ private Mechanism findSaslMechanism(String[] remoteMechanisms) throws JMSSecurityRuntimeException {
final String username;
if (connectionInfo.getExtensionMap().containsKey(JmsConnectionExtensions.USERNAME_OVERRIDE)) {
username = (String) connectionInfo.getExtensionMap().get(
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
index 5c92b06..d9c50ed 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/Transport.java
@@ -19,6 +19,8 @@ package org.apache.qpid.jms.transports;
import java.io.IOException;
import java.net.URI;
import java.security.Principal;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import javax.net.ssl.SSLContext;
@@ -33,13 +35,19 @@ public interface Transport {
* Performs the connect operation for the implemented Transport type
* such as a TCP socket connection, SSL/TLS handshake etc.
*
+ * @param initRoutine
+ * a runnable initialization method that is executed in the context
+ * of the transport's IO thread to allow thread safe setup of resources
+ * that will be run from the transport executor service.
* @param sslContextOverride
* a user-provided SSLContext to use if establishing a secure
* connection, overrides applicable URI configuration
*
+ * @return A ScheduledExecutorService that can run work on the Transport IO thread.
+ *
* @throws IOException if an error occurs while attempting the connect.
*/
- void connect(SSLContext sslContextOverride) throws IOException;
+ ScheduledExecutorService connect(Runnable initRoutine, SSLContext sslContextOverride) throws IOException;
/**
* @return true if transport is connected or false if the connection is down.
@@ -119,6 +127,22 @@ public interface Transport {
void setTransportListener(TransportListener listener);
/**
+ * @return the {@link ThreadFactory} used to create the IO thread for this Transport
+ */
+ ThreadFactory getThreadFactory();
+
+ /**
+ * Sets the {@link ThreadFactory} that the Transport should use when creating the Transport
+ * IO thread for processing.
+ *
+ * @param factory
+ * The {@link ThreadFactory}
+ *
+ * @throws IllegalStateException if called after a call to {@link #connect(Runnable, SSLContext)}
+ */
+ void setThreadFactory(ThreadFactory factory);
+
+ /**
* @return the TransportOptions instance that holds the configuration for this Transport.
*/
TransportOptions getTransportOptions();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
index 5df7899..e9c66b7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyTcpTransport.java
@@ -20,6 +20,8 @@ import java.io.IOException;
import java.net.URI;
import java.security.Principal;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -74,6 +76,7 @@ public class NettyTcpTransport implements Transport {
protected EventLoopGroup group;
protected Channel channel;
protected TransportListener listener;
+ protected ThreadFactory ioThreadfactory;
protected int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
private final boolean secure;
@@ -126,26 +129,27 @@ public class NettyTcpTransport implements Transport {
}
@Override
- public void connect(SSLContext sslContextOverride) throws IOException {
+ public ScheduledExecutorService connect(final Runnable initRoutine, SSLContext sslContextOverride) throws IOException {
+ if (closed.get()) {
+ throw new IllegalStateException("Transport has already been closed");
+ }
if (listener == null) {
throw new IllegalStateException("A transport listener must be set before connection attempts.");
}
- getTransportOptions().setSslContextOverride(sslContextOverride);
-
boolean useKQueue = getTransportOptions().isUseKQueue() && KQueue.isAvailable();
boolean useEpoll = getTransportOptions().isUseEpoll() && Epoll.isAvailable();
if (useKQueue) {
LOG.trace("Netty Transport using KQueue mode");
- group = new KQueueEventLoopGroup(1);
+ group = new KQueueEventLoopGroup(1, ioThreadfactory);
} else if (useEpoll) {
LOG.trace("Netty Transport using Epoll mode");
- group = new EpollEventLoopGroup(1);
+ group = new EpollEventLoopGroup(1, ioThreadfactory);
} else {
LOG.trace("Netty Transport using NIO mode");
- group = new NioEventLoopGroup(1);
+ group = new NioEventLoopGroup(1, ioThreadfactory);
}
bootstrap = new Bootstrap();
@@ -160,11 +164,19 @@ public class NettyTcpTransport implements Transport {
bootstrap.handler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel connectedChannel) throws Exception {
+ if (initRoutine != null) {
+ try {
+ initRoutine.run();
+ } catch (Throwable initError) {
+ failureCause = IOExceptionSupport.create(initError);
+ }
+ }
configureChannel(connectedChannel);
}
});
configureNetty(bootstrap, getTransportOptions());
+ getTransportOptions().setSslContextOverride(sslContextOverride);
ChannelFuture future = bootstrap.connect(getRemoteHost(), getRemotePort());
future.addListener(new ChannelFutureListener() {
@@ -202,16 +214,14 @@ public class NettyTcpTransport implements Transport {
throw failureCause;
} else {
// Connected, allow any held async error to fire now and close the transport.
- channel.eventLoop().execute(new Runnable() {
-
- @Override
- public void run() {
- if (failureCause != null) {
- channel.pipeline().fireExceptionCaught(failureCause);
- }
+ channel.eventLoop().execute(() -> {
+ if (failureCause != null) {
+ channel.pipeline().fireExceptionCaught(failureCause);
}
});
}
+
+ return group;
}
@Override
@@ -316,6 +326,20 @@ public class NettyTcpTransport implements Transport {
return maxFrameSize;
}
+ @Override
+ public ThreadFactory getThreadFactory() {
+ return ioThreadfactory;
+ }
+
+ @Override
+ public void setThreadFactory(ThreadFactory factory) {
+ if (isConnected() || channel != null) {
+ throw new IllegalStateException("Cannot set IO ThreadFactory after Transport connect");
+ }
+
+ this.ioThreadfactory = factory;
+ }
+
//----- Internal implementation details, can be overridden as needed -----//
protected String getRemoteHost() {
@@ -349,7 +373,13 @@ public class NettyTcpTransport implements Transport {
LOG.trace("Channel has gone inactive! Channel is {}", channel);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportClosed listener");
- listener.onTransportClosed();
+ if (channel.eventLoop().inEventLoop()) {
+ listener.onTransportClosed();
+ } else {
+ channel.eventLoop().execute(() -> {
+ listener.onTransportClosed();
+ });
+ }
}
}
@@ -357,10 +387,20 @@ public class NettyTcpTransport implements Transport {
LOG.trace("Exception on channel! Channel is {}", channel);
if (connected.compareAndSet(true, false) && !closed.get()) {
LOG.trace("Firing onTransportError listener");
- if (failureCause != null) {
- listener.onTransportError(failureCause);
+ if (channel.eventLoop().inEventLoop()) {
+ if (failureCause != null) {
+ listener.onTransportError(failureCause);
+ } else {
+ listener.onTransportError(cause);
+ }
} else {
- listener.onTransportError(cause);
+ channel.eventLoop().execute(() -> {
+ if (failureCause != null) {
+ listener.onTransportError(failureCause);
+ } else {
+ listener.onTransportError(cause);
+ }
+ });
}
} else {
// Hold the first failure for later dispatch if connect succeeds.
@@ -498,8 +538,15 @@ public class NettyTcpTransport implements Transport {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
- LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
- listener.onData(buffer);
LOG.trace("New data read: {} bytes incoming: {}", buffer.readableBytes(), buffer);
+ // Avoid all doubts to the contrary
+ if (channel.eventLoop().inEventLoop()) {
+ listener.onData(buffer);
+ } else {
+ channel.eventLoop().execute(() -> {
+ listener.onData(buffer);
+ });
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 53f3c6a..bcafa45 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.security.ProviderException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -377,27 +378,32 @@ public class AmqpProviderTest extends QpidJmsTestCase {
@Test(timeout = 20000)
public void testErrorDuringCreateResourceFailsRequest() throws Exception {
- doErrorDuringOperationFailsRequesTTestImpl(Op.CREATE);
+ doErrorDuringOperationFailsRequestTestImpl(Op.CREATE);
}
@Test(timeout = 20000)
public void testErrorDuringStartResourceFailsRequest() throws Exception {
- doErrorDuringOperationFailsRequesTTestImpl(Op.START);
+ doErrorDuringOperationFailsRequestTestImpl(Op.START);
}
@Test(timeout = 20000)
public void testErrorDuringStopResourceFailsRequest() throws Exception {
- doErrorDuringOperationFailsRequesTTestImpl(Op.STOP);
+ doErrorDuringOperationFailsRequestTestImpl(Op.STOP);
}
@Test(timeout = 20000)
public void testErrorDuringDestroyResourceFailsRequest() throws Exception {
- doErrorDuringOperationFailsRequesTTestImpl(Op.DESTROY);
+ doErrorDuringOperationFailsRequestTestImpl(Op.DESTROY);
}
- private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) throws Exception {
+ private void doErrorDuringOperationFailsRequestTestImpl(Op operation) throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+ testPeer.expectSaslAnonymous();
+ testPeer.expectOpen();
+ testPeer.expectClose();
+
provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
+ provider.connect(connectionInfo);
final AtomicBoolean errorThrown = new AtomicBoolean();
JmsResource resourceInfo = new JmsAbstractResource() {
@@ -449,6 +455,94 @@ public class AmqpProviderTest extends QpidJmsTestCase {
}
}
+ @Test(timeout = 20000)
+ public void testCreateResourceFailsWhenNoConnectCalled() throws Exception {
+ doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.CREATE);
+ }
+
+ @Test(timeout = 20000)
+ public void testStartResourceFailsWhenNoConnectCalled() throws Exception {
+ doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.START);
+ }
+
+ @Test(timeout = 20000)
+ public void testStopResourceFailsWhenNoConnectCalled() throws Exception {
+ doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.STOP);
+ }
+
+ @Test(timeout = 20000)
+ public void testDestroyResourceFailsWhenNoConnectCalled() throws Exception {
+ doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op.DESTROY);
+ }
+
+ private void doErrorDuringOperationFailsWhenNoConnectCalledTestImpl(Op operation) throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer()) {
+
+ provider = new AmqpProviderFactory().createProvider(getPeerURI(testPeer));
+
+ final AtomicBoolean errorThrown = new AtomicBoolean();
+ JmsResource resourceInfo = new JmsAbstractResource() {
+ @Override
+ public void visit(JmsResourceVistor visitor) {
+ errorThrown.set(true);
+ throw new Error("Deliberate error for testing");
+ }
+
+ @Override
+ public JmsResourceId getId() {
+ return new JmsAbstractResourceId() {
+ };
+ }
+ };
+
+ assertFalse("Error should not have been thrown", errorThrown.get());
+ ProviderFuture request = provider.newProviderFuture();
+
+ switch(operation) {
+ case CREATE:
+ try {
+ provider.create(resourceInfo, request);
+ fail("Request should have failed");
+ } catch (ProviderException e) {
+ // Expected
+ }
+ break;
+ case START:
+ try {
+ provider.start(resourceInfo, request);
+ fail("Request should have failed");
+ } catch (ProviderException e) {
+ // Expected
+ }
+ break;
+ case STOP:
+ try {
+ provider.stop(resourceInfo, request);
+ fail("Request should have failed");
+ } catch (ProviderException e) {
+ // Expected
+ }
+ break;
+ case DESTROY:
+ try {
+ provider.destroy(resourceInfo, request);
+ fail("Request should have failed");
+ } catch (ProviderException e) {
+ // Expected
+ }
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected operation given");
+ }
+
+ assertFalse("Error should not have been thrown", errorThrown.get());
+
+ provider.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
private JmsConnectionInfo createConnectionInfo() {
JmsConnectionId connectionId = new JmsConnectionId(connectionIdGenerator.generateId());
JmsConnectionInfo connectionInfo = new JmsConnectionInfo(connectionId);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
index 3c78f16..b8ef700 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettyOpenSslTransportTest.java
@@ -77,7 +77,7 @@ public class NettyOpenSslTransportTest extends NettySslTransportTest {
Transport transport = createTransport(serverLocation, testListener, options);
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -125,7 +125,7 @@ public class NettyOpenSslTransportTest extends NettySslTransportTest {
Transport transport = createTransport(serverLocation, testListener, options);
try {
- transport.connect(sslContext);
+ transport.connect(null, sslContext);
LOG.info("Connected to server:{} as expected.", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/4314482d/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
index f5d71be..9b4f47a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySslTransportTest.java
@@ -84,7 +84,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, createClientOptionsWithoutTrustStore(false));
try {
- transport.connect(null);
+ transport.connect(null, null);
fail("Should not have connected to the server: " + serverLocation);
} catch (Exception e) {
LOG.info("Connection failed to untrusted test server: {}", serverLocation);
@@ -116,7 +116,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, options);
try {
- transport.connect(null);
+ transport.connect(null, null);
fail("Should not have connected to the server: " + serverLocation);
} catch (Exception e) {
LOG.info("Connection failed to untrusted test server: {}", serverLocation);
@@ -138,7 +138,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, createClientOptionsWithoutTrustStore(true));
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connection established to untrusted test server: {}", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -168,7 +168,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
NettyTcpTransport transport = createTransport(serverLocation, testListener, clientOptions);
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connection established to test server: {}", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -208,7 +208,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
NettyTcpTransport transport = createTransport(serverLocation, testListener, clientOptions);
try {
- transport.connect(null);
+ transport.connect(null, null);
LOG.info("Connection established to test server: {}", serverLocation);
} catch (Exception e) {
fail("Should have connected to the server at " + serverLocation + " but got exception: " + e);
@@ -262,7 +262,7 @@ public class NettySslTransportTest extends NettyTcpTransportTest {
Transport transport = createTransport(serverLocation, testListener, clientOptions);
try {
- transport.connect(null);
+ transport.connect(null, null);
if (verifyHost) {
fail("Should not have connected to the server: " + serverLocation);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org