You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/07/18 16:41:49 UTC
[1/2] qpid-jms git commit: QPIDJMS-404 Add variants of ProviderFuture
for perf tuning
Repository: qpid-jms
Updated Branches:
refs/heads/master 8b200671e -> 264a9a9b6
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
index 19b8d25..e5291d9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.jms.meta.JmsProducerInfo;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.provider.DefaultProviderListener;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.test.Wait;
import org.junit.After;
import org.junit.Before;
@@ -61,6 +62,8 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(FailoverProviderTest.class);
+ private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap());
+
private List<URI> uris;
private FailoverProvider provider;
private JmsConnectionInfo connection;
@@ -91,7 +94,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testCreateProviderOnlyUris() {
- provider = new FailoverProvider(uris);
+ provider = new FailoverProvider(uris, futuresFactory);
assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
assertNull(provider.getRemoteURI());
assertNotNull(provider.getNestedOptions());
@@ -103,7 +106,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
Map<String, String> options = new HashMap<String, String>();
options.put("transport.tcpNoDelay", "true");
- provider = new FailoverProvider(options);
+ provider = new FailoverProvider(options, futuresFactory);
assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
assertNull(provider.getRemoteURI());
assertNotNull(provider.getNestedOptions());
@@ -113,7 +116,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testCreateProviderWithNestedOptions() {
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
assertNull(provider.getRemoteURI());
assertNotNull(provider.getNestedOptions());
@@ -122,7 +125,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testProviderListener() {
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
assertNull(provider.getProviderListener());
provider.setProviderListener(new DefaultProviderListener());
assertNotNull(provider.getProviderListener());
@@ -130,7 +133,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testGetRemoteURI() throws Exception {
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
assertNull(provider.getRemoteURI());
provider.connect(connection);
@@ -145,7 +148,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testToString() throws Exception {
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
assertNotNull(provider.toString());
provider.connect(connection);
@@ -161,7 +164,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testConnectToMock() throws Exception {
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
assertNull(provider.getRemoteURI());
@@ -177,7 +180,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
provider.connect(connection);
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.create(createConnectionInfo(), request);
request.sync(10, TimeUnit.SECONDS);
@@ -192,7 +195,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testCannotStartWithoutListener() throws Exception {
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
assertNull(provider.getRemoteURI());
provider.connect(connection);
@@ -421,7 +424,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
final long SEND_TIMEOUT = TimeUnit.SECONDS.toMillis(6);
final long REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(7);
- provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+ provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
provider.setProviderListener(new DefaultProviderListener() {
@Override
@@ -439,7 +442,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
connectionInfo.setSendTimeout(SEND_TIMEOUT);
connectionInfo.setRequestTimeout(REQUEST_TIMEOUT);
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.create(connectionInfo, request);
request.sync();
@@ -450,13 +453,13 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testAmqpOpenServerListActionDefault() {
- provider = new FailoverProvider(uris);
+ provider = new FailoverProvider(uris, futuresFactory);
assertEquals("REPLACE", provider.getAmqpOpenServerListAction());
}
@Test(timeout = 30000)
public void testSetGetAmqpOpenServerListAction() {
- provider = new FailoverProvider(uris);
+ provider = new FailoverProvider(uris, futuresFactory);
String action = "ADD";
assertFalse(action.equals(provider.getAmqpOpenServerListAction()));
@@ -466,7 +469,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
@Test(timeout = 30000)
public void testSetInvalidAmqpOpenServerListActionThrowsIAE() {
- provider = new FailoverProvider(uris);
+ provider = new FailoverProvider(uris, futuresFactory);
try {
provider.setAmqpOpenServerListAction("invalid");
fail("no exception was thrown");
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index 7a19867..565d1f0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -43,7 +43,9 @@ import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderClosedException;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
import org.apache.qpid.jms.util.ThreadPoolUtils;
import org.slf4j.Logger;
@@ -66,6 +68,7 @@ public class MockProvider implements Provider {
private final ScheduledThreadPoolExecutor serializer;
private final AtomicBoolean closed = new AtomicBoolean();
private final MockRemotePeer context;
+ private final ProviderFutureFactory futureFactory;
private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
@@ -73,11 +76,12 @@ public class MockProvider implements Provider {
private MockProviderListener eventListener;
private ProviderListener listener;
- public MockProvider(URI remoteURI, MockProviderConfiguration configuration, MockRemotePeer context) {
+ public MockProvider(URI remoteURI, MockProviderConfiguration configuration, MockRemotePeer context, ProviderFutureFactory futureFactory) {
this.remoteURI = remoteURI;
this.configuration = configuration;
this.context = context;
this.stats = new MockProviderStats(context != null ? context.getContextStats() : null);
+ this.futureFactory = futureFactory;
serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@@ -129,7 +133,7 @@ public class MockProvider implements Provider {
if (closed.compareAndSet(false, true)) {
stats.recordCloseAttempt();
- final ProviderFuture request = new ProviderFuture();
+ final ProviderFuture request = futureFactory.createFuture();
serializer.execute(new Runnable() {
@Override
@@ -513,6 +517,16 @@ public class MockProvider implements Provider {
this.connectTimeout = connectTimeout;
}
+ @Override
+ public ProviderFuture newProviderFuture() {
+ return futureFactory.createFuture();
+ }
+
+ @Override
+ public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+ return futureFactory.createFuture(synchronization);
+ }
+
//----- Implementation details -------------------------------------------//
private void checkClosed() throws ProviderClosedException {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
index 75c3293..410a5c7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Map;
import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.util.PropertyUtil;
/**
@@ -29,11 +30,30 @@ public class MockProviderFactory extends ProviderFactory {
@Override
public MockProvider createProvider(URI remoteURI) throws Exception {
+ return createProvider(remoteURI, null);
+ }
+
+ @Override
+ public MockProvider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
- Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "mock.");
+ Map<String, String> mockProviderOptions = PropertyUtil.filterProperties(map, "mock.");
- remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
+ Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "provider.");
+ // If we have been given a futures factory to use then we ignore any URI options indicating
+ // what to create and just go with what we are given.
+ if (futureFactory == null) {
+ // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+ futureFactory = ProviderFutureFactory.create(providerOptions);
+ if (!providerOptions.isEmpty()) {
+ String msg = ""
+ + " Not all Provider options could be applied during Mock Provider creation."
+ + " Check the options are spelled correctly."
+ + " Unused parameters=[" + providerOptions + "]."
+ + " This provider instance cannot be started.";
+ throw new IllegalArgumentException(msg);
+ }
+ }
MockProviderConfiguration configuration = new MockProviderConfiguration();
MockRemotePeer remote = MockRemotePeer.INSTANCE;
@@ -41,7 +61,7 @@ public class MockProviderFactory extends ProviderFactory {
remote.getContextStats().recordProviderCreated();
}
- Map<String, String> unused = PropertyUtil.setProperties(configuration, providerOptions);
+ Map<String, String> unused = PropertyUtil.setProperties(configuration, mockProviderOptions);
if (!unused.isEmpty()) {
String msg = ""
+ " Not all provider options could be set on the " + getName()
@@ -51,7 +71,7 @@ public class MockProviderFactory extends ProviderFactory {
throw new IllegalArgumentException(msg);
}
- return new MockProvider(remoteURI, configuration, remote);
+ return new MockProvider(PropertyUtil.replaceQuery(remoteURI, map), configuration, remote, futureFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
index 7c3672a..b857994 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.provider.failover.FailoverProvider;
import org.apache.qpid.jms.provider.failover.FailoverProviderFactory;
import org.apache.qpid.jms.util.PropertyUtil;
@@ -45,8 +46,14 @@ public class DiscoveryProviderFactory extends ProviderFactory {
*/
public static final String DISCOVERY_DISCOVERED_OPTION_PREFIX_ADON = "discovered.";
+
@Override
public Provider createProvider(URI remoteURI) throws Exception {
+ return createProvider(remoteURI, null);
+ }
+
+ @Override
+ public Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
CompositeData composite = URISupport.parseComposite(remoteURI);
Map<String, String> options = composite.getParameters();
@@ -67,8 +74,24 @@ public class DiscoveryProviderFactory extends ProviderFactory {
nestedOptions.putAll(failoverNestedOptions);
nestedOptions.putAll(discoveredOptions);
+ Map<String, String> providerOptions = PropertyUtil.filterProperties(options, "provider.");
+ // If we have been given a futures factory to use then we ignore any URI options indicating
+ // what to create and just go with what we are given.
+ if (futureFactory == null) {
+ // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+ futureFactory = ProviderFutureFactory.create(providerOptions);
+ if (!providerOptions.isEmpty()) {
+ String msg = ""
+ + " Not all Provider options could be applied during Failover Provider creation."
+ + " Check the options are spelled correctly."
+ + " Unused parameters=[" + providerOptions + "]."
+ + " This provider instance cannot be started.";
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
// Failover will apply the nested options to each URI while attempting to connect.
- FailoverProvider failover = new FailoverProvider(nestedOptions);
+ FailoverProvider failover = new FailoverProvider(nestedOptions, futureFactory);
Map<String, String> leftOverDiscoveryOptions = PropertyUtil.setProperties(failover, mainOptions);
DiscoveryProvider discovery = new DiscoveryProvider(remoteURI, failover);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: QPIDJMS-404 Add variants of ProviderFuture
for perf tuning
Posted by ta...@apache.org.
QPIDJMS-404 Add variants of ProviderFuture for perf tuning
Adds three variations on ProviderFuture that allow for tuning on
platforms that don't benefit from the spin / wait pattern used in
the current implementation and default to using a variant that does
not park on windows to avoid unpredictably long parks that result
in performance decreases due to missing the event completion.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/264a9a9b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/264a9a9b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/264a9a9b
Branch: refs/heads/master
Commit: 264a9a9b6c5d8d8c11a995b7b02289b2938d77ba
Parents: 8b20067
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 18 12:11:53 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jul 18 12:37:35 2018 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 34 ++--
.../qpid/jms/JmsLocalTransactionContext.java | 2 +-
.../org/apache/qpid/jms/JmsMessageConsumer.java | 4 +-
.../org/apache/qpid/jms/JmsMessageProducer.java | 2 +-
.../java/org/apache/qpid/jms/JmsSession.java | 2 +-
.../jms/provider/BalancedProviderFuture.java | 149 ++++++++++++++++
.../provider/ConservativeProviderFuture.java | 128 ++++++++++++++
.../jms/provider/ProgressiveProviderFuture.java | 167 ++++++++++++++++++
.../org/apache/qpid/jms/provider/Provider.java | 22 +++
.../qpid/jms/provider/ProviderFactory.java | 34 +++-
.../qpid/jms/provider/ProviderFuture.java | 165 ++---------------
.../jms/provider/ProviderFutureFactory.java | 176 +++++++++++++++++++
.../qpid/jms/provider/ProviderWrapper.java | 10 ++
.../qpid/jms/provider/amqp/AmqpProvider.java | 30 ++--
.../jms/provider/amqp/AmqpProviderFactory.java | 32 +++-
.../jms/provider/failover/FailoverProvider.java | 28 ++-
.../failover/FailoverProviderFactory.java | 24 ++-
.../jms/provider/ProviderFutureFactoryTest.java | 105 +++++++++++
.../qpid/jms/provider/ProviderFutureTest.java | 44 +++--
.../jms/provider/WrappedAsyncResultTest.java | 9 +-
.../jms/provider/amqp/AmqpProviderTest.java | 6 +-
.../failover/FailoverProviderClosedTest.java | 28 +--
.../provider/failover/FailoverProviderTest.java | 31 ++--
.../qpid/jms/provider/mock/MockProvider.java | 18 +-
.../jms/provider/mock/MockProviderFactory.java | 28 ++-
.../discovery/DiscoveryProviderFactory.java | 25 ++-
26 files changed, 1061 insertions(+), 242 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 7d09a9c..bb456fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -214,7 +214,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
if (isConnected() && !isFailed()) {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
requests.put(request, request);
try {
provider.destroy(connectionInfo, request);
@@ -684,7 +684,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.create(resource, request);
@@ -705,7 +705,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.start(resource, request);
@@ -726,7 +726,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.stop(resource, request);
@@ -747,7 +747,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.destroy(resource, request);
@@ -764,7 +764,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.send(envelope, request);
@@ -785,7 +785,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
provider.acknowledge(envelope, ackType, request);
request.sync();
} catch (Exception ioe) {
@@ -801,7 +801,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
provider.acknowledge(sessionId, ackType, request);
request.sync();
} catch (Exception ioe) {
@@ -817,7 +817,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.unsubscribe(name, request);
@@ -838,7 +838,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.commit(transactionInfo, nextTransactionId, request);
@@ -859,7 +859,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.rollback(transactionInfo, nextTransactionId, request);
@@ -880,7 +880,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.recover(sessionId, request);
@@ -901,7 +901,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
checkClosedOrFailed();
try {
- ProviderFuture request = new ProviderFuture(synchronization);
+ ProviderFuture request = provider.newProviderFuture(synchronization);
requests.put(request, request);
try {
provider.pull(consumerId, timeout, request);
@@ -1256,12 +1256,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
public void onConnectionRecovery(Provider provider) throws Exception {
LOG.debug("Connection {} is starting recovery.", connectionInfo.getId());
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.create(connectionInfo, request);
request.sync();
for (JmsTemporaryDestination tempDestination : tempDestinations.values()) {
- request = new ProviderFuture();
+ request = provider.newProviderFuture();
provider.create(tempDestination, request);
request.sync();
}
@@ -1269,7 +1269,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
if (!consumerInfo.isClosed()) {
- request = new ProviderFuture();
+ request = provider.newProviderFuture();
provider.create(consumerInfo, request);
request.sync();
}
@@ -1290,7 +1290,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
if (!consumerInfo.isClosed()) {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.start(consumerInfo, request);
request.sync();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index c60c5e6..45bd10a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -350,7 +350,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
// current transaction we must mark it as in-doubt so that a commit attempt
// will then roll it back.
transactionInfo = getNextTransactionInfo();
- ProviderFuture request = new ProviderFuture(new ProviderSynchronization() {
+ ProviderFuture request = provider.newProviderFuture(new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index e922679..d444e77 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -651,7 +651,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
protected void onConnectionRecovery(Provider provider) throws Exception {
if (!consumerInfo.isClosed()) {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
try {
provider.create(consumerInfo, request);
request.sync();
@@ -667,7 +667,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
protected void onConnectionRecovered(Provider provider) throws Exception {
if (!consumerInfo.isClosed()) {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.start(consumerInfo, request);
request.sync();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 39f1888..cab1490 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -357,7 +357,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
protected void onConnectionRecovery(Provider provider) throws Exception {
if (!producerInfo.isClosed()) {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
try {
provider.create(producerInfo, request);
request.sync();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 9711728..e3dd65f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -1337,7 +1337,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
protected void onConnectionRecovery(Provider provider) throws Exception {
if (!sessionInfo.isClosed()) {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.create(sessionInfo, request);
request.sync();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java
new file mode 100644
index 0000000..846a796
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * A more balanced implementation of a ProviderFuture that works better on some
+ * platforms such as windows where the thread park and atomic operations used by
+ * a more aggressive implementation could result in poor performance.
+ */
+public class BalancedProviderFuture extends ProviderFuture {
+
+ // Using a progressive wait strategy helps to avoid wait happening before
+ // completion and avoid using expensive thread signalling
+ private static final int SPIN_COUNT = 10;
+ private static final int YIELD_COUNT = 100;
+
+ public BalancedProviderFuture() {
+ this(null);
+ }
+
+ public BalancedProviderFuture(ProviderSynchronization synchronization) {
+ super(synchronization);
+ }
+
+ @Override
+ public boolean sync(long amount, TimeUnit unit) throws IOException {
+ try {
+ if (isComplete() || amount == 0) {
+ failOnError();
+ return true;
+ }
+
+ final long timeout = unit.toNanos(amount);
+ long maxParkNanos = timeout / 8;
+ maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+ final long startTime = System.nanoTime();
+ int idleCount = 0;
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ while (true) {
+ final long elapsed = System.nanoTime() - startTime;
+ final long diff = elapsed - timeout;
+
+ if (diff >= 0) {
+ failOnError();
+ return isComplete();
+ }
+
+ if (isComplete()) {
+ failOnError();
+ return true;
+ }
+
+ if (idleCount < SPIN_COUNT) {
+ idleCount++;
+ } else if (idleCount < YIELD_COUNT) {
+ Thread.yield();
+ idleCount++;
+ } else {
+ synchronized (this) {
+ if (isComplete()) {
+ failOnError();
+ return true;
+ }
+
+ waiting++;
+ try {
+ wait(-diff / 1000000, (int) (-diff % 1000000));
+ } finally {
+ waiting--;
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ try {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ int idleCount = 0;
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ while (true) {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ if (idleCount < SPIN_COUNT) {
+ idleCount++;
+ } else if (idleCount < YIELD_COUNT) {
+ Thread.yield();
+ idleCount++;
+ } else {
+ synchronized (this) {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ waiting++;
+ try {
+ wait();
+ } finally {
+ waiting--;
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw IOExceptionSupport.create(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java
new file mode 100644
index 0000000..e055130
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * A more conservative implementation of a ProviderFuture that is better on some
+ * platforms or resource constrained hardware where high CPU usage can be more
+ * counter productive than other variants that might spin or otherwise avoid
+ * entry into states requiring thread signalling.
+ */
+public class ConservativeProviderFuture extends ProviderFuture {
+
+ public ConservativeProviderFuture() {
+ this(null);
+ }
+
+ public ConservativeProviderFuture(ProviderSynchronization synchronization) {
+ super(synchronization);
+ }
+
+ @Override
+ public boolean sync(long amount, TimeUnit unit) throws IOException {
+ try {
+ if (isComplete() || amount == 0) {
+ failOnError();
+ return true;
+ }
+
+ final long timeout = unit.toNanos(amount);
+ long maxParkNanos = timeout / 8;
+ maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+ final long startTime = System.nanoTime();
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ while (true) {
+ final long elapsed = System.nanoTime() - startTime;
+ final long diff = elapsed - timeout;
+
+ if (diff >= 0) {
+ failOnError();
+ return isComplete();
+ }
+
+ if (isComplete()) {
+ failOnError();
+ return true;
+ }
+
+ synchronized (this) {
+ if (isComplete()) {
+ failOnError();
+ return true;
+ }
+
+ waiting++;
+ try {
+ wait(-diff / 1000000, (int) (-diff % 1000000));
+ } finally {
+ waiting--;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ try {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ while (true) {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ synchronized (this) {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ waiting++;
+ try {
+ wait();
+ } finally {
+ waiting--;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw IOExceptionSupport.create(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java
new file mode 100644
index 0000000..bd4da7f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * An optimized version of a ProviderFuture that makes use of spin waits and other
+ * methods of reacting to asynchronous completion in a more timely manner.
+ */
+public class ProgressiveProviderFuture extends ProviderFuture {
+
+ // Using a progressive wait strategy helps to avoid wait happening before
+ // completion and avoid using expensive thread signaling
+ private static final int SPIN_COUNT = 10;
+ private static final int YIELD_COUNT = 100;
+ private static final int TINY_PARK_COUNT = 1000;
+ private static final int TINY_PARK_NANOS = 1;
+ private static final int SMALL_PARK_COUNT = 101_000;
+ private static final int SMALL_PARK_NANOS = 10_000;
+
+ public ProgressiveProviderFuture() {
+ this(null);
+ }
+
+ public ProgressiveProviderFuture(ProviderSynchronization synchronization) {
+ super(synchronization);
+ }
+
+ @Override
+ public boolean sync(long amount, TimeUnit unit) throws IOException {
+ try {
+ if (isComplete() || amount == 0) {
+ failOnError();
+ return true;
+ }
+
+ final long timeout = unit.toNanos(amount);
+ long maxParkNanos = timeout / 8;
+ maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+ final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS);
+ final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS);
+ final long startTime = System.nanoTime();
+ int idleCount = 0;
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ while (true) {
+ final long elapsed = System.nanoTime() - startTime;
+ final long diff = elapsed - timeout;
+
+ if (diff >= 0) {
+ failOnError();
+ return isComplete();
+ }
+
+ if (isComplete()) {
+ failOnError();
+ return true;
+ }
+
+ if (idleCount < SPIN_COUNT) {
+ idleCount++;
+ } else if (idleCount < YIELD_COUNT) {
+ Thread.yield();
+ idleCount++;
+ } else if (idleCount < TINY_PARK_COUNT) {
+ LockSupport.parkNanos(tinyParkNanos);
+ idleCount++;
+ } else if (idleCount < SMALL_PARK_COUNT) {
+ LockSupport.parkNanos(smallParkNanos);
+ idleCount++;
+ } else {
+ synchronized (this) {
+ if (isComplete()) {
+ failOnError();
+ return true;
+ }
+
+ waiting++;
+ try {
+ wait(-diff / 1000000, (int) (-diff % 1000000));
+ } finally {
+ waiting--;
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw IOExceptionSupport.create(e);
+ }
+ }
+
+ @Override
+ public void sync() throws IOException {
+ try {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ int idleCount = 0;
+
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+
+ while (true) {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ if (idleCount < SPIN_COUNT) {
+ idleCount++;
+ } else if (idleCount < YIELD_COUNT) {
+ Thread.yield();
+ idleCount++;
+ } else if (idleCount < TINY_PARK_COUNT) {
+ LockSupport.parkNanos(TINY_PARK_NANOS);
+ idleCount++;
+ } else if (idleCount < SMALL_PARK_COUNT) {
+ LockSupport.parkNanos(SMALL_PARK_NANOS);
+ idleCount++;
+ } else {
+ synchronized (this) {
+ if (isComplete()) {
+ failOnError();
+ return;
+ }
+
+ waiting++;
+ try {
+ wait();
+ } finally {
+ waiting--;
+ }
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw IOExceptionSupport.create(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index a61cb34..85a626d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -340,6 +340,27 @@ public interface Provider {
JmsMessageFactory getMessageFactory();
/**
+ * Gets a ProviderFuture instance from the Provider for use in performing Provider calls
+ * that require an asynchronous completion to know when the call to the provider has succeeded
+ * or failed.
+ *
+ * @return a ProviderFuture for use in calling Provider methods that require a completion object.
+ */
+ ProviderFuture newProviderFuture();
+
+ /**
+ * Gets a ProviderFuture instance from the Provider for use in performing Provider calls
+ * that require an asynchronous completion to know when the call to the provider has succeeded
+ * or failed.
+ *
+ * @param synchronization
+ * A {@link ProviderSynchronization} to assign to the resulting {@link ProviderFuture}.
+ *
+ * @return a ProviderFuture for use in calling Provider methods that require a completion object.
+ */
+ ProviderFuture newProviderFuture(ProviderSynchronization synchronization);
+
+ /**
* Sets the listener of events from this Provider instance.
*
* @param listener
@@ -353,4 +374,5 @@ public interface Provider {
* @return the currently set ProviderListener instance.
*/
ProviderListener getProviderListener();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
index 19a41db..6341e75 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
@@ -48,6 +48,21 @@ public abstract class ProviderFactory {
public abstract Provider createProvider(URI remoteURI) throws Exception;
/**
+ * Creates an instance of the given AsyncProvider and configures it using the
+ * properties set on the given remote broker URI.
+ *
+ * @param remoteURI
+ * The URI used to connect to a remote Broker.
+ * @param futureFactory
+ * The {@link ProviderFutureFactory} to use when creating the new {@link Provider}.
+ *
+ * @return a new AsyncProvider instance.
+ *
+ * @throws Exception if an error occurs while creating the Provider instance.
+ */
+ public abstract Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception;
+
+ /**
* @return the name of this Provider.
*/
public abstract String getName();
@@ -64,11 +79,28 @@ public abstract class ProviderFactory {
* @throws Exception if an error occurs while creating the AsyncProvider instance.
*/
public static Provider create(URI remoteURI) throws Exception {
+ return create(remoteURI, null);
+ }
+
+ /**
+ * Static create method that performs the ProviderFactory search and handles the
+ * configuration and setup.
+ *
+ * @param remoteURI
+ * the URI of the remote peer.
+ * @param futureFactory
+ * the {@link ProviderFutureFactory} to use when building the new {@link Provider}.
+ *
+ * @return a new AsyncProvider instance that is ready for use.
+ *
+ * @throws Exception if an error occurs while creating the AsyncProvider instance.
+ */
+ public static Provider create(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
Provider result = null;
try {
ProviderFactory factory = findProviderFactory(remoteURI);
- result = factory.createProvider(remoteURI);
+ result = factory.createProvider(remoteURI, futureFactory);
} catch (Exception ex) {
LOG.error("Failed to create Provider instance for {}, due to: {}", remoteURI.getScheme(), ex);
LOG.trace("Error: ", ex);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
index e62db63..75805a6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
@@ -19,39 +19,28 @@ package org.apache.qpid.jms.provider;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
import org.apache.qpid.jms.util.IOExceptionSupport;
/**
* Asynchronous Provider Future class.
*/
-public class ProviderFuture implements AsyncResult {
+public abstract class ProviderFuture implements AsyncResult {
- // Using a progressive wait strategy helps to avoid await happening before countDown
- // and avoids expensive thread signaling
- private static final int SPIN_COUNT = 10;
- private static final int YIELD_COUNT = 100;
- private static final int TINY_PARK_COUNT = 1000;
- private static final int TINY_PARK_NANOS = 1;
- private static final int SMALL_PARK_COUNT = 101_000;
- private static final int SMALL_PARK_NANOS = 10_000;
+ protected final ProviderSynchronization synchronization;
// States used to track progress of this future
- private static final int INCOMPLETE = 0;
- private static final int COMPLETING = 1;
- private static final int SUCCESS = 2;
- private static final int FAILURE = 3;
+ protected static final int INCOMPLETE = 0;
+ protected static final int COMPLETING = 1;
+ protected static final int SUCCESS = 2;
+ protected static final int FAILURE = 3;
- private static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state");
+ protected static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER =
+ AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state");
private volatile int state = INCOMPLETE;
- private Throwable error;
-
- private int waiting;
-
- private final ProviderSynchronization synchronization;
+ protected Throwable error;
+ protected int waiting;
public ProviderFuture() {
this(null);
@@ -102,6 +91,13 @@ public class ProviderFuture implements AsyncResult {
}
/**
+ * Waits for a response to some Provider requested operation.
+ *
+ * @throws IOException if an error occurs while waiting for the response.
+ */
+ public abstract void sync() throws IOException;
+
+ /**
* Timed wait for a response to a Provider operation.
*
* @param amount
@@ -114,132 +110,9 @@ public class ProviderFuture implements AsyncResult {
*
* @throws IOException if an error occurs while waiting for the response.
*/
- public boolean sync(long amount, TimeUnit unit) throws IOException {
- try {
- if (isComplete() || amount == 0) {
- failOnError();
- return true;
- }
-
- final Thread currentThread = Thread.currentThread();
- final long timeout = unit.toNanos(amount);
- long maxParkNanos = timeout / 8;
- maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
- final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS);
- final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS);
- final long startTime = System.nanoTime();
- int idleCount = 0;
-
- while (true) {
- if (currentThread.isInterrupted()) {
- throw new InterruptedException();
- }
-
- final long elapsed = System.nanoTime() - startTime;
- final long diff = elapsed - timeout;
-
- if (diff >= 0) {
- failOnError();
- return isComplete();
- }
-
- if (isComplete()) {
- failOnError();
- return true;
- }
-
- if (idleCount < SPIN_COUNT) {
- idleCount++;
- } else if (idleCount < YIELD_COUNT) {
- Thread.yield();
- idleCount++;
- } else if (idleCount < TINY_PARK_COUNT) {
- LockSupport.parkNanos(tinyParkNanos);
- idleCount++;
- } else if (idleCount < SMALL_PARK_COUNT) {
- LockSupport.parkNanos(smallParkNanos);
- idleCount++;
- } else {
- synchronized (this) {
- if (isComplete()) {
- failOnError();
- return true;
- }
-
- waiting++;
- try {
- wait(-diff / 1000000, (int) (-diff % 1000000));
- } finally {
- waiting--;
- }
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw IOExceptionSupport.create(e);
- }
- }
-
- /**
- * Waits for a response to some Provider requested operation.
- *
- * @throws IOException if an error occurs while waiting for the response.
- */
- public void sync() throws IOException {
- try {
- if (isComplete()) {
- failOnError();
- return;
- }
-
- final Thread currentThread = Thread.currentThread();
- int idleCount = 0;
-
- while (true) {
- if (currentThread.isInterrupted()) {
- throw new InterruptedException();
- }
-
- if (isComplete()) {
- failOnError();
- return;
- }
-
- if (idleCount < SPIN_COUNT) {
- idleCount++;
- } else if (idleCount < YIELD_COUNT) {
- Thread.yield();
- idleCount++;
- } else if (idleCount < TINY_PARK_COUNT) {
- LockSupport.parkNanos(TINY_PARK_NANOS);
- idleCount++;
- } else if (idleCount < SMALL_PARK_COUNT) {
- LockSupport.parkNanos(SMALL_PARK_NANOS);
- idleCount++;
- } else {
- synchronized (this) {
- if (isComplete()) {
- failOnError();
- return;
- }
-
- waiting++;
- try {
- wait();
- } finally {
- waiting--;
- }
- }
- }
- }
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw IOExceptionSupport.create(e);
- }
- }
+ public abstract boolean sync(long amount, TimeUnit unit) throws IOException;
- private void failOnError() throws IOException {
+ protected void failOnError() throws IOException {
Throwable cause = error;
if (cause != null) {
throw IOExceptionSupport.create(cause);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java
new file mode 100644
index 0000000..95868c0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.util.Map;
+
+/**
+ * Factory for provider future instances that will create specific versions based on
+ * configuration.
+ */
+public abstract class ProviderFutureFactory {
+
+ public static final String PROVIDER_FUTURE_TYPE_KEY = "futureType";
+
+ private static final String OS_NAME = System.getProperty("os.name");
+ private static final String WINDOWS_OS_PREFIX = "Windows";
+ private static final boolean IS_WINDOWS = isOsNameMatch(OS_NAME, WINDOWS_OS_PREFIX);
+
+ private static final String CONSERVATIVE = "conservative";
+ private static final String BALANCED = "balanced";
+ private static final String PROGRESSIVE = "progressive";
+
+ /**
+ * Create a new Provider
+ *
+ * @param providerOptions
+ * Configuration options to be consumed by this factory create method
+ *
+ * @return a new ProviderFutureFactory that will be used to create the desired future types.
+ */
+ public static ProviderFutureFactory create(Map<String, String> providerOptions) {
+ String futureTypeKey = providerOptions.remove(PROVIDER_FUTURE_TYPE_KEY);
+
+ if (futureTypeKey == null || futureTypeKey.isEmpty()) {
+ if (Runtime.getRuntime().availableProcessors() < 4) {
+ return new ConservativeProviderFutureFactory();
+ } else if (isWindows()) {
+ return new BalancedProviderFutureFactory();
+ } else {
+ return new ProgressiveProviderFutureFactory();
+ }
+ }
+
+ switch (futureTypeKey.toLowerCase()) {
+ case CONSERVATIVE:
+ return new ConservativeProviderFutureFactory();
+ case BALANCED:
+ return new BalancedProviderFutureFactory();
+ case PROGRESSIVE:
+ return new ProgressiveProviderFutureFactory();
+ default:
+ throw new IllegalArgumentException(
+ "No ProviderFuture implementation with name " + futureTypeKey + " found");
+ }
+ }
+
+ /**
+ * @return a new ProviderFuture instance.
+ */
+ public abstract ProviderFuture createFuture();
+
+ /**
+ * @param synchronization
+ * The {@link ProviderSynchronization} to assign to the returned {@link ProviderFuture}.
+ *
+ * @return a new ProviderFuture instance.
+ */
+ public abstract ProviderFuture createFuture(ProviderSynchronization synchronization);
+
+ /**
+ * @return a ProviderFuture that treats failures as success calls that simply complete the operation.
+ */
+ public abstract ProviderFuture createUnfailableFuture();
+
+ //----- Internal support methods -----------------------------------------//
+
+ private static boolean isWindows() {
+ return IS_WINDOWS;
+ }
+
+ private static boolean isOsNameMatch(final String currentOSName, final String osNamePrefix) {
+ if (currentOSName == null || currentOSName.isEmpty()) {
+ return false;
+ }
+
+ return currentOSName.startsWith(osNamePrefix);
+ }
+
+ //----- ProviderFutureFactory implementation -----------------------------//
+
+ private static class ConservativeProviderFutureFactory extends ProviderFutureFactory {
+
+ @Override
+ public ProviderFuture createFuture() {
+ return new ConservativeProviderFuture();
+ }
+
+ @Override
+ public ProviderFuture createFuture(ProviderSynchronization synchronization) {
+ return new ConservativeProviderFuture(synchronization);
+ }
+
+ @Override
+ public ProviderFuture createUnfailableFuture() {
+ return new ConservativeProviderFuture() {
+
+ @Override
+ public void onFailure(Throwable t) {
+ this.onSuccess();
+ }
+ };
+ }
+ }
+
+ private static class BalancedProviderFutureFactory extends ProviderFutureFactory {
+
+ @Override
+ public ProviderFuture createFuture() {
+ return new BalancedProviderFuture();
+ }
+
+ @Override
+ public ProviderFuture createFuture(ProviderSynchronization synchronization) {
+ return new BalancedProviderFuture(synchronization);
+ }
+
+ @Override
+ public ProviderFuture createUnfailableFuture() {
+ return new BalancedProviderFuture() {
+
+ @Override
+ public void onFailure(Throwable t) {
+ this.onSuccess();
+ }
+ };
+ }
+ }
+
+ private static class ProgressiveProviderFutureFactory extends ProviderFutureFactory {
+
+ @Override
+ public ProviderFuture createFuture() {
+ return new ProgressiveProviderFuture();
+ }
+
+ @Override
+ public ProviderFuture createFuture(ProviderSynchronization synchronization) {
+ return new ProgressiveProviderFuture(synchronization);
+ }
+
+ @Override
+ public ProviderFuture createUnfailableFuture() {
+ return new ProgressiveProviderFuture() {
+
+ @Override
+ public void onFailure(Throwable t) {
+ this.onSuccess();
+ }
+ };
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 6dfa4c6..2a4eb09 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -146,6 +146,16 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
}
@Override
+ public ProviderFuture newProviderFuture() {
+ return next.newProviderFuture();
+ }
+
+ @Override
+ public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+ return next.newProviderFuture(synchronization);
+ }
+
+ @Override
public void setProviderListener(ProviderListener listener) {
this.listener = listener;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index a4dbc39..1c97dfe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -60,7 +60,9 @@ import org.apache.qpid.jms.provider.ProviderClosedException;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFailedException;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder;
import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
import org.apache.qpid.jms.sasl.Mechanism;
@@ -145,6 +147,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private final Collector protonCollector = new CollectorImpl();
private final Connection protonConnection = Connection.Factory.create();
+ private final ProviderFutureFactory futureFactory;
private AsyncResult connectionRequest;
private ScheduledFuture<?> nextIdleTimeoutCheck;
@@ -155,10 +158,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
* The URI of the AMQP broker this Provider instance will connect to.
* @param transport
* The underlying Transport that will be used for wire level communications.
+ * @param futureFactory
+ * The ProviderFutureFactory to use when futures are requested.
*/
- public AmqpProvider(URI remoteURI, Transport transport) {
+ public AmqpProvider(URI remoteURI, Transport transport, ProviderFutureFactory futureFactory) {
this.remoteURI = remoteURI;
this.transport = transport;
+ this.futureFactory = futureFactory;
serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory(
"AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
@@ -172,7 +178,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
public void connect(final JmsConnectionInfo connectionInfo) throws IOException {
checkClosedOrFailed();
- final ProviderFuture connectRequest = new ProviderFuture();
+ final ProviderFuture connectRequest = futureFactory.createFuture();
serializer.execute(new Runnable() {
@@ -302,15 +308,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
- final ProviderFuture request = new ProviderFuture() {
-
- @Override
- public void onFailure(Throwable result) {
- // During close it is fine if the close call fails
- // this in unrecoverable so we just log the event.
- onSuccess();
- }
- };
+ final ProviderFuture request = futureFactory.createUnfailableFuture();
serializer.execute(new Runnable() {
@@ -1138,6 +1136,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
return connection.getAmqpMessageFactory();
}
+ @Override
+ public ProviderFuture newProviderFuture() {
+ return futureFactory.createFuture();
+ }
+
+ @Override
+ public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+ return futureFactory.createFuture(synchronization);
+ }
+
public void setTraceFrames(boolean trace) {
this.traceFrames = trace;
updateTracer();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
index c141e26..a0083cf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
@@ -20,6 +20,7 @@ import java.net.URI;
import java.util.Map;
import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.transports.Transport;
import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.util.PropertyUtil;
@@ -37,19 +38,40 @@ public class AmqpProviderFactory extends ProviderFactory {
@Override
public AmqpProvider createProvider(URI remoteURI) throws Exception {
+ return createProvider(remoteURI, null);
+ }
+ @Override
+ public AmqpProvider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
Map<String, String> map = PropertyUtil.parseQuery(remoteURI);
- Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "amqp.");
- // Clear off any amqp.X values from the transport before creation.
+ // Clear off any amqp.X and provider.X values from the transport before creation.
+ Map<String, String> amqpProviderOptions = PropertyUtil.filterProperties(map, "amqp.");
+ Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "provider.");
+
Transport transport = TransportFactory.create(getTransportScheme(), PropertyUtil.replaceQuery(remoteURI, map));
- AmqpProvider result = new AmqpProvider(remoteURI, transport);
+ // If we have been given a futures factory to use then we ignore any URI options indicating
+ // what to create and just go with what we are given.
+ if (futureFactory == null) {
+ // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+ futureFactory = ProviderFutureFactory.create(providerOptions);
+ if (!providerOptions.isEmpty()) {
+ String msg = ""
+ + " Not all Provider options could be applied during AMQP Provider creation."
+ + " Check the options are spelled correctly."
+ + " Unused parameters=[" + providerOptions + "]."
+ + " This provider instance cannot be started.";
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ AmqpProvider result = new AmqpProvider(remoteURI, transport, futureFactory);
- Map<String, String> unused = PropertyUtil.setProperties(result, providerOptions);
+ Map<String, String> unused = PropertyUtil.setProperties(result, amqpProviderOptions);
if (!unused.isEmpty()) {
String msg = ""
- + " Not all provider options could be set on the AMQP Provider."
+ + " Not all AMQP provider options could be set on the AMQP Provider."
+ " Check the options are spelled correctly."
+ " Unused parameters=[" + unused + "]."
+ " This provider instance cannot be started.";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index b48f4c9..654ec48 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -52,8 +52,10 @@ import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFactory;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.ProviderRedirectedException;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
import org.apache.qpid.jms.provider.WrappedAsyncResult;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
@@ -100,6 +102,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>();
private final DefaultProviderListener closedListener = new DefaultProviderListener();
private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>();
+ private final ProviderFutureFactory futureFactory;
// Current state of connection / reconnection
private final ReconnectControls reconnectControl = new ReconnectControls();
@@ -124,16 +127,17 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
private FailoverServerListAction amqpOpenServerListAction = FailoverServerListAction.REPLACE;
- public FailoverProvider(Map<String, String> nestedOptions) {
- this(null, nestedOptions);
+ public FailoverProvider(Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) {
+ this(null, nestedOptions, futureFactory);
}
- public FailoverProvider(List<URI> uris) {
- this(uris, null);
+ public FailoverProvider(List<URI> uris, ProviderFutureFactory futureFactory) {
+ this(uris, null, futureFactory);
}
- public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions) {
+ public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) {
this.uris = new FailoverUriPool(uris, nestedOptions);
+ this.futureFactory = futureFactory;
serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory("FailoverProvider: serialization thread", true));
serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -167,7 +171,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
@Override
public void close() {
if (closed.compareAndSet(false, true)) {
- final ProviderFuture request = new ProviderFuture();
+ final ProviderFuture request = futureFactory.createFuture();
serializer.execute(new Runnable() {
@Override
@@ -700,7 +704,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
try {
LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts,
target.getScheme() + "://" + target.getHost() + ":" + target.getPort());
- provider = ProviderFactory.create(target);
+ provider = ProviderFactory.create(target, futureFactory);
provider.connect(connectionInfo);
initializeNewConnection(provider);
return;
@@ -1045,6 +1049,16 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
}
@Override
+ public ProviderFuture newProviderFuture() {
+ return futureFactory.createFuture();
+ }
+
+ @Override
+ public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+ return futureFactory.createFuture(synchronization);
+ }
+
+ @Override
public String toString() {
return "FailoverProvider: " +
(connectedURI == null ? "unconnected" : connectedURI.toString());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
index 3914784..5d33763 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
@@ -21,6 +21,7 @@ import java.util.Map;
import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.apache.qpid.jms.util.PropertyUtil;
import org.apache.qpid.jms.util.URISupport;
import org.apache.qpid.jms.util.URISupport.CompositeData;
@@ -42,13 +43,34 @@ public class FailoverProviderFactory extends ProviderFactory {
@Override
public Provider createProvider(URI remoteURI) throws Exception {
+ return createProvider(remoteURI, null);
+ }
+
+ @Override
+ public Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
CompositeData composite = URISupport.parseComposite(remoteURI);
Map<String, String> options = composite.getParameters();
Map<String, String> filtered = PropertyUtil.filterProperties(options, FAILOVER_OPTION_PREFIX);
Map<String, String> nested = PropertyUtil.filterProperties(filtered, FAILOVER_NESTED_OPTION_PREFIX_ADDON);
- FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested);
+ Map<String, String> providerOptions = PropertyUtil.filterProperties(options, "provider.");
+ // If we have been given a futures factory to use then we ignore any URI options indicating
+ // what to create and just go with what we are given.
+ if (futureFactory == null) {
+ // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+ futureFactory = ProviderFutureFactory.create(providerOptions);
+ if (!providerOptions.isEmpty()) {
+ String msg = ""
+ + " Not all Provider options could be applied during Failover Provider creation."
+ + " Check the options are spelled correctly."
+ + " Unused parameters=[" + providerOptions + "]."
+ + " This provider instance cannot be started.";
+ throw new IllegalArgumentException(msg);
+ }
+ }
+
+ FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested, futureFactory);
Map<String, String> unused = PropertyUtil.setProperties(provider, filtered);
if (!unused.isEmpty()) {
String msg = ""
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java
new file mode 100644
index 0000000..8cd03ce
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ProviderFutureFactoryTest {
+
+ @Test
+ public void testCreateFailsWithNullOptions() {
+ try {
+ ProviderFutureFactory.create(null);
+ fail("Should throw NullPointerException");
+ } catch (NullPointerException npe) {}
+ }
+
+ @Test
+ public void testCreateFailsWhenFutureTypeNotValid() {
+ Map<String, String> options = new HashMap<>();
+
+ options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "super-fast");
+
+ try {
+ ProviderFutureFactory.create(options);
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException iae) {}
+ }
+
+ @Test
+ public void testCreateFactoryWithNoConfigurationOptionsGiven() {
+ ProviderFutureFactory factory = ProviderFutureFactory.create(Collections.emptyMap());
+
+ ProviderFuture future = factory.createFuture();
+ assertNotNull(future);
+ assertFalse(future.isComplete());
+ }
+
+ @Test
+ public void testCreateConservativeFactoryFromConfiguration() {
+ Map<String, String> options = new HashMap<>();
+
+ options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "conservative");
+
+ ProviderFutureFactory factory = ProviderFutureFactory.create(options);
+
+ ProviderFuture future = factory.createFuture();
+ assertNotNull(future);
+ assertFalse(future.isComplete());
+
+ assertTrue(future instanceof ConservativeProviderFuture);
+ }
+
+ @Test
+ public void testCreateBalancedFactoryFromConfiguration() {
+ Map<String, String> options = new HashMap<>();
+
+ options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "balanced");
+
+ ProviderFutureFactory factory = ProviderFutureFactory.create(options);
+
+ ProviderFuture future = factory.createFuture();
+ assertNotNull(future);
+ assertFalse(future.isComplete());
+
+ assertTrue(future instanceof BalancedProviderFuture);
+ }
+
+ @Test
+ public void testCreateProgressiveFactoryFromConfiguration() {
+ Map<String, String> options = new HashMap<>();
+
+ options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "progressive");
+
+ ProviderFutureFactory factory = ProviderFutureFactory.create(options);
+
+ ProviderFuture future = factory.createFuture();
+ assertNotNull(future);
+ assertFalse(future.isComplete());
+
+ assertTrue(future instanceof ProgressiveProviderFuture);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
index 42258ed..601c879 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
@@ -22,17 +22,41 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+@RunWith(Parameterized.class)
public class ProviderFutureTest {
+ private final ProviderFutureFactory futuresFactory;
+
+ @Parameters(name = "{index}: futureType={0}")
+ public static Collection<Object> data() {
+ return Arrays.asList(new Object[] {
+ "conservative", "balanced", "progressive" }
+ );
+ }
+
+ public ProviderFutureTest(String futureTypeName) {
+ Map<String, String> options = new HashMap<>();
+ options.put("futureType", futureTypeName);
+
+ futuresFactory = ProviderFutureFactory.create(options);
+ }
+
@Test
public void testIsComplete() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
assertFalse(future.isComplete());
future.onSuccess();
@@ -41,7 +65,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testOnSuccess() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
future.onSuccess();
try {
@@ -53,7 +77,7 @@ public class ProviderFutureTest {
@Test(timeout = 90000)
public void testTimedSync() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
try {
assertFalse(future.sync(1, TimeUnit.SECONDS));
@@ -64,7 +88,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testOnFailure() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
IOException ex = new IOException();
future.onFailure(ex);
@@ -79,7 +103,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testOnSuccessCallsSynchronization() {
final AtomicBoolean syncCalled = new AtomicBoolean(false);
- ProviderFuture future = new ProviderFuture(new ProviderSynchronization() {
+ ProviderFuture future = futuresFactory.createFuture(new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
@@ -104,7 +128,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testOnFailureCallsSynchronization() {
final AtomicBoolean syncCalled = new AtomicBoolean(false);
- ProviderFuture future = new ProviderFuture(new ProviderSynchronization() {
+ ProviderFuture future = futuresFactory.createFuture(new ProviderSynchronization() {
@Override
public void onPendingSuccess() {
@@ -131,7 +155,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testSuccessfulStateIsFixed() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
IOException ex = new IOException();
future.onSuccess();
@@ -145,7 +169,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testFailedStateIsFixed() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
IOException ex = new IOException();
future.onFailure(ex);
@@ -160,7 +184,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testSyncHandlesInterruption() throws InterruptedException {
- final ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
final CountDownLatch syncing = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);
@@ -193,7 +217,7 @@ public class ProviderFutureTest {
@Test(timeout = 10000)
public void testTimedSyncHandlesInterruption() throws InterruptedException {
- final ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
final CountDownLatch syncing = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
index 52e888b..6fea518 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
@@ -21,11 +21,14 @@ import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.util.Collections;
import org.junit.Test;
public class WrappedAsyncResultTest {
+ private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap());
+
@Test
public void testCreateWithNull() {
try {
@@ -35,7 +38,7 @@ public class WrappedAsyncResultTest {
@Test
public void testGetWrapped() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {};
assertSame(wrapped.getWrappedRequest(), future);
@@ -43,7 +46,7 @@ public class WrappedAsyncResultTest {
@Test
public void testOnSuccessPassthrough() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {};
assertFalse(future.isComplete());
@@ -55,7 +58,7 @@ public class WrappedAsyncResultTest {
@Test
public void testOnFailurePassthrough() {
- ProviderFuture future = new ProviderFuture();
+ ProviderFuture future = futuresFactory.createFuture();
WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {};
assertFalse(future.isComplete());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index b26cb5f..53f3c6a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -323,7 +323,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
fail("Should have thrown an IOException when closed.");
} catch (IOException ex) {}
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
try {
provider.unsubscribe("subscription-name", request);
fail("Should have thrown an IOException when closed.");
@@ -360,7 +360,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
connectionInfo.setSendTimeout(SEND_TIMEOUT);
connectionInfo.setRequestTimeout(REQUEST_TIMEOUT);
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
provider.create(connectionInfo, request);
request.sync();
@@ -415,7 +415,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
};
assertFalse("Error should not yet be thrown", errorThrown.get());
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = provider.newProviderFuture();
switch(operation) {
case CREATE:
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
index 7cf686c..c5d4197 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
@@ -28,6 +29,7 @@ import org.apache.qpid.jms.meta.JmsTransactionId;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
import org.junit.Before;
import org.junit.Test;
@@ -36,6 +38,8 @@ import org.junit.Test;
*/
public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
+ private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap());
+
private FailoverProvider provider;
private JmsConnectionInfo connection;
private JmsSessionInfo session;
@@ -71,49 +75,49 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
@Test(timeout=30000, expected=IOException.class)
public void testCreateResource() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.create(connection, request);
}
@Test(timeout=30000, expected=IOException.class)
public void testStartResource() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.start(session, request);
}
@Test(timeout=30000, expected=IOException.class)
public void testStopResource() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.stop(session, request);
}
@Test(timeout=30000, expected=IOException.class)
public void testDestroyResource() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.destroy(session, request);
}
@Test(timeout=30000, expected=IOException.class)
public void testSend() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.send(new JmsOutboundMessageDispatch(), request);
}
@Test(timeout=30000, expected=IOException.class)
public void testSessionAcknowledge() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.acknowledge(session.getId(), ACK_TYPE.ACCEPTED, request);
}
@Test(timeout=30000, expected=IOException.class)
public void testAcknowledgeMessage() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.acknowledge(new JmsInboundMessageDispatch(1), ACK_TYPE.ACCEPTED, request);
}
@Test(timeout=30000, expected=IOException.class)
public void testCommit() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
JmsTransactionId txId = new JmsTransactionId(connection.getId(), 1);
JmsTransactionInfo txInfo = new JmsTransactionInfo(session.getId(), txId);
provider.commit(txInfo, null, request);
@@ -121,7 +125,7 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
@Test(timeout=30000, expected=IOException.class)
public void testRollback() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
JmsTransactionId txId = new JmsTransactionId(connection.getId(), 1);
JmsTransactionInfo txInfo = new JmsTransactionInfo(session.getId(), txId);
provider.rollback(txInfo, null, request);
@@ -129,19 +133,19 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
@Test(timeout=30000, expected=IOException.class)
public void testRecover() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.recover(session.getId(), request);
}
@Test(timeout=30000, expected=IOException.class)
public void testUnsubscribe() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.unsubscribe("subscription-name", request);
}
@Test(timeout=30000, expected=IOException.class)
public void testMessagePull() throws Exception {
- ProviderFuture request = new ProviderFuture();
+ ProviderFuture request = futuresFactory.createFuture();
provider.pull(consumer.getId(), 1, request);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org