You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/07/18 16:41:49 UTC

[1/2] qpid-jms git commit: QPIDJMS-404 Add variants of ProviderFuture for perf tuning

Repository: qpid-jms
Updated Branches:
  refs/heads/master 8b200671e -> 264a9a9b6


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
index 19b8d25..e5291d9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.jms.meta.JmsProducerInfo;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.provider.DefaultProviderListener;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.test.Wait;
 import org.junit.After;
 import org.junit.Before;
@@ -61,6 +62,8 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(FailoverProviderTest.class);
 
+    private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap());
+
     private List<URI> uris;
     private FailoverProvider provider;
     private JmsConnectionInfo connection;
@@ -91,7 +94,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testCreateProviderOnlyUris() {
-        provider = new FailoverProvider(uris);
+        provider = new FailoverProvider(uris, futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
         assertNotNull(provider.getNestedOptions());
@@ -103,7 +106,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         Map<String, String> options = new HashMap<String, String>();
         options.put("transport.tcpNoDelay", "true");
 
-        provider = new FailoverProvider(options);
+        provider = new FailoverProvider(options, futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
         assertNotNull(provider.getNestedOptions());
@@ -113,7 +116,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testCreateProviderWithNestedOptions() {
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
         assertNotNull(provider.getNestedOptions());
@@ -122,7 +125,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testProviderListener() {
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
         assertNull(provider.getProviderListener());
         provider.setProviderListener(new DefaultProviderListener());
         assertNotNull(provider.getProviderListener());
@@ -130,7 +133,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testGetRemoteURI() throws Exception {
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
 
         assertNull(provider.getRemoteURI());
         provider.connect(connection);
@@ -145,7 +148,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testToString() throws Exception {
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
 
         assertNotNull(provider.toString());
         provider.connect(connection);
@@ -161,7 +164,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testConnectToMock() throws Exception {
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
 
@@ -177,7 +180,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
         provider.connect(connection);
 
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = provider.newProviderFuture();
         provider.create(createConnectionInfo(), request);
 
         request.sync(10, TimeUnit.SECONDS);
@@ -192,7 +195,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testCannotStartWithoutListener() throws Exception {
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
         assertEquals(FailoverUriPool.DEFAULT_RANDOMIZE_ENABLED, provider.isRandomize());
         assertNull(provider.getRemoteURI());
         provider.connect(connection);
@@ -421,7 +424,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         final long SEND_TIMEOUT = TimeUnit.SECONDS.toMillis(6);
         final long REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(7);
 
-        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap());
+        provider = new FailoverProvider(uris, Collections.<String, String>emptyMap(), futuresFactory);
         provider.setProviderListener(new DefaultProviderListener() {
 
             @Override
@@ -439,7 +442,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
         connectionInfo.setSendTimeout(SEND_TIMEOUT);
         connectionInfo.setRequestTimeout(REQUEST_TIMEOUT);
 
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = provider.newProviderFuture();
         provider.create(connectionInfo, request);
         request.sync();
 
@@ -450,13 +453,13 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testAmqpOpenServerListActionDefault() {
-        provider = new FailoverProvider(uris);
+        provider = new FailoverProvider(uris, futuresFactory);
         assertEquals("REPLACE", provider.getAmqpOpenServerListAction());
     }
 
     @Test(timeout = 30000)
     public void testSetGetAmqpOpenServerListAction() {
-        provider = new FailoverProvider(uris);
+        provider = new FailoverProvider(uris, futuresFactory);
         String action = "ADD";
         assertFalse(action.equals(provider.getAmqpOpenServerListAction()));
 
@@ -466,7 +469,7 @@ public class FailoverProviderTest extends FailoverProviderTestSupport {
 
     @Test(timeout = 30000)
     public void testSetInvalidAmqpOpenServerListActionThrowsIAE() {
-        provider = new FailoverProvider(uris);
+        provider = new FailoverProvider(uris, futuresFactory);
         try {
             provider.setAmqpOpenServerListAction("invalid");
             fail("no exception was thrown");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
index 7a19867..565d1f0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProvider.java
@@ -43,7 +43,9 @@ import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.provider.amqp.AmqpProvider;
 import org.apache.qpid.jms.util.ThreadPoolUtils;
 import org.slf4j.Logger;
@@ -66,6 +68,7 @@ public class MockProvider implements Provider {
     private final ScheduledThreadPoolExecutor serializer;
     private final AtomicBoolean closed = new AtomicBoolean();
     private final MockRemotePeer context;
+    private final ProviderFutureFactory futureFactory;
 
     private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
     private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
@@ -73,11 +76,12 @@ public class MockProvider implements Provider {
     private MockProviderListener eventListener;
     private ProviderListener listener;
 
-    public MockProvider(URI remoteURI, MockProviderConfiguration configuration, MockRemotePeer context) {
+    public MockProvider(URI remoteURI, MockProviderConfiguration configuration, MockRemotePeer context, ProviderFutureFactory futureFactory) {
         this.remoteURI = remoteURI;
         this.configuration = configuration;
         this.context = context;
         this.stats = new MockProviderStats(context != null ? context.getContextStats() : null);
+        this.futureFactory = futureFactory;
 
         serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
 
@@ -129,7 +133,7 @@ public class MockProvider implements Provider {
         if (closed.compareAndSet(false, true)) {
             stats.recordCloseAttempt();
 
-            final ProviderFuture request = new ProviderFuture();
+            final ProviderFuture request = futureFactory.createFuture();
             serializer.execute(new Runnable() {
 
                 @Override
@@ -513,6 +517,16 @@ public class MockProvider implements Provider {
         this.connectTimeout = connectTimeout;
     }
 
+    @Override
+    public ProviderFuture newProviderFuture() {
+        return futureFactory.createFuture();
+    }
+
+    @Override
+    public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return futureFactory.createFuture(synchronization);
+    }
+
     //----- Implementation details -------------------------------------------//
 
     private void checkClosed() throws ProviderClosedException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
index 75c3293..410a5c7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/mock/MockProviderFactory.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.Map;
 
 import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.util.PropertyUtil;
 
 /**
@@ -29,11 +30,30 @@ public class MockProviderFactory extends ProviderFactory {
 
     @Override
     public MockProvider createProvider(URI remoteURI) throws Exception {
+        return createProvider(remoteURI, null);
+    }
+
+    @Override
+    public MockProvider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
 
         Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
-        Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "mock.");
+        Map<String, String> mockProviderOptions = PropertyUtil.filterProperties(map, "mock.");
 
-        remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
+        Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "provider.");
+        // If we have been given a futures factory to use then we ignore any URI options indicating
+        // what to create and just go with what we are given.
+        if (futureFactory == null) {
+            // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+            futureFactory = ProviderFutureFactory.create(providerOptions);
+            if (!providerOptions.isEmpty()) {
+                String msg = ""
+                    + " Not all Provider options could be applied during Mock Provider creation."
+                    + " Check the options are spelled correctly."
+                    + " Unused parameters=[" + providerOptions + "]."
+                    + " This provider instance cannot be started.";
+                throw new IllegalArgumentException(msg);
+            }
+        }
 
         MockProviderConfiguration configuration = new MockProviderConfiguration();
         MockRemotePeer remote = MockRemotePeer.INSTANCE;
@@ -41,7 +61,7 @@ public class MockProviderFactory extends ProviderFactory {
             remote.getContextStats().recordProviderCreated();
         }
 
-        Map<String, String> unused = PropertyUtil.setProperties(configuration, providerOptions);
+        Map<String, String> unused = PropertyUtil.setProperties(configuration, mockProviderOptions);
         if (!unused.isEmpty()) {
             String msg = ""
                 + " Not all provider options could be set on the " + getName()
@@ -51,7 +71,7 @@ public class MockProviderFactory extends ProviderFactory {
             throw new IllegalArgumentException(msg);
         }
 
-        return new MockProvider(remoteURI, configuration, remote);
+        return new MockProvider(PropertyUtil.replaceQuery(remoteURI, map), configuration, remote, futureFactory);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
index 7c3672a..b857994 100644
--- a/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
+++ b/qpid-jms-discovery/src/main/java/org/apache/qpid/jms/provider/discovery/DiscoveryProviderFactory.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.provider.failover.FailoverProvider;
 import org.apache.qpid.jms.provider.failover.FailoverProviderFactory;
 import org.apache.qpid.jms.util.PropertyUtil;
@@ -45,8 +46,14 @@ public class DiscoveryProviderFactory extends ProviderFactory {
      */
     public static final String DISCOVERY_DISCOVERED_OPTION_PREFIX_ADON = "discovered.";
 
+
     @Override
     public Provider createProvider(URI remoteURI) throws Exception {
+        return createProvider(remoteURI, null);
+    }
+
+    @Override
+    public Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
 
         CompositeData composite = URISupport.parseComposite(remoteURI);
         Map<String, String> options = composite.getParameters();
@@ -67,8 +74,24 @@ public class DiscoveryProviderFactory extends ProviderFactory {
         nestedOptions.putAll(failoverNestedOptions);
         nestedOptions.putAll(discoveredOptions);
 
+        Map<String, String> providerOptions = PropertyUtil.filterProperties(options, "provider.");
+        // If we have been given a futures factory to use then we ignore any URI options indicating
+        // what to create and just go with what we are given.
+        if (futureFactory == null) {
+            // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+            futureFactory = ProviderFutureFactory.create(providerOptions);
+            if (!providerOptions.isEmpty()) {
+                String msg = ""
+                    + " Not all Provider options could be applied during Failover Provider creation."
+                    + " Check the options are spelled correctly."
+                    + " Unused parameters=[" + providerOptions + "]."
+                    + " This provider instance cannot be started.";
+                throw new IllegalArgumentException(msg);
+            }
+        }
+
         // Failover will apply the nested options to each URI while attempting to connect.
-        FailoverProvider failover = new FailoverProvider(nestedOptions);
+        FailoverProvider failover = new FailoverProvider(nestedOptions, futureFactory);
         Map<String, String> leftOverDiscoveryOptions = PropertyUtil.setProperties(failover, mainOptions);
 
         DiscoveryProvider discovery = new DiscoveryProvider(remoteURI, failover);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-404 Add variants of ProviderFuture for perf tuning

Posted by ta...@apache.org.
QPIDJMS-404 Add variants of ProviderFuture for perf tuning

Adds three variations on ProviderFuture that allow for tuning on
platforms that don't benefit from the spin / wait pattern used in
the current implementation and default to using a variant that does
not park on windows to avoid unpredictably long parks that result
in performance decreases due to missing the event completion.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/264a9a9b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/264a9a9b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/264a9a9b

Branch: refs/heads/master
Commit: 264a9a9b6c5d8d8c11a995b7b02289b2938d77ba
Parents: 8b20067
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Jul 18 12:11:53 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Jul 18 12:37:35 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java |  34 ++--
 .../qpid/jms/JmsLocalTransactionContext.java    |   2 +-
 .../org/apache/qpid/jms/JmsMessageConsumer.java |   4 +-
 .../org/apache/qpid/jms/JmsMessageProducer.java |   2 +-
 .../java/org/apache/qpid/jms/JmsSession.java    |   2 +-
 .../jms/provider/BalancedProviderFuture.java    | 149 ++++++++++++++++
 .../provider/ConservativeProviderFuture.java    | 128 ++++++++++++++
 .../jms/provider/ProgressiveProviderFuture.java | 167 ++++++++++++++++++
 .../org/apache/qpid/jms/provider/Provider.java  |  22 +++
 .../qpid/jms/provider/ProviderFactory.java      |  34 +++-
 .../qpid/jms/provider/ProviderFuture.java       | 165 ++---------------
 .../jms/provider/ProviderFutureFactory.java     | 176 +++++++++++++++++++
 .../qpid/jms/provider/ProviderWrapper.java      |  10 ++
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  30 ++--
 .../jms/provider/amqp/AmqpProviderFactory.java  |  32 +++-
 .../jms/provider/failover/FailoverProvider.java |  28 ++-
 .../failover/FailoverProviderFactory.java       |  24 ++-
 .../jms/provider/ProviderFutureFactoryTest.java | 105 +++++++++++
 .../qpid/jms/provider/ProviderFutureTest.java   |  44 +++--
 .../jms/provider/WrappedAsyncResultTest.java    |   9 +-
 .../jms/provider/amqp/AmqpProviderTest.java     |   6 +-
 .../failover/FailoverProviderClosedTest.java    |  28 +--
 .../provider/failover/FailoverProviderTest.java |  31 ++--
 .../qpid/jms/provider/mock/MockProvider.java    |  18 +-
 .../jms/provider/mock/MockProviderFactory.java  |  28 ++-
 .../discovery/DiscoveryProviderFactory.java     |  25 ++-
 26 files changed, 1061 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 7d09a9c..bb456fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -214,7 +214,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
                 }
 
                 if (isConnected() && !isFailed()) {
-                    ProviderFuture request = new ProviderFuture();
+                    ProviderFuture request = provider.newProviderFuture();
                     requests.put(request, request);
                     try {
                         provider.destroy(connectionInfo, request);
@@ -684,7 +684,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.create(resource, request);
@@ -705,7 +705,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.start(resource, request);
@@ -726,7 +726,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.stop(resource, request);
@@ -747,7 +747,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.destroy(resource, request);
@@ -764,7 +764,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.send(envelope, request);
@@ -785,7 +785,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             provider.acknowledge(envelope, ackType, request);
             request.sync();
         } catch (Exception ioe) {
@@ -801,7 +801,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             provider.acknowledge(sessionId, ackType, request);
             request.sync();
         } catch (Exception ioe) {
@@ -817,7 +817,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.unsubscribe(name, request);
@@ -838,7 +838,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.commit(transactionInfo, nextTransactionId, request);
@@ -859,7 +859,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.rollback(transactionInfo, nextTransactionId, request);
@@ -880,7 +880,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.recover(sessionId, request);
@@ -901,7 +901,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         checkClosedOrFailed();
 
         try {
-            ProviderFuture request = new ProviderFuture(synchronization);
+            ProviderFuture request = provider.newProviderFuture(synchronization);
             requests.put(request, request);
             try {
                 provider.pull(consumerId, timeout, request);
@@ -1256,12 +1256,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     public void onConnectionRecovery(Provider provider) throws Exception {
         LOG.debug("Connection {} is starting recovery.", connectionInfo.getId());
 
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = provider.newProviderFuture();
         provider.create(connectionInfo, request);
         request.sync();
 
         for (JmsTemporaryDestination tempDestination : tempDestinations.values()) {
-            request = new ProviderFuture();
+            request = provider.newProviderFuture();
             provider.create(tempDestination, request);
             request.sync();
         }
@@ -1269,7 +1269,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
             JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
             if (!consumerInfo.isClosed()) {
-                request = new ProviderFuture();
+                request = provider.newProviderFuture();
                 provider.create(consumerInfo, request);
                 request.sync();
             }
@@ -1290,7 +1290,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
         for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
             JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
             if (!consumerInfo.isClosed()) {
-                ProviderFuture request = new ProviderFuture();
+                ProviderFuture request = provider.newProviderFuture();
                 provider.start(consumerInfo, request);
                 request.sync();
             }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
index c60c5e6..45bd10a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -350,7 +350,7 @@ public class JmsLocalTransactionContext implements JmsTransactionContext {
                 // current transaction we must mark it as in-doubt so that a commit attempt
                 // will then roll it back.
                 transactionInfo = getNextTransactionInfo();
-                ProviderFuture request = new ProviderFuture(new ProviderSynchronization() {
+                ProviderFuture request = provider.newProviderFuture(new ProviderSynchronization() {
 
                     @Override
                     public void onPendingSuccess() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index e922679..d444e77 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -651,7 +651,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
 
     protected void onConnectionRecovery(Provider provider) throws Exception {
         if (!consumerInfo.isClosed()) {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
             try {
                 provider.create(consumerInfo, request);
                 request.sync();
@@ -667,7 +667,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
 
     protected void onConnectionRecovered(Provider provider) throws Exception {
         if (!consumerInfo.isClosed()) {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
             provider.start(consumerInfo, request);
             request.sync();
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 39f1888..cab1490 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -357,7 +357,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
 
     protected void onConnectionRecovery(Provider provider) throws Exception {
         if (!producerInfo.isClosed()) {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
             try {
                 provider.create(producerInfo, request);
                 request.sync();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 9711728..e3dd65f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -1337,7 +1337,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
 
     protected void onConnectionRecovery(Provider provider) throws Exception {
         if (!sessionInfo.isClosed()) {
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
             provider.create(sessionInfo, request);
             request.sync();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java
new file mode 100644
index 0000000..846a796
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/BalancedProviderFuture.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * A more balanced implementation of a ProviderFuture that works better on some
+ * platforms such as windows where the thread park and atomic operations used by
+ * a more aggressive implementation could result in poor performance.
+ */
+public class BalancedProviderFuture extends ProviderFuture {
+
+    // Using a progressive wait strategy helps to avoid wait happening before
+    // completion and avoid using expensive thread signalling
+    private static final int SPIN_COUNT = 10;
+    private static final int YIELD_COUNT = 100;
+
+    public BalancedProviderFuture() {
+        this(null);
+    }
+
+    public BalancedProviderFuture(ProviderSynchronization synchronization) {
+        super(synchronization);
+    }
+
+    @Override
+    public boolean sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            if (isComplete() || amount == 0) {
+                failOnError();
+                return true;
+            }
+
+            final long timeout = unit.toNanos(amount);
+            long maxParkNanos = timeout / 8;
+            maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+            final long startTime = System.nanoTime();
+            int idleCount = 0;
+
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException();
+            }
+
+            while (true) {
+                final long elapsed = System.nanoTime() - startTime;
+                final long diff = elapsed - timeout;
+
+                if (diff >= 0) {
+                    failOnError();
+                    return isComplete();
+                }
+
+                if (isComplete()) {
+                    failOnError();
+                    return true;
+                }
+
+                if (idleCount < SPIN_COUNT) {
+                    idleCount++;
+                } else if (idleCount < YIELD_COUNT) {
+                    Thread.yield();
+                    idleCount++;
+                } else {
+                    synchronized (this) {
+                        if (isComplete()) {
+                            failOnError();
+                            return true;
+                        }
+
+                        waiting++;
+                        try {
+                            wait(-diff / 1000000, (int) (-diff % 1000000));
+                        } finally {
+                            waiting--;
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            if (isComplete()) {
+                failOnError();
+                return;
+            }
+
+            int idleCount = 0;
+
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException();
+            }
+
+            while (true) {
+                if (isComplete()) {
+                    failOnError();
+                    return;
+                }
+
+                if (idleCount < SPIN_COUNT) {
+                    idleCount++;
+                } else if (idleCount < YIELD_COUNT) {
+                    Thread.yield();
+                    idleCount++;
+                } else {
+                    synchronized (this) {
+                        if (isComplete()) {
+                            failOnError();
+                            return;
+                        }
+
+                        waiting++;
+                        try {
+                            wait();
+                        } finally {
+                            waiting--;
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java
new file mode 100644
index 0000000..e055130
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ConservativeProviderFuture.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * A more conservative implementation of a ProviderFuture that is better on some
+ * platforms or resource constrained hardware where high CPU usage can be more
+ * counter productive than other variants that might spin or otherwise avoid
+ * entry into states requiring thread signalling.
+ */
+public class ConservativeProviderFuture extends ProviderFuture {
+
+    public ConservativeProviderFuture() {
+        this(null);
+    }
+
+    public ConservativeProviderFuture(ProviderSynchronization synchronization) {
+        super(synchronization);
+    }
+
+    @Override
+    public boolean sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            if (isComplete() || amount == 0) {
+                failOnError();
+                return true;
+            }
+
+            final long timeout = unit.toNanos(amount);
+            long maxParkNanos = timeout / 8;
+            maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+            final long startTime = System.nanoTime();
+
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException();
+            }
+
+            while (true) {
+                final long elapsed = System.nanoTime() - startTime;
+                final long diff = elapsed - timeout;
+
+                if (diff >= 0) {
+                    failOnError();
+                    return isComplete();
+                }
+
+                if (isComplete()) {
+                    failOnError();
+                    return true;
+                }
+
+                synchronized (this) {
+                    if (isComplete()) {
+                        failOnError();
+                        return true;
+                    }
+
+                    waiting++;
+                    try {
+                        wait(-diff / 1000000, (int) (-diff % 1000000));
+                    } finally {
+                        waiting--;
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            if (isComplete()) {
+                failOnError();
+                return;
+            }
+
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException();
+            }
+
+            while (true) {
+                if (isComplete()) {
+                    failOnError();
+                    return;
+                }
+
+                synchronized (this) {
+                    if (isComplete()) {
+                        failOnError();
+                        return;
+                    }
+
+                    waiting++;
+                    try {
+                        wait();
+                    } finally {
+                        waiting--;
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java
new file mode 100644
index 0000000..bd4da7f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProgressiveProviderFuture.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.LockSupport;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * An optimized version of a ProviderFuture that makes use of spin waits and other
+ * methods of reacting to asynchronous completion in a more timely manner.
+ */
+public class ProgressiveProviderFuture extends ProviderFuture {
+
+    // Using a progressive wait strategy helps to avoid wait happening before
+    // completion and avoid using expensive thread signaling
+    private static final int SPIN_COUNT = 10;
+    private static final int YIELD_COUNT = 100;
+    private static final int TINY_PARK_COUNT = 1000;
+    private static final int TINY_PARK_NANOS = 1;
+    private static final int SMALL_PARK_COUNT = 101_000;
+    private static final int SMALL_PARK_NANOS = 10_000;
+
+    public ProgressiveProviderFuture() {
+        this(null);
+    }
+
+    public ProgressiveProviderFuture(ProviderSynchronization synchronization) {
+        super(synchronization);
+    }
+
+    @Override
+    public boolean sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            if (isComplete() || amount == 0) {
+                failOnError();
+                return true;
+            }
+
+            final long timeout = unit.toNanos(amount);
+            long maxParkNanos = timeout / 8;
+            maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
+            final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS);
+            final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS);
+            final long startTime = System.nanoTime();
+            int idleCount = 0;
+
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException();
+            }
+
+            while (true) {
+                final long elapsed = System.nanoTime() - startTime;
+                final long diff = elapsed - timeout;
+
+                if (diff >= 0) {
+                    failOnError();
+                    return isComplete();
+                }
+
+                if (isComplete()) {
+                    failOnError();
+                    return true;
+                }
+
+                if (idleCount < SPIN_COUNT) {
+                    idleCount++;
+                } else if (idleCount < YIELD_COUNT) {
+                    Thread.yield();
+                    idleCount++;
+                } else if (idleCount < TINY_PARK_COUNT) {
+                    LockSupport.parkNanos(tinyParkNanos);
+                    idleCount++;
+                } else if (idleCount < SMALL_PARK_COUNT) {
+                    LockSupport.parkNanos(smallParkNanos);
+                    idleCount++;
+                } else {
+                    synchronized (this) {
+                        if (isComplete()) {
+                            failOnError();
+                            return true;
+                        }
+
+                        waiting++;
+                        try {
+                            wait(-diff / 1000000, (int) (-diff % 1000000));
+                        } finally {
+                            waiting--;
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+    }
+
+    @Override
+    public void sync() throws IOException {
+        try {
+            if (isComplete()) {
+                failOnError();
+                return;
+            }
+
+            int idleCount = 0;
+
+            if (Thread.currentThread().isInterrupted()) {
+                throw new InterruptedException();
+            }
+
+            while (true) {
+                if (isComplete()) {
+                    failOnError();
+                    return;
+                }
+
+                if (idleCount < SPIN_COUNT) {
+                    idleCount++;
+                } else if (idleCount < YIELD_COUNT) {
+                    Thread.yield();
+                    idleCount++;
+                } else if (idleCount < TINY_PARK_COUNT) {
+                    LockSupport.parkNanos(TINY_PARK_NANOS);
+                    idleCount++;
+                } else if (idleCount < SMALL_PARK_COUNT) {
+                    LockSupport.parkNanos(SMALL_PARK_NANOS);
+                    idleCount++;
+                } else {
+                    synchronized (this) {
+                        if (isComplete()) {
+                            failOnError();
+                            return;
+                        }
+
+                        waiting++;
+                        try {
+                            wait();
+                        } finally {
+                            waiting--;
+                        }
+                    }
+                }
+            }
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
index a61cb34..85a626d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -340,6 +340,27 @@ public interface Provider {
     JmsMessageFactory getMessageFactory();
 
     /**
+     * Gets a ProviderFuture instance from the Provider for use in performing Provider calls
+     * that require an asynchronous completion to know when the call to the provider has succeeded
+     * or failed.
+     *
+     * @return a ProviderFuture for use in calling Provider methods that require a completion object.
+     */
+    ProviderFuture newProviderFuture();
+
+    /**
+     * Gets a ProviderFuture instance from the Provider for use in performing Provider calls
+     * that require an asynchronous completion to know when the call to the provider has succeeded
+     * or failed.
+     *
+     * @param synchronization
+     * 		A {@link ProviderSynchronization} to assign to the resulting {@link ProviderFuture}.
+     *
+     * @return a ProviderFuture for use in calling Provider methods that require a completion object.
+     */
+    ProviderFuture newProviderFuture(ProviderSynchronization synchronization);
+
+    /**
      * Sets the listener of events from this Provider instance.
      *
      * @param listener
@@ -353,4 +374,5 @@ public interface Provider {
      * @return the currently set ProviderListener instance.
      */
     ProviderListener getProviderListener();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
index 19a41db..6341e75 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
@@ -48,6 +48,21 @@ public abstract class ProviderFactory {
     public abstract Provider createProvider(URI remoteURI) throws Exception;
 
     /**
+     * Creates an instance of the given AsyncProvider and configures it using the
+     * properties set on the given remote broker URI.
+     *
+     * @param remoteURI
+     *        The URI used to connect to a remote Broker.
+     * @param futureFactory
+     * 		  The {@link ProviderFutureFactory} to use when creating the new {@link Provider}.
+     *
+     * @return a new AsyncProvider instance.
+     *
+     * @throws Exception if an error occurs while creating the Provider instance.
+     */
+    public abstract Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception;
+
+    /**
      * @return the name of this Provider.
      */
     public abstract String getName();
@@ -64,11 +79,28 @@ public abstract class ProviderFactory {
      * @throws Exception if an error occurs while creating the AsyncProvider instance.
      */
     public static Provider create(URI remoteURI) throws Exception {
+        return create(remoteURI, null);
+    }
+
+    /**
+     * Static create method that performs the ProviderFactory search and handles the
+     * configuration and setup.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     * @param futureFactory
+     * 		  the {@link ProviderFutureFactory} to use when building the new {@link Provider}.
+     *
+     * @return a new AsyncProvider instance that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the AsyncProvider instance.
+     */
+    public static Provider create(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
         Provider result = null;
 
         try {
             ProviderFactory factory = findProviderFactory(remoteURI);
-            result = factory.createProvider(remoteURI);
+            result = factory.createProvider(remoteURI, futureFactory);
         } catch (Exception ex) {
             LOG.error("Failed to create Provider instance for {}, due to: {}", remoteURI.getScheme(), ex);
             LOG.trace("Error: ", ex);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
index e62db63..75805a6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
@@ -19,39 +19,28 @@ package org.apache.qpid.jms.provider;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.concurrent.locks.LockSupport;
 
 import org.apache.qpid.jms.util.IOExceptionSupport;
 
 /**
  * Asynchronous Provider Future class.
  */
-public class ProviderFuture implements AsyncResult {
+public abstract class ProviderFuture implements AsyncResult {
 
-    // Using a progressive wait strategy helps to avoid await happening before countDown
-    // and avoids expensive thread signaling
-    private static final int SPIN_COUNT = 10;
-    private static final int YIELD_COUNT = 100;
-    private static final int TINY_PARK_COUNT = 1000;
-    private static final int TINY_PARK_NANOS = 1;
-    private static final int SMALL_PARK_COUNT = 101_000;
-    private static final int SMALL_PARK_NANOS = 10_000;
+    protected final ProviderSynchronization synchronization;
 
     // States used to track progress of this future
-    private static final int INCOMPLETE = 0;
-    private static final int COMPLETING = 1;
-    private static final int SUCCESS = 2;
-    private static final int FAILURE = 3;
+    protected static final int INCOMPLETE = 0;
+    protected static final int COMPLETING = 1;
+    protected static final int SUCCESS = 2;
+    protected static final int FAILURE = 3;
 
-    private static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER =
-            AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state");
+    protected static final AtomicIntegerFieldUpdater<ProviderFuture> STATE_FIELD_UPDATER =
+             AtomicIntegerFieldUpdater.newUpdater(ProviderFuture.class,"state");
 
     private volatile int state = INCOMPLETE;
-    private Throwable error;
-
-    private int waiting;
-
-    private final ProviderSynchronization synchronization;
+    protected Throwable error;
+    protected int waiting;
 
     public ProviderFuture() {
         this(null);
@@ -102,6 +91,13 @@ public class ProviderFuture implements AsyncResult {
     }
 
     /**
+     * Waits for a response to some Provider requested operation.
+     *
+     * @throws IOException if an error occurs while waiting for the response.
+     */
+    public abstract void sync() throws IOException;
+
+    /**
      * Timed wait for a response to a Provider operation.
      *
      * @param amount
@@ -114,132 +110,9 @@ public class ProviderFuture implements AsyncResult {
      *
      * @throws IOException if an error occurs while waiting for the response.
      */
-    public boolean sync(long amount, TimeUnit unit) throws IOException {
-        try {
-            if (isComplete() || amount == 0) {
-                failOnError();
-                return true;
-            }
-
-            final Thread currentThread = Thread.currentThread();
-            final long timeout = unit.toNanos(amount);
-            long maxParkNanos = timeout / 8;
-            maxParkNanos = maxParkNanos > 0 ? maxParkNanos : timeout;
-            final long tinyParkNanos = Math.min(maxParkNanos, TINY_PARK_NANOS);
-            final long smallParkNanos = Math.min(maxParkNanos, SMALL_PARK_NANOS);
-            final long startTime = System.nanoTime();
-            int idleCount = 0;
-
-            while (true) {
-                if (currentThread.isInterrupted()) {
-                    throw new InterruptedException();
-                }
-
-                final long elapsed = System.nanoTime() - startTime;
-                final long diff = elapsed - timeout;
-
-                if (diff >= 0) {
-                    failOnError();
-                    return isComplete();
-                }
-
-                if (isComplete()) {
-                    failOnError();
-                    return true;
-                }
-
-                if (idleCount < SPIN_COUNT) {
-                    idleCount++;
-                } else if (idleCount < YIELD_COUNT) {
-                    Thread.yield();
-                    idleCount++;
-                } else if (idleCount < TINY_PARK_COUNT) {
-                    LockSupport.parkNanos(tinyParkNanos);
-                    idleCount++;
-                } else if (idleCount < SMALL_PARK_COUNT) {
-                    LockSupport.parkNanos(smallParkNanos);
-                    idleCount++;
-                } else {
-                    synchronized (this) {
-                        if (isComplete()) {
-                            failOnError();
-                            return true;
-                        }
-
-                        waiting++;
-                        try {
-                            wait(-diff / 1000000, (int) (-diff % 1000000));
-                        } finally {
-                            waiting--;
-                        }
-                    }
-                }
-            }
-        } catch (InterruptedException e) {
-            Thread.interrupted();
-            throw IOExceptionSupport.create(e);
-        }
-    }
-
-    /**
-     * Waits for a response to some Provider requested operation.
-     *
-     * @throws IOException if an error occurs while waiting for the response.
-     */
-    public void sync() throws IOException {
-        try {
-            if (isComplete()) {
-                failOnError();
-                return;
-            }
-
-            final Thread currentThread = Thread.currentThread();
-            int idleCount = 0;
-
-            while (true) {
-                if (currentThread.isInterrupted()) {
-                    throw new InterruptedException();
-                }
-
-                if (isComplete()) {
-                    failOnError();
-                    return;
-                }
-
-                if (idleCount < SPIN_COUNT) {
-                    idleCount++;
-                } else if (idleCount < YIELD_COUNT) {
-                    Thread.yield();
-                    idleCount++;
-                } else if (idleCount < TINY_PARK_COUNT) {
-                    LockSupport.parkNanos(TINY_PARK_NANOS);
-                    idleCount++;
-                } else if (idleCount < SMALL_PARK_COUNT) {
-                    LockSupport.parkNanos(SMALL_PARK_NANOS);
-                    idleCount++;
-                } else {
-                    synchronized (this) {
-                        if (isComplete()) {
-                            failOnError();
-                            return;
-                        }
-
-                        waiting++;
-                        try {
-                            wait();
-                        } finally {
-                            waiting--;
-                        }
-                    }
-                }
-            }
-        } catch (InterruptedException e) {
-            Thread.interrupted();
-            throw IOExceptionSupport.create(e);
-        }
-    }
+    public abstract boolean sync(long amount, TimeUnit unit) throws IOException;
 
-    private void failOnError() throws IOException {
+    protected void failOnError() throws IOException {
         Throwable cause = error;
         if (cause != null) {
             throw IOExceptionSupport.create(cause);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java
new file mode 100644
index 0000000..95868c0
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFutureFactory.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.util.Map;
+
+/**
+ * Factory for provider future instances that will create specific versions based on
+ * configuration.
+ */
+public abstract class ProviderFutureFactory {
+
+    public static final String PROVIDER_FUTURE_TYPE_KEY = "futureType";
+
+    private static final String OS_NAME = System.getProperty("os.name");
+    private static final String WINDOWS_OS_PREFIX = "Windows";
+    private static final boolean IS_WINDOWS = isOsNameMatch(OS_NAME, WINDOWS_OS_PREFIX);
+
+    private static final String CONSERVATIVE = "conservative";
+    private static final String BALANCED = "balanced";
+    private static final String PROGRESSIVE = "progressive";
+
+    /**
+     * Create a new Provider
+     *
+     * @param providerOptions
+     * 		Configuration options to be consumed by this factory create method
+     *
+     * @return a new ProviderFutureFactory that will be used to create the desired future types.
+     */
+    public static ProviderFutureFactory create(Map<String, String> providerOptions) {
+        String futureTypeKey = providerOptions.remove(PROVIDER_FUTURE_TYPE_KEY);
+
+        if (futureTypeKey == null || futureTypeKey.isEmpty()) {
+            if (Runtime.getRuntime().availableProcessors() < 4) {
+                return new ConservativeProviderFutureFactory();
+            } else if (isWindows()) {
+                return new BalancedProviderFutureFactory();
+            } else {
+                return new ProgressiveProviderFutureFactory();
+            }
+        }
+
+        switch (futureTypeKey.toLowerCase()) {
+            case CONSERVATIVE:
+                return new ConservativeProviderFutureFactory();
+            case BALANCED:
+                return new BalancedProviderFutureFactory();
+            case PROGRESSIVE:
+                return new ProgressiveProviderFutureFactory();
+            default:
+                throw new IllegalArgumentException(
+                    "No ProviderFuture implementation with name " + futureTypeKey + " found");
+        }
+    }
+
+    /**
+     * @return a new ProviderFuture instance.
+     */
+    public abstract ProviderFuture createFuture();
+
+    /**
+     * @param synchronization
+     * 		The {@link ProviderSynchronization} to assign to the returned {@link ProviderFuture}.
+     *
+     * @return a new ProviderFuture instance.
+     */
+    public abstract ProviderFuture createFuture(ProviderSynchronization synchronization);
+
+    /**
+     * @return a ProviderFuture that treats failures as success calls that simply complete the operation.
+     */
+    public abstract ProviderFuture createUnfailableFuture();
+
+    //----- Internal support methods -----------------------------------------//
+
+    private static boolean isWindows() {
+        return IS_WINDOWS;
+    }
+
+    private static boolean isOsNameMatch(final String currentOSName, final String osNamePrefix) {
+        if (currentOSName == null || currentOSName.isEmpty()) {
+            return false;
+        }
+
+        return currentOSName.startsWith(osNamePrefix);
+    }
+
+    //----- ProviderFutureFactory implementation -----------------------------//
+
+    private static class ConservativeProviderFutureFactory extends ProviderFutureFactory {
+
+        @Override
+        public ProviderFuture createFuture() {
+            return new ConservativeProviderFuture();
+        }
+
+        @Override
+        public ProviderFuture createFuture(ProviderSynchronization synchronization) {
+            return new ConservativeProviderFuture(synchronization);
+        }
+
+        @Override
+        public ProviderFuture createUnfailableFuture() {
+            return new ConservativeProviderFuture() {
+
+                @Override
+                public void onFailure(Throwable t) {
+                    this.onSuccess();
+                }
+            };
+        }
+    }
+
+    private static class BalancedProviderFutureFactory extends ProviderFutureFactory {
+
+        @Override
+        public ProviderFuture createFuture() {
+            return new BalancedProviderFuture();
+        }
+
+        @Override
+        public ProviderFuture createFuture(ProviderSynchronization synchronization) {
+            return new BalancedProviderFuture(synchronization);
+        }
+
+        @Override
+        public ProviderFuture createUnfailableFuture() {
+            return new BalancedProviderFuture() {
+
+                @Override
+                public void onFailure(Throwable t) {
+                    this.onSuccess();
+                }
+            };
+        }
+    }
+
+    private static class ProgressiveProviderFutureFactory extends ProviderFutureFactory {
+
+        @Override
+        public ProviderFuture createFuture() {
+            return new ProgressiveProviderFuture();
+        }
+
+        @Override
+        public ProviderFuture createFuture(ProviderSynchronization synchronization) {
+            return new ProgressiveProviderFuture(synchronization);
+        }
+
+        @Override
+        public ProviderFuture createUnfailableFuture() {
+            return new ProgressiveProviderFuture() {
+
+                @Override
+                public void onFailure(Throwable t) {
+                    this.onSuccess();
+                }
+            };
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 6dfa4c6..2a4eb09 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -146,6 +146,16 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
     }
 
     @Override
+    public ProviderFuture newProviderFuture() {
+        return next.newProviderFuture();
+    }
+
+    @Override
+    public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return next.newProviderFuture(synchronization);
+    }
+
+    @Override
     public void setProviderListener(ProviderListener listener) {
         this.listener = listener;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index a4dbc39..1c97dfe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -60,7 +60,9 @@ import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFailedException;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder;
 import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
 import org.apache.qpid.jms.sasl.Mechanism;
@@ -145,6 +147,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     private final Collector protonCollector = new CollectorImpl();
     private final Connection protonConnection = Connection.Factory.create();
 
+    private final ProviderFutureFactory futureFactory;
     private AsyncResult connectionRequest;
     private ScheduledFuture<?> nextIdleTimeoutCheck;
 
@@ -155,10 +158,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
      *        The URI of the AMQP broker this Provider instance will connect to.
      * @param transport
      * 		  The underlying Transport that will be used for wire level communications.
+     * @param futureFactory
+     * 		  The ProviderFutureFactory to use when futures are requested.
      */
-    public AmqpProvider(URI remoteURI, Transport transport) {
+    public AmqpProvider(URI remoteURI, Transport transport, ProviderFutureFactory futureFactory) {
         this.remoteURI = remoteURI;
         this.transport = transport;
+        this.futureFactory = futureFactory;
 
         serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory(
             "AmqpProvider :(" + PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
@@ -172,7 +178,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     public void connect(final JmsConnectionInfo connectionInfo) throws IOException {
         checkClosedOrFailed();
 
-        final ProviderFuture connectRequest = new ProviderFuture();
+        final ProviderFuture connectRequest = futureFactory.createFuture();
 
         serializer.execute(new Runnable() {
 
@@ -302,15 +308,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
     @Override
     public void close() {
         if (closed.compareAndSet(false, true)) {
-            final ProviderFuture request = new ProviderFuture() {
-
-                @Override
-                public void onFailure(Throwable result) {
-                    // During close it is fine if the close call fails
-                    // this in unrecoverable so we just log the event.
-                    onSuccess();
-                }
-            };
+            final ProviderFuture request = futureFactory.createUnfailableFuture();
 
             serializer.execute(new Runnable() {
 
@@ -1138,6 +1136,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
         return connection.getAmqpMessageFactory();
     }
 
+    @Override
+    public ProviderFuture newProviderFuture() {
+        return futureFactory.createFuture();
+    }
+
+    @Override
+    public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return futureFactory.createFuture(synchronization);
+    }
+
     public void setTraceFrames(boolean trace) {
         this.traceFrames = trace;
         updateTracer();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
index c141e26..a0083cf 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
@@ -20,6 +20,7 @@ import java.net.URI;
 import java.util.Map;
 
 import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.transports.Transport;
 import org.apache.qpid.jms.transports.TransportFactory;
 import org.apache.qpid.jms.util.PropertyUtil;
@@ -37,19 +38,40 @@ public class AmqpProviderFactory extends ProviderFactory {
 
     @Override
     public AmqpProvider createProvider(URI remoteURI) throws Exception {
+        return createProvider(remoteURI, null);
+    }
 
+    @Override
+    public AmqpProvider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
         Map<String, String> map = PropertyUtil.parseQuery(remoteURI);
-        Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "amqp.");
 
-        // Clear off any amqp.X values from the transport before creation.
+        // Clear off any amqp.X and provider.X values from the transport before creation.
+        Map<String, String> amqpProviderOptions = PropertyUtil.filterProperties(map, "amqp.");
+        Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "provider.");
+
         Transport transport = TransportFactory.create(getTransportScheme(), PropertyUtil.replaceQuery(remoteURI, map));
 
-        AmqpProvider result = new AmqpProvider(remoteURI, transport);
+        // If we have been given a futures factory to use then we ignore any URI options indicating
+        // what to create and just go with what we are given.
+        if (futureFactory == null) {
+            // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+            futureFactory = ProviderFutureFactory.create(providerOptions);
+            if (!providerOptions.isEmpty()) {
+                String msg = ""
+                    + " Not all Provider options could be applied during AMQP Provider creation."
+                    + " Check the options are spelled correctly."
+                    + " Unused parameters=[" + providerOptions + "]."
+                    + " This provider instance cannot be started.";
+                throw new IllegalArgumentException(msg);
+            }
+        }
+
+        AmqpProvider result = new AmqpProvider(remoteURI, transport, futureFactory);
 
-        Map<String, String> unused = PropertyUtil.setProperties(result, providerOptions);
+        Map<String, String> unused = PropertyUtil.setProperties(result, amqpProviderOptions);
         if (!unused.isEmpty()) {
             String msg = ""
-                + " Not all provider options could be set on the AMQP Provider."
+                + " Not all AMQP provider options could be set on the AMQP Provider."
                 + " Check the options are spelled correctly."
                 + " Unused parameters=[" + unused + "]."
                 + " This provider instance cannot be started.";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index b48f4c9..654ec48 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -52,8 +52,10 @@ import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFactory;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderRedirectedException;
+import org.apache.qpid.jms.provider.ProviderSynchronization;
 import org.apache.qpid.jms.provider.WrappedAsyncResult;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.jms.util.QpidJMSThreadFactory;
@@ -100,6 +102,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     private final Map<Long, FailoverRequest> requests = new LinkedHashMap<Long, FailoverRequest>();
     private final DefaultProviderListener closedListener = new DefaultProviderListener();
     private final AtomicReference<JmsMessageFactory> messageFactory = new AtomicReference<JmsMessageFactory>();
+    private final ProviderFutureFactory futureFactory;
 
     // Current state of connection / reconnection
     private final ReconnectControls reconnectControl = new ReconnectControls();
@@ -124,16 +127,17 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
 
     private FailoverServerListAction amqpOpenServerListAction = FailoverServerListAction.REPLACE;
 
-    public FailoverProvider(Map<String, String> nestedOptions) {
-        this(null, nestedOptions);
+    public FailoverProvider(Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) {
+        this(null, nestedOptions, futureFactory);
     }
 
-    public FailoverProvider(List<URI> uris) {
-        this(uris, null);
+    public FailoverProvider(List<URI> uris, ProviderFutureFactory futureFactory) {
+        this(uris, null, futureFactory);
     }
 
-    public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions) {
+    public FailoverProvider(List<URI> uris, Map<String, String> nestedOptions, ProviderFutureFactory futureFactory) {
         this.uris = new FailoverUriPool(uris, nestedOptions);
+        this.futureFactory = futureFactory;
 
         serializer = new ScheduledThreadPoolExecutor(1, new QpidJMSThreadFactory("FailoverProvider: serialization thread", true));
         serializer.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -167,7 +171,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     @Override
     public void close() {
         if (closed.compareAndSet(false, true)) {
-            final ProviderFuture request = new ProviderFuture();
+            final ProviderFuture request = futureFactory.createFuture();
             serializer.execute(new Runnable() {
 
                 @Override
@@ -700,7 +704,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
                             try {
                                 LOG.debug("Connection attempt:[{}] to: {} in-progress", reconnectAttempts,
                                     target.getScheme() + "://" + target.getHost() + ":" + target.getPort());
-                                provider = ProviderFactory.create(target);
+                                provider = ProviderFactory.create(target, futureFactory);
                                 provider.connect(connectionInfo);
                                 initializeNewConnection(provider);
                                 return;
@@ -1045,6 +1049,16 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
     }
 
     @Override
+    public ProviderFuture newProviderFuture() {
+        return futureFactory.createFuture();
+    }
+
+    @Override
+    public ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
+        return futureFactory.createFuture(synchronization);
+    }
+
+    @Override
     public String toString() {
         return "FailoverProvider: " +
                (connectedURI == null ? "unconnected" : connectedURI.toString());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
index 3914784..5d33763 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProviderFactory.java
@@ -21,6 +21,7 @@ import java.util.Map;
 
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.apache.qpid.jms.util.PropertyUtil;
 import org.apache.qpid.jms.util.URISupport;
 import org.apache.qpid.jms.util.URISupport.CompositeData;
@@ -42,13 +43,34 @@ public class FailoverProviderFactory extends ProviderFactory {
 
     @Override
     public Provider createProvider(URI remoteURI) throws Exception {
+        return createProvider(remoteURI, null);
+    }
+
+    @Override
+    public Provider createProvider(URI remoteURI, ProviderFutureFactory futureFactory) throws Exception {
         CompositeData composite = URISupport.parseComposite(remoteURI);
         Map<String, String> options = composite.getParameters();
 
         Map<String, String> filtered = PropertyUtil.filterProperties(options, FAILOVER_OPTION_PREFIX);
         Map<String, String> nested = PropertyUtil.filterProperties(filtered, FAILOVER_NESTED_OPTION_PREFIX_ADDON);
 
-        FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested);
+        Map<String, String> providerOptions = PropertyUtil.filterProperties(options, "provider.");
+        // If we have been given a futures factory to use then we ignore any URI options indicating
+        // what to create and just go with what we are given.
+        if (futureFactory == null) {
+            // Create a configured ProviderFutureFactory for use by the resulting AmqpProvider
+            futureFactory = ProviderFutureFactory.create(providerOptions);
+            if (!providerOptions.isEmpty()) {
+                String msg = ""
+                    + " Not all Provider options could be applied during Failover Provider creation."
+                    + " Check the options are spelled correctly."
+                    + " Unused parameters=[" + providerOptions + "]."
+                    + " This provider instance cannot be started.";
+                throw new IllegalArgumentException(msg);
+            }
+        }
+
+        FailoverProvider provider = new FailoverProvider(composite.getComponents(), nested, futureFactory);
         Map<String, String> unused = PropertyUtil.setProperties(provider, filtered);
         if (!unused.isEmpty()) {
             String msg = ""

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java
new file mode 100644
index 0000000..8cd03ce
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureFactoryTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+public class ProviderFutureFactoryTest {
+
+    @Test
+    public void testCreateFailsWithNullOptions() {
+        try {
+            ProviderFutureFactory.create(null);
+            fail("Should throw NullPointerException");
+        } catch (NullPointerException npe) {}
+    }
+
+    @Test
+    public void testCreateFailsWhenFutureTypeNotValid() {
+        Map<String, String> options = new HashMap<>();
+
+        options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "super-fast");
+
+        try {
+            ProviderFutureFactory.create(options);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException iae) {}
+    }
+
+    @Test
+    public void testCreateFactoryWithNoConfigurationOptionsGiven() {
+        ProviderFutureFactory factory = ProviderFutureFactory.create(Collections.emptyMap());
+
+        ProviderFuture future = factory.createFuture();
+        assertNotNull(future);
+        assertFalse(future.isComplete());
+    }
+
+    @Test
+    public void testCreateConservativeFactoryFromConfiguration() {
+        Map<String, String> options = new HashMap<>();
+
+        options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "conservative");
+
+        ProviderFutureFactory factory = ProviderFutureFactory.create(options);
+
+        ProviderFuture future = factory.createFuture();
+        assertNotNull(future);
+        assertFalse(future.isComplete());
+
+        assertTrue(future instanceof ConservativeProviderFuture);
+    }
+
+    @Test
+    public void testCreateBalancedFactoryFromConfiguration() {
+        Map<String, String> options = new HashMap<>();
+
+        options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "balanced");
+
+        ProviderFutureFactory factory = ProviderFutureFactory.create(options);
+
+        ProviderFuture future = factory.createFuture();
+        assertNotNull(future);
+        assertFalse(future.isComplete());
+
+        assertTrue(future instanceof BalancedProviderFuture);
+    }
+
+    @Test
+    public void testCreateProgressiveFactoryFromConfiguration() {
+        Map<String, String> options = new HashMap<>();
+
+        options.put(ProviderFutureFactory.PROVIDER_FUTURE_TYPE_KEY, "progressive");
+
+        ProviderFutureFactory factory = ProviderFutureFactory.create(options);
+
+        ProviderFuture future = factory.createFuture();
+        assertNotNull(future);
+        assertFalse(future.isComplete());
+
+        assertTrue(future instanceof ProgressiveProviderFuture);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
index 42258ed..601c879 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/ProviderFutureTest.java
@@ -22,17 +22,41 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public class ProviderFutureTest {
 
+    private final ProviderFutureFactory futuresFactory;
+
+    @Parameters(name = "{index}: futureType={0}")
+    public static Collection<Object> data() {
+        return Arrays.asList(new Object[] {
+                 "conservative", "balanced", "progressive" }
+           );
+    }
+
+    public ProviderFutureTest(String futureTypeName) {
+        Map<String, String> options = new HashMap<>();
+        options.put("futureType", futureTypeName);
+
+        futuresFactory = ProviderFutureFactory.create(options);
+    }
+
     @Test
     public void testIsComplete() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
 
         assertFalse(future.isComplete());
         future.onSuccess();
@@ -41,7 +65,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 10000)
     public void testOnSuccess() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
 
         future.onSuccess();
         try {
@@ -53,7 +77,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 90000)
     public void testTimedSync() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
 
         try {
             assertFalse(future.sync(1, TimeUnit.SECONDS));
@@ -64,7 +88,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 10000)
     public void testOnFailure() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
         IOException ex = new IOException();
 
         future.onFailure(ex);
@@ -79,7 +103,7 @@ public class ProviderFutureTest {
     @Test(timeout = 10000)
     public void testOnSuccessCallsSynchronization() {
         final AtomicBoolean syncCalled = new AtomicBoolean(false);
-        ProviderFuture future = new ProviderFuture(new ProviderSynchronization() {
+        ProviderFuture future = futuresFactory.createFuture(new ProviderSynchronization() {
 
             @Override
             public void onPendingSuccess() {
@@ -104,7 +128,7 @@ public class ProviderFutureTest {
     @Test(timeout = 10000)
     public void testOnFailureCallsSynchronization() {
         final AtomicBoolean syncCalled = new AtomicBoolean(false);
-        ProviderFuture future = new ProviderFuture(new ProviderSynchronization() {
+        ProviderFuture future = futuresFactory.createFuture(new ProviderSynchronization() {
 
             @Override
             public void onPendingSuccess() {
@@ -131,7 +155,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 10000)
     public void testSuccessfulStateIsFixed() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
         IOException ex = new IOException();
 
         future.onSuccess();
@@ -145,7 +169,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 10000)
     public void testFailedStateIsFixed() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
         IOException ex = new IOException();
 
         future.onFailure(ex);
@@ -160,7 +184,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 10000)
     public void testSyncHandlesInterruption() throws InterruptedException {
-        final ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
 
         final CountDownLatch syncing = new CountDownLatch(1);
         final CountDownLatch done = new CountDownLatch(1);
@@ -193,7 +217,7 @@ public class ProviderFutureTest {
 
     @Test(timeout = 10000)
     public void testTimedSyncHandlesInterruption() throws InterruptedException {
-        final ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
 
         final CountDownLatch syncing = new CountDownLatch(1);
         final CountDownLatch done = new CountDownLatch(1);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
index 52e888b..6fea518 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/WrappedAsyncResultTest.java
@@ -21,11 +21,14 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.Collections;
 
 import org.junit.Test;
 
 public class WrappedAsyncResultTest {
 
+    private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap());
+
     @Test
     public void testCreateWithNull() {
         try {
@@ -35,7 +38,7 @@ public class WrappedAsyncResultTest {
 
     @Test
     public void testGetWrapped() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
         WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {};
 
         assertSame(wrapped.getWrappedRequest(), future);
@@ -43,7 +46,7 @@ public class WrappedAsyncResultTest {
 
     @Test
     public void testOnSuccessPassthrough() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
         WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {};
 
         assertFalse(future.isComplete());
@@ -55,7 +58,7 @@ public class WrappedAsyncResultTest {
 
     @Test
     public void testOnFailurePassthrough() {
-        ProviderFuture future = new ProviderFuture();
+        ProviderFuture future = futuresFactory.createFuture();
         WrappedAsyncResult wrapped = new WrappedAsyncResult(future) {};
 
         assertFalse(future.isComplete());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index b26cb5f..53f3c6a 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -323,7 +323,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
                 fail("Should have thrown an IOException when closed.");
             } catch (IOException ex) {}
 
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
             try {
                 provider.unsubscribe("subscription-name", request);
                 fail("Should have thrown an IOException when closed.");
@@ -360,7 +360,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             connectionInfo.setSendTimeout(SEND_TIMEOUT);
             connectionInfo.setRequestTimeout(REQUEST_TIMEOUT);
 
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
             provider.create(connectionInfo, request);
             request.sync();
 
@@ -415,7 +415,7 @@ public class AmqpProviderTest extends QpidJmsTestCase {
             };
 
             assertFalse("Error should not yet be thrown", errorThrown.get());
-            ProviderFuture request = new ProviderFuture();
+            ProviderFuture request = provider.newProviderFuture();
 
             switch(operation) {
             case CREATE:

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/264a9a9b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
index 7cf686c..c5d4197 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderClosedTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.failover;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
@@ -28,6 +29,7 @@ import org.apache.qpid.jms.meta.JmsTransactionId;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderFutureFactory;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -36,6 +38,8 @@ import org.junit.Test;
  */
 public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
 
+    private final ProviderFutureFactory futuresFactory = ProviderFutureFactory.create(Collections.emptyMap());
+
     private FailoverProvider provider;
     private JmsConnectionInfo connection;
     private JmsSessionInfo session;
@@ -71,49 +75,49 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
 
     @Test(timeout=30000, expected=IOException.class)
     public void testCreateResource() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.create(connection, request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testStartResource() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.start(session, request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testStopResource() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.stop(session, request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testDestroyResource() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.destroy(session, request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testSend() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.send(new JmsOutboundMessageDispatch(), request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testSessionAcknowledge() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.acknowledge(session.getId(), ACK_TYPE.ACCEPTED, request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testAcknowledgeMessage() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.acknowledge(new JmsInboundMessageDispatch(1), ACK_TYPE.ACCEPTED, request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testCommit() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         JmsTransactionId txId = new JmsTransactionId(connection.getId(), 1);
         JmsTransactionInfo txInfo = new JmsTransactionInfo(session.getId(), txId);
         provider.commit(txInfo, null, request);
@@ -121,7 +125,7 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
 
     @Test(timeout=30000, expected=IOException.class)
     public void testRollback() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         JmsTransactionId txId = new JmsTransactionId(connection.getId(), 1);
         JmsTransactionInfo txInfo = new JmsTransactionInfo(session.getId(), txId);
         provider.rollback(txInfo, null, request);
@@ -129,19 +133,19 @@ public class FailoverProviderClosedTest extends FailoverProviderTestSupport {
 
     @Test(timeout=30000, expected=IOException.class)
     public void testRecover() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.recover(session.getId(), request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testUnsubscribe() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.unsubscribe("subscription-name", request);
     }
 
     @Test(timeout=30000, expected=IOException.class)
     public void testMessagePull() throws Exception {
-        ProviderFuture request = new ProviderFuture();
+        ProviderFuture request = futuresFactory.createFuture();
         provider.pull(consumer.getId(), 1, request);
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org