You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:44 UTC

[20/27] Initial drop of donated AMQP Client Code.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
new file mode 100644
index 0000000..ee01add
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResource.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+
+/**
+ * Base interface for the JMS objects representing JMS resources such as Connection, Session, etc.
+ */
+public interface JmsResource {
+
+    /**
+     * Allows a visitor object to walk the resources and process them.
+     *
+     * @param visitor
+     *        The visitor instance that is processing this resource.
+     *
+     * @throws Exception if an error occurs while visiting this resource.
+     */
+    void visit(JmsResourceVistor visitor) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
new file mode 100644
index 0000000..32ee783
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceId.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Base for all Id type classes used in the JMS Framework.
+ */
+public interface JmsResourceId {
+
+    /**
+     * Allows a Provider to embed a hint in this Id value for later use. The
+     * hint can allow the provider to more easily locate state data for a resource.
+     *
+     * @param hint
+     *        The value to add into this Id.
+     */
+    void setProviderHint(Object hint);
+
+    /**
+     * Return the previously stored Provider hint object.
+     *
+     * @return the previously stored Provider hint object.
+     */
+    Object getProviderHint();
+
+    /**
+     * Allows a provider to set its own internal Id object for this resource
+     * in the case where the JMS framework Id cannot be used directly by the
+     * Provider implementation.
+     *
+     * @param id
+     *        The provider specific Id object to store in this Id.
+     */
+    void setProviderId(Object id);
+
+    /**
+     * Returns the previously stored Provider Id value.
+     *
+     * @return the previously stored Provider Id value.
+     */
+    Object getProviderId();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java
new file mode 100644
index 0000000..1e963d9
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsResourceVistor.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+import org.apache.qpid.jms.JmsDestination;
+
+/**
+ * Visitor interface to make processing JmsResources simpler.
+ *
+ * A {@link JmsResource} receives an instance of this interface in its
+ * {@code visit} method and calls back the process method that matches its own
+ * type, passing itself as the argument.  One callback is declared for each of:
+ * connection info, session info, consumer info, producer info, transaction
+ * info and destination.  Every callback is declared to throw {@link Exception}
+ * so an implementation can report any failure met while processing a resource.
+ *
+ * NOTE(review): the type name is spelled "Vistor" (sic); it is referenced by
+ * that name elsewhere, so correcting the spelling would be an API change.
+ */
+public interface JmsResourceVistor {
+
+    void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception;
+
+    void processSessionInfo(JmsSessionInfo sessionInfo) throws Exception;
+
+    void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception;
+
+    void processProducerInfo(JmsProducerInfo producerInfo) throws Exception;
+
+    void processTransactionInfo(JmsTransactionInfo transactionInfo) throws Exception;
+
+    void processDestination(JmsDestination destination) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java
new file mode 100644
index 0000000..de0d452
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionId.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Identifier of a JMS Session, made up of the Id string of the owning
+ * connection and a numeric session value.
+ */
+public final class JmsSessionId extends JmsAbstractResourceId implements Comparable<JmsSessionId> {
+
+    private final String connectionId;
+    private final long value;
+
+    // Lazily populated caches, see toString() and getParentId().
+    protected transient String key;
+    protected transient JmsConnectionId parentId;
+
+    /**
+     * Creates an Id from the raw connection Id string and session value.
+     *
+     * @param connectionId
+     *        The Id string of the owning connection.
+     * @param value
+     *        The numeric value of the session.
+     */
+    public JmsSessionId(String connectionId, long value) {
+        this.connectionId = connectionId;
+        this.value = value;
+    }
+
+    /**
+     * Creates an Id from the owning connection's Id object, which is also
+     * retained as the parent Id.
+     *
+     * @param connectionId
+     *        The Id of the owning connection.
+     * @param sessionId
+     *        The numeric value of the session.
+     */
+    public JmsSessionId(JmsConnectionId connectionId, long sessionId) {
+        this(connectionId.getValue(), sessionId);
+        this.parentId = connectionId;
+    }
+
+    /**
+     * Creates a copy of the given session Id; cached values are not copied.
+     *
+     * @param id
+     *        The session Id to copy.
+     */
+    public JmsSessionId(JmsSessionId id) {
+        this(id.getConnectionId(), id.getValue());
+    }
+
+    /**
+     * Creates the Id of the session referenced by the given producer Id.
+     *
+     * @param id
+     *        The producer Id to read the connection Id and session value from.
+     */
+    public JmsSessionId(JmsProducerId id) {
+        this(id.getConnectionId(), id.getSessionId());
+    }
+
+    /**
+     * Creates the Id of the session referenced by the given consumer Id.
+     *
+     * @param id
+     *        The consumer Id to read the connection Id and session value from.
+     */
+    public JmsSessionId(JmsConsumerId id) {
+        this(id.getConnectionId(), id.getSessionId());
+    }
+
+    /**
+     * @return the connection Id string this session Id was created with.
+     */
+    public String getConnectionId() {
+        return connectionId;
+    }
+
+    /**
+     * @return the numeric value of this session Id.
+     */
+    public long getValue() {
+        return value;
+    }
+
+    /**
+     * Returns the Id of the parent connection, creating and caching it from
+     * this session Id on first use when none was supplied at construction.
+     *
+     * @return the parent connection Id.
+     */
+    public JmsConnectionId getParentId() {
+        if (parentId != null) {
+            return parentId;
+        }
+        parentId = new JmsConnectionId(this);
+        return parentId;
+    }
+
+    @Override
+    public int hashCode() {
+        // A stored value of zero means "not computed yet".
+        if (hashCode != 0) {
+            return hashCode;
+        }
+        hashCode = connectionId.hashCode() ^ (int) value;
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        // The class is final, so instanceof matches exactly this class and rejects null.
+        if (!(o instanceof JmsSessionId)) {
+            return false;
+        }
+        final JmsSessionId that = (JmsSessionId) o;
+        return value == that.value && connectionId.equals(that.connectionId);
+    }
+
+    /**
+     * @return the cached String form "connectionId:value", built on first call.
+     */
+    @Override
+    public String toString() {
+        if (key != null) {
+            return key;
+        }
+        key = connectionId + ":" + value;
+        return key;
+    }
+
+    /**
+     * Orders session Ids by the lexical order of their String forms.
+     */
+    @Override
+    public int compareTo(JmsSessionId other) {
+        final String mine = toString();
+        return mine.compareTo(other.toString());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
new file mode 100644
index 0000000..04f57d8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsSessionInfo.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+import javax.jms.Session;
+
+/**
+ * Meta data describing a JMS Session: its Id, its acknowledgement mode and
+ * a flag controlling whether acknowledgements are sent asynchronously.
+ */
+public final class JmsSessionInfo implements JmsResource {
+
+    private final JmsSessionId sessionId;
+    private int acknowledgementMode;
+    private boolean sendAcksAsync;
+
+    /**
+     * Creates the info object using an already built session Id.
+     *
+     * @param sessionId
+     *        The Id of the session this object describes.
+     */
+    public JmsSessionInfo(JmsSessionId sessionId) {
+        this.sessionId = sessionId;
+    }
+
+    /**
+     * Creates the info object for a session of the given connection.
+     *
+     * @param connectionMeta
+     *        The info object of the owning connection, its connection Id is used
+     *        to build the session Id.
+     * @param sessionId
+     *        The numeric value of the session.
+     */
+    public JmsSessionInfo(JmsConnectionInfo connectionMeta, long sessionId) {
+        this(new JmsSessionId(connectionMeta.getConnectionId(), sessionId));
+    }
+
+    /**
+     * @return the Id of the session described by this object.
+     */
+    public JmsSessionId getSessionId() {
+        return sessionId;
+    }
+
+    /**
+     * @return the currently configured acknowledgement mode.
+     */
+    public int getAcknowledgementMode() {
+        return acknowledgementMode;
+    }
+
+    /**
+     * @param acknowledgementMode
+     *        The acknowledgement mode to record for this session.
+     */
+    public void setAcknowledgementMode(int acknowledgementMode) {
+        this.acknowledgementMode = acknowledgementMode;
+    }
+
+    /**
+     * @return true if the acknowledgement mode equals {@link Session#SESSION_TRANSACTED}.
+     */
+    public boolean isTransacted() {
+        return Session.SESSION_TRANSACTED == acknowledgementMode;
+    }
+
+    /**
+     * @return the value of the send-acks-async flag.
+     */
+    public boolean isSendAcksAsync() {
+        return sendAcksAsync;
+    }
+
+    /**
+     * @param sendAcksAsync
+     *        The new value of the send-acks-async flag.
+     */
+    public void setSendAcksAsync(boolean sendAcksAsync) {
+        this.sendAcksAsync = sendAcksAsync;
+    }
+
+    /**
+     * Dispatches this object to the visitor's session info callback.
+     */
+    @Override
+    public void visit(JmsResourceVistor vistor) throws Exception {
+        vistor.processSessionInfo(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java
new file mode 100644
index 0000000..226aedf
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionId.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Identifier of a transaction, made up of the Id of the connection it was
+ * created with and a numeric transaction value.
+ */
+public final class JmsTransactionId extends JmsAbstractResourceId implements Comparable<JmsTransactionId> {
+
+    private final JmsConnectionId connectionId;
+    private final long value;
+
+    // Lazily built String form, see getTransactionKey().
+    private transient String transactionKey;
+
+    /**
+     * Creates a new transaction Id.
+     *
+     * @param connectionId
+     *        The Id of the connection this transaction Id is associated with.
+     * @param transactionId
+     *        The numeric value of this transaction Id.
+     */
+    public JmsTransactionId(JmsConnectionId connectionId, long transactionId) {
+        this.connectionId = connectionId;
+        this.value = transactionId;
+    }
+
+    /**
+     * Returns the String key of this Id in the form "TX:connectionId:value".
+     * The key is built on first use and cached.
+     *
+     * @return the String key for this transaction Id.
+     */
+    public String getTransactionKey() {
+        if (transactionKey == null) {
+            transactionKey = "TX:" + connectionId + ":" + value;
+        }
+        return transactionKey;
+    }
+
+    /**
+     * @return the same value as {@link #getTransactionKey()}.
+     */
+    @Override
+    public String toString() {
+        return getTransactionKey();
+    }
+
+    @Override
+    public int hashCode() {
+        // The hashCode field is not declared in this class (presumably inherited
+        // from JmsAbstractResourceId); zero is treated as "not computed yet".
+        if (hashCode == 0) {
+            hashCode = connectionId.hashCode() ^ (int)value;
+        }
+        return hashCode;
+    }
+
+    /**
+     * Two transaction Ids are equal when both the numeric value and the
+     * connection Id are equal.
+     */
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (other == null || other.getClass() != JmsTransactionId.class) {
+            return false;
+        }
+
+        JmsTransactionId tx = (JmsTransactionId) other;
+
+        return value == tx.value && connectionId.equals(tx.connectionId);
+    }
+
+    /**
+     * Orders transaction Ids by connection Id first and by numeric value second.
+     *
+     * @param o
+     *        The transaction Id to compare against.
+     *
+     * @return a negative value, zero or a positive value if this Id is less than,
+     *         equal to or greater than the given Id.
+     */
+    @Override
+    public int compareTo(JmsTransactionId o) {
+        int result = connectionId.compareTo(o.connectionId);
+        if (result == 0) {
+            // Compare the long values directly: casting their difference to int can
+            // overflow or truncate, yielding a wrong sign or a false zero for Ids
+            // that are not equal.
+            result = value < o.value ? -1 : (value == o.value ? 0 : 1);
+        }
+        return result;
+    }
+
+    /**
+     * @return the numeric value of this transaction Id.
+     */
+    public long getValue() {
+        return value;
+    }
+
+    /**
+     * @return the Id of the connection this transaction Id is associated with.
+     */
+    public JmsConnectionId getConnectionId() {
+        return connectionId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
new file mode 100644
index 0000000..5fca50f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsTransactionInfo.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.meta;
+
+/**
+ * Meta data describing a transaction: the Id of the session it belongs to and
+ * the current transaction Id, which can be replaced after construction.
+ * Equality, hashing and ordering are based on the transaction Id only.
+ */
+public final class JmsTransactionInfo implements JmsResource, Comparable<JmsTransactionInfo> {
+
+    protected final JmsSessionId sessionId;
+    protected JmsTransactionId transactionId;
+
+    /**
+     * Creates a new transaction info object.
+     *
+     * @param sessionId
+     *        The Id of the session that owns the transaction.
+     * @param transactionId
+     *        The Id of the transaction, may be changed later via
+     *        {@link #setTransactionId(JmsTransactionId)}.
+     */
+    public JmsTransactionInfo(JmsSessionId sessionId, JmsTransactionId transactionId) {
+        this.sessionId = sessionId;
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return a new info object referencing the same session Id and transaction Id.
+     */
+    public JmsTransactionInfo copy() {
+        return new JmsTransactionInfo(sessionId, transactionId);
+    }
+
+    /**
+     * @return the Id of the session that owns the transaction.
+     */
+    public JmsSessionId getSessionId() {
+        return sessionId;
+    }
+
+    /**
+     * @return the current transaction Id.
+     */
+    public JmsTransactionId getTransactionId() {
+        return transactionId;
+    }
+
+    /**
+     * @param transactionId
+     *        The new transaction Id for this info object.
+     */
+    public void setTransactionId(JmsTransactionId transactionId) {
+        this.transactionId = transactionId;
+    }
+
+    /**
+     * @return the session Id, the same value as {@link #getSessionId()}.
+     */
+    public JmsSessionId getParentId() {
+        return this.sessionId;
+    }
+
+    @Override
+    public int hashCode() {
+        return (transactionId == null) ? 0 : transactionId.hashCode();
+    }
+
+    /**
+     * Two info objects are equal when their transaction Ids are equal, or when
+     * neither of them has a transaction Id.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+
+        JmsTransactionInfo other = (JmsTransactionInfo) obj;
+
+        // Null-safe comparison: hashCode() accepts a null transactionId, so equals()
+        // must not dereference it when both sides are null.
+        if (transactionId == null) {
+            return other.transactionId == null;
+        }
+        return transactionId.equals(other.transactionId);
+    }
+
+    /**
+     * Orders info objects by their transaction Ids.
+     *
+     * @throws NullPointerException if this object has no transaction Id set.
+     */
+    @Override
+    public int compareTo(JmsTransactionInfo other) {
+        return this.transactionId.compareTo(other.transactionId);
+    }
+
+    @Override
+    public String toString() {
+        return "JmsTransactionInfo { " + this.transactionId + " }";
+    }
+
+    /**
+     * Dispatches this object to the visitor's transaction info callback.
+     */
+    @Override
+    public void visit(JmsResourceVistor visitor) throws Exception {
+        visitor.processTransactionInfo(this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html b/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html
new file mode 100644
index 0000000..3d678ce
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/package.html
@@ -0,0 +1,25 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<html>
+<head>
+</head>
+<body>
+
+The core AMQP JMS client implementation classes.
+
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
new file mode 100644
index 0000000..9d743d2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * Base class used to implement the most common features of a Provider instance.
+ *
+ * This class holds the remote URI, the closed flag, the registered
+ * {@link ProviderListener} and a single threaded scheduled executor (the
+ * "serializer") for use by subclasses.
+ */
+public abstract class AbstractProvider implements Provider {
+
+    protected final URI remoteURI;
+    protected final AtomicBoolean closed = new AtomicBoolean();
+    protected final ScheduledExecutorService serializer;
+
+    protected ProviderListener listener;
+
+    /**
+     * Creates the provider and its single threaded serializer executor.  The
+     * executor's thread is a daemon thread named after this provider's
+     * {@link #toString()} value.
+     *
+     * @param remoteURI
+     *        The URI of the remote peer, returned from {@link #getRemoteURI()}.
+     */
+    public AbstractProvider(URI remoteURI) {
+        this.remoteURI = remoteURI;
+
+        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                // Qualify the call: an unqualified toString() here resolves to the
+                // anonymous ThreadFactory, not to the enclosing provider.
+                serial.setName(AbstractProvider.this.toString());
+                return serial;
+            }
+        });
+    }
+
+    /**
+     * Verifies that this provider can be started: it must not be closed and a
+     * listener must have been registered.
+     *
+     * @throws IOException declared for the closed check and for subclasses.
+     * @throws IllegalStateException if no ProviderListener has been registered.
+     */
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        checkClosed();
+
+        if (listener == null) {
+            throw new IllegalStateException("No ProviderListener registered.");
+        }
+    }
+
+    @Override
+    public void setProviderListener(ProviderListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public ProviderListener getProviderListener() {
+        return listener;
+    }
+
+    @Override
+    public URI getRemoteURI() {
+        return remoteURI;
+    }
+
+    /**
+     * Reports an error to the registered listener as a connection failure.  The
+     * error is converted with {@link IOExceptionSupport#create(Throwable)}; when
+     * no listener is registered the error is dropped.
+     *
+     * @param ex
+     *        The error to report.
+     */
+    public void fireProviderException(Throwable ex) {
+        // Read the field once so the null check and the call use the same instance.
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onConnectionFailure(IOExceptionSupport.create(ex));
+        }
+    }
+
+    /**
+     * Checks the closed flag of this provider.
+     *
+     * @throws ProviderClosedException if the closed flag has been set.
+     */
+    protected void checkClosed() throws ProviderClosedException {
+        if (closed.get()) {
+            throw new ProviderClosedException("The Provider is already closed");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java
new file mode 100644
index 0000000..b79a3cc
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AsyncResult.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+/**
+ * Defines a result interface for asynchronous operations.
+ */
+public interface AsyncResult {
+
+    /**
+     * If the operation fails this method is invoked with the error
+     * that caused the failure.
+     *
+     * @param result
+     *        The error that resulted in this asynchronous operation failing.
+     */
+    void onFailure(Throwable result);
+
+    /**
+     * If the operation succeeds the resulting value produced is set to null and
+     * the waiting parties are signaled.
+     */
+    void onSuccess();
+
+    /**
+     * Returns true if the AsyncResult has completed.  The task is considered complete
+     * regardless of whether it succeeded or failed.
+     *
+     * @return true if the asynchronous operation has completed.
+     */
+    boolean isComplete();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
new file mode 100644
index 0000000..e0d3e01
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Default implementation that does nothing for all callbacks.
+ *
+ * Every {@link ProviderListener} method implemented here has an empty body, so
+ * incoming messages and all connection state events (interrupted, failure,
+ * recovery, recovered, restored) are ignored.  The class is public and not
+ * final, so it can be subclassed.
+ */
+public class DefaultProviderListener implements ProviderListener {
+
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+    }
+
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+    }
+
+    @Override
+    public void onConnectionFailure(IOException ex) {
+    }
+
+    @Override
+    public void onConnectionRecovery(Provider provider) {
+    }
+
+    @Override
+    public void onConnectionRecovered(Provider provider) {
+    }
+
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
new file mode 100644
index 0000000..b8634ea
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/Provider.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+/**
+ * Defines the interface that an Implementation of a Specific wire level protocol
+ * provider must implement.  This Provider interface requires that the implementation
+ * methods all operate in an asynchronous manner.
+ */
+public interface Provider {
+
+    /**
+     * Performs the initial low level connection for this provider such as TCP or
+     * SSL connection to a remote Broker.  If this operation fails then the Provider
+     * is considered to be unusable and no further operations should be attempted
+     * using this Provider.
+     *
+     * @throws IOException if the remote resource can not be contacted.
+     */
+    void connect() throws IOException;
+
+    /**
+     * Starts the Provider.  The start method provides a place for the Provider to perform
+     * any pre-start configuration checks to ensure that the current state is valid and that
+     * all contracts have been met prior to starting.
+     *
+     * @throws IOException if an error occurs during start processing.
+     * @throws IllegalStateException if the Provider is improperly configured.
+     */
+    void start() throws IOException, IllegalStateException;
+
+    /**
+     * Closes this Provider terminating all connections and canceling any pending
+     * operations.  The Provider is considered unusable after this call.  This call
+     * is a blocking call and will not return until the Provider has closed or an
+     * error occurs.
+     */
+    void close();
+
+    /**
+     * Returns the URI used to configure this Provider and specify the remote address of the
+     * Broker it connects to.
+     *
+     * @return the URI used to configure this Provider.
+     */
+    URI getRemoteURI();
+
+    /**
+     * Create the Provider version of the given JmsResource.
+     *
+     * For each JMS Resource type the Provider implementation must create its own internal
+     * representation and upon successful creation provide the caller with a response.  The
+     * Provider should examine the given JmsResource to determine if the given configuration
+     * is supported or can be simulated, or is not supported in which case an exception should be
+     * thrown.
+     *
+     * It is possible for a Provider to indicate that it cannot complete a requested create
+     * due to some mis-configuration, such as bad login credentials on connection create,
+     * by throwing a JMSException.  If the Provider does not support creating of the indicated
+     * resource such as a Temporary Queue etc the provider may throw an UnsupportedOperationException
+     * to indicate this.
+     *
+     * @param resource
+     *        The JmsResource instance that indicates what is being created.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as bad credentials.
+     */
+    void create(JmsResource resource, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Starts the Provider version of the given JmsResource.
+     *
+     * For some JMS Resources it is necessary or advantageous to have a started state that
+     * must be triggered prior to its normal use.
+     *
+     * An example of this would be a MessageConsumer which should not receive any incoming
+     * messages until the JMS layer is in a state to handle them.  One such time would be
+     * after connection recovery.  A JMS consumer should normally recover with its prefetch
+     * value set to zero, or an AMQP link credit of zero and only open up the credit window
+     * once all Connection resources are restored.
+     *
+     * The provider is required to implement this method and not throw any error other than
+     * an IOException if a communication error occurs.  The start operation is not required to
+     * have any effect on the provider resource but must not throw UnsupportedOperation etc.
+     *
+     * @param resource
+     *        The JmsResource instance that indicates what is being started.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as already closed resource.
+     */
+    void start(JmsResource resource, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Instruct the Provider to dispose of a given JmsResource.
+     *
+     * The provider is given a JmsResource which it should use to remove any associated
+     * resources and inform the remote Broker instance of the removal of this resource.
+     *
+     * If the Provider cannot destroy the resource due to a non-communication error such as
+     * the logged in user not having role access to destroy the given resource it may throw an
+     * instance of JMSException to indicate such an error.
+     *
+     * @param resourceId
+     *        The JmsResource that identifies a previously created JmsResource.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+     */
+    void destroy(JmsResource resourceId, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Sends the JmsMessage contained in the outbound dispatch envelope.
+     *
+     * @param envelope
+     *        the message envelope containing the JmsMessage to send.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error that maps to JMS occurs such as not authorized.
+     */
+    void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Called to acknowledge all messages that have been delivered in a given session.
+     *
+     * This method is typically used by a Session that is configured for client acknowledge
+     * mode.  The acknowledgment should only be applied to Messages that have been marked
+     * as delivered and not those still awaiting dispatch.
+     *
+     * @param sessionId
+     *        the ID of the Session whose delivered messages should be acknowledged.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as unmatched ack.
+     */
+    void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Called to acknowledge a JmsMessage has been delivered, consumed, re-delivered...etc.
+     *
+     * The provider should perform an acknowledgment for the message based on the configured
+     * mode of the consumer that it was dispatched to and the capabilities of the protocol.
+     *
+     * @param envelope
+     *        The message dispatch envelope containing the Message delivery information.
+     * @param ackType
+     *        The type of acknowledgment being done.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as unmatched ack.
+     */
+    void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, AsyncResult request)
+        throws IOException, JMSException;
+
+    /**
+     * Called to commit an open transaction.
+     *
+     * If the provider is unable to support transactions then it should throw an
+     * UnsupportedOperationException to indicate this.  The Provider may also throw a
+     * JMSException to indicate a transaction was already rolled back etc.
+     *
+     * @param sessionId
+     *        the Id of the JmsSession that is committing the current transaction.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+     */
+    void commit(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Called to roll back an open transaction.
+     *
+     * @param sessionId
+     *        the Id of the JmsSession that is rolling back the current transaction.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+     */
+    void rollback(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Called to recover all unacknowledged messages for a Session in client Ack mode.
+     *
+     * @param sessionId
+     *        the Id of the JmsSession that is recovering unacknowledged messages.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     */
+    void recover(JmsSessionId sessionId, AsyncResult request) throws IOException;
+
+    /**
+     * Remove a durable topic subscription by name.
+     *
+     * A provider can throw an instance of JMSException to indicate that it cannot perform the
+     * un-subscribe operation due to bad security credentials etc.
+     *
+     * @param subscription
+     *        the name of the durable subscription that is to be removed.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     * @throws JMSException if an error occurs due to JMS violation such as not authorized.
+     */
+    void unsubscribe(String subscription, AsyncResult request) throws IOException, JMSException;
+
+    /**
+     * Request a remote peer send a Message to this client for the consumer identified by
+     * consumerId.  A message pull request is usually only needed in the case where the client
+     * sets a zero prefetch limit on the consumer.  If the consumer has a set prefetch that's
+     * greater than zero this method should just return without performing any action.
+     *
+     * @param timeout
+     *        the amount of time to tell the remote peer to keep this pull request valid.
+     * @param request
+     *        The request object that should be signaled when this operation completes.
+     *
+     * @throws IOException if an error occurs or the Provider is already closed.
+     */
+    void pull(JmsConsumerId consumerId, long timeout, AsyncResult request) throws IOException;
+
+    /**
+     * Gets the Provider specific Message factory for use in the JMS layer when a Session
+     * is asked to create a Message type.  The Provider should implement its own internal
+     * JmsMessage core to optimize read / write and marshal operations for the connection.
+     *
+     * @return a JmsMessageFactory instance for use by the JMS layer.
+     */
+    JmsMessageFactory getMessageFactory();
+
+    /**
+     * Sets the listener of events from this Provider instance.
+     *
+     * @param listener
+     *        The listener instance that will receive all event callbacks.
+     */
+    void setProviderListener(ProviderListener listener);
+
+    /**
+     * Gets the currently set ProviderListener instance.
+     *
+     * @return the currently set ProviderListener instance.
+     */
+    ProviderListener getProviderListener();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
new file mode 100644
index 0000000..1e00b58
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderClosedException.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+
+public class ProviderClosedException extends IOException {
+    // IOException subtype that, per its name, reports use of a Provider that was closed.
+    private static final long serialVersionUID = 1L;
+
+    public ProviderClosedException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
new file mode 100644
index 0000000..4e3f90c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderConstants.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+/**
+ * Constant definitions used when interacting with the Provider API; never instantiated.
+ */
+public final class ProviderConstants {
+
+    private ProviderConstants() {}
+
+    /**
+     * Acknowledgment kinds; each constant carries the fixed integer code returned by getValue().
+     */
+    public enum ACK_TYPE {
+        DELIVERED(0), CONSUMED(1), REDELIVERED(2), POISONED(3), EXPIRED(4);
+
+        private final int code;
+
+        ACK_TYPE(int code) {
+            this.code = code;
+        }
+
+        /** @return the integer code assigned to this constant. */
+        public int getValue() {
+            return code;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
new file mode 100644
index 0000000..c07518e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interface that all JMS Providers must implement.
+ */
+public abstract class ProviderFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ProviderFactory.class);
+
+    private static final FactoryFinder<ProviderFactory> PROVIDER_FACTORY_FINDER =
+        new FactoryFinder<ProviderFactory>(ProviderFactory.class,
+            "META-INF/services/" + ProviderFactory.class.getPackage().getName().replace(".", "/") + "/");
+
+    /**
+     * Creates an instance of the given AsyncProvider and configures it using the
+     * properties set on the given remote broker URI.
+     *
+     * @param remoteURI
+     *        The URI used to connect to a remote Broker.
+     *
+     * @return a new AsyncProvider instance.
+     *
+     * @throws Exception if an error occurs while creating the Provider instance.
+     */
+    public abstract Provider createAsyncProvider(URI remoteURI) throws Exception;
+
+    /**
+     * @return the name of this JMS Provider, e.g. STOMP, AMQP, MQTT...etc
+     */
+    public abstract String getName();
+
+    /**
+     * Static create method that performs the ProviderFactory search and handles the
+     * configuration and setup.
+     *
+     * @param remoteURI
+     *        the URI of the remote peer.
+     *
+     * @return a new AsyncProvider instance that is ready for use.
+     *
+     * @throws Exception if an error occurs while creating the AsyncProvider instance.
+     */
+    public static Provider createAsync(URI remoteURI) throws Exception {
+        Provider result = null;
+
+        try {
+            ProviderFactory factory = findProviderFactory(remoteURI);
+            result = factory.createAsyncProvider(remoteURI);
+            result.connect();
+        } catch (Exception ex) {
+            LOG.error("Failed to create BlockingProvider instance for: {}", remoteURI.getScheme());
+            LOG.trace("Error: ", ex);
+            throw ex;
+        }
+
+        return result;
+    }
+
+    /**
+     * Searches for a ProviderFactory by using the scheme from the given URI.
+     *
+     * The search first checks the local cache of provider factories before moving on
+     * to search in the class path.
+     *
+     * @param location
+     *        The URI whose scheme will be used to locate a ProviderFactory.
+     *
+     * @return a provider factory instance matching the URI's scheme.
+     *
+     * @throws IOException if an error occurs while locating the factory.
+     */
+    public static ProviderFactory findProviderFactory(URI location) throws IOException {
+        String scheme = location.getScheme();
+        if (scheme == null) {
+            throw new IOException("No Provider scheme specified: [" + location + "]");
+        }
+
+        ProviderFactory factory = null;
+        try {
+            factory = PROVIDER_FACTORY_FINDER.newInstance(scheme);
+        } catch (Throwable e) {
+            throw new IOException("Provider scheme NOT recognized: [" + scheme + "]", e);
+        }
+
+        return factory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
new file mode 100644
index 0000000..7a52ad3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFuture.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.qpid.jms.util.IOExceptionSupport;
+
+/**
+ * Asynchronous Provider Future class.
+ */
+public class ProviderFuture implements AsyncResult {
+
+    protected final CountDownLatch latch = new CountDownLatch(1);
+    protected Throwable error;
+    protected final AsyncResult watcher;
+
+    public ProviderFuture() {
+        this.watcher = null;
+    }
+
+    public ProviderFuture(AsyncResult watcher) {
+        this.watcher = watcher;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return latch.getCount() == 0;
+    }
+
+    @Override
+    public void onFailure(Throwable result) {
+        error = result;
+        latch.countDown();
+        if (watcher != null) {
+            watcher.onFailure(error);
+        }
+    }
+
+    @Override
+    public void onSuccess() {
+        latch.countDown();
+        if (watcher != null) {
+            watcher.onSuccess();
+        }
+    }
+
+    /**
+     * Timed wait for a response to a Provider operation.
+     *
+     * @param amount
+     *        The amount of time to wait before abandoning the wait.
+     * @param unit
+     *        The unit to use for this wait period.
+     *
+     * @return the result of this operation or null if the wait timed out.
+     *
+     * @throws IOException if an error occurs while waiting for the response.
+     */
+    public void sync(long amount, TimeUnit unit) throws IOException {
+        try {
+            latch.await(amount, unit);
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+        failOnError();
+    }
+
+    /**
+     * Waits for a response to some Provider requested operation.
+     *
+     * @return the response from the Provider for this operation.
+     *
+     * @throws IOException if an error occurs while waiting for the response.
+     */
+    public void sync() throws IOException {
+        try {
+            latch.await();
+        } catch (InterruptedException e) {
+            Thread.interrupted();
+            throw IOExceptionSupport.create(e);
+        }
+        failOnError();
+    }
+
+    private void failOnError() throws IOException {
+        Throwable cause = error;
+        if (cause != null) {
+            throw IOExceptionSupport.create(cause);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
new file mode 100644
index 0000000..18938d5
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Events interface used to update the listener with changes in provider state.
+ */
+public interface ProviderListener {
+
+    /**
+     * Called when a new Message has arrived for a registered consumer.
+     *
+     * @param envelope
+     *        The dispatch object containing the message and delivery information.
+     */
+    void onMessage(JmsInboundMessageDispatch envelope);
+
+    /**
+     * Called from a fault tolerant Provider instance to signal that the underlying
+     * connection to the Broker has been lost.  The Provider will attempt to reconnect
+     * following this event unless closed.
+     *
+     * It is considered a programming error to allow any exceptions to be thrown from
+     * this notification method.
+     *
+     * @param remoteURI
+     *        The URI of the Broker whose connection was lost.
+     */
+    void onConnectionInterrupted(URI remoteURI);
+
+    /**
+     * Called to indicate that a connection to the Broker has been reestablished and
+     * that the notified listener should start to recover its state.  The provider will
+     * not transition to the recovered state until the listener notifies the provider
+     * that recovery is complete.
+     *
+     * @param provider
+     *        The new Provider instance that will become active after the state
+     *        has been recovered.
+     *
+     * @throws Exception if an error occurs during recovery attempt, this will fail
+     *         the Provider that's being used for recovery.
+     */
+    void onConnectionRecovery(Provider provider) throws Exception;
+
+    /**
+     * Called to indicate that a connection to the Broker has been reestablished and
+     * that all recovery operations have succeeded and the connection will now be
+     * transitioned to a recovered state.  This method gives the listener a chance
+     * to send any necessary post recovery commands such as consumer start or message
+     * pull for a zero prefetch consumer etc.
+     *
+     * @param provider
+     *        The new Provider instance that will become active after the state
+     *        has been recovered.
+     *
+     * @throws Exception if an error occurs during recovery attempt, this will fail
+     *         the Provider that's being used for recovery.
+     */
+    void onConnectionRecovered(Provider provider) throws Exception;
+
+    /**
+     * Called to signal that all recovery operations are now complete and the Provider
+     * is again in a normal connected state.
+     *
+     * It is considered a programming error to allow any exceptions to be thrown from
+     * this notification method.
+     *
+     * @param remoteURI
+     *        The URI of the Broker that the client has now connected to.
+     */
+    void onConnectionRestored(URI remoteURI);
+
+    /**
+     * Called to indicate that the underlying connection to the Broker has been lost and
+     * the Provider will not perform any reconnect.  Following this call the provider is
+     * in a failed state and further calls to it will throw an Exception.
+     *
+     * @param ex
+     *        The exception that indicates the cause of this Provider failure.
+     */
+    void onConnectionFailure(IOException ex);
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
new file mode 100644
index 0000000..1381dba
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider;
+
+import java.io.IOException;
+import java.net.URI;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+/**
+ * Allows one AsyncProvider instance to wrap around another and provide some additional
+ * features beyond the normal AsyncProvider interface.
+ *
+ * This wrapper is meant primarily for Providers that are adding some additional feature
+ * on-top of an existing provider such as a discovery based provider that only needs to
+ * pass along discovered remote peer information.
+ */
+public class ProviderWrapper<E extends Provider> implements Provider, ProviderListener {
+
+    protected final E next;
+    protected ProviderListener listener;
+
+    public ProviderWrapper(E next) {
+        this.next = next;
+        this.next.setProviderListener(this);
+    }
+
+    @Override
+    public void connect() throws IOException {
+        next.connect();
+    }
+
+    @Override
+    public void start() throws IOException, IllegalStateException {
+        if (this.listener == null) {
+            throw new IllegalStateException("Cannot start with null ProviderListener");
+        }
+        next.start();
+    }
+
+    @Override
+    public void close() {
+        next.close();
+    }
+
+    @Override
+    public URI getRemoteURI() {
+        return next.getRemoteURI();
+    }
+
+    @Override
+    public void create(JmsResource resource, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.create(resource, request);
+    }
+
+    @Override
+    public void start(JmsResource resource, AsyncResult request) throws IOException, JMSException {
+        next.start(resource, request);
+    }
+
+    @Override
+    public void destroy(JmsResource resourceId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.destroy(resourceId, request);
+    }
+
+    @Override
+    public void send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+        next.send(envelope, request);
+    }
+
+    @Override
+    public void acknowledge(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException {
+        next.acknowledge(sessionId, request);
+    }
+
+    @Override
+    public void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType, AsyncResult request) throws IOException, JMSException {
+        next.acknowledge(envelope, ackType, request);
+    }
+
+    @Override
+    public void commit(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.commit(sessionId, request);
+    }
+
+    @Override
+    public void rollback(JmsSessionId sessionId, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.rollback(sessionId, request);
+    }
+
+    @Override
+    public void recover(JmsSessionId sessionId, AsyncResult request) throws IOException, UnsupportedOperationException {
+        next.recover(sessionId, request);
+    }
+
+    @Override
+    public void unsubscribe(String subscription, AsyncResult request) throws IOException, JMSException, UnsupportedOperationException {
+        next.unsubscribe(subscription, request);
+    }
+
+    @Override
+    public void pull(JmsConsumerId consumerId, long timeout, AsyncResult request) throws IOException, UnsupportedOperationException {
+        next.pull(consumerId, timeout, request);
+    }
+
+    @Override
+    public JmsMessageFactory getMessageFactory() {
+        return next.getMessageFactory();
+    }
+
+    @Override
+    public void setProviderListener(ProviderListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public ProviderListener getProviderListener() {
+        return this.listener;
+    }
+
+    @Override
+    public void onMessage(JmsInboundMessageDispatch envelope) {
+        this.listener.onMessage(envelope);
+    }
+
+    @Override
+    public void onConnectionInterrupted(URI remoteURI) {
+        this.listener.onConnectionInterrupted(remoteURI);
+    }
+
+    @Override
+    public void onConnectionRecovery(Provider provider) throws Exception {
+        this.listener.onConnectionRecovery(provider);
+    }
+
+    @Override
+    public void onConnectionRecovered(Provider provider) throws Exception {
+        this.listener.onConnectionRecovered(provider);
+    }
+
+    @Override
+    public void onConnectionRestored(URI remoteURI) {
+        this.listener.onConnectionRestored(remoteURI);
+    }
+
+    @Override
+    public void onConnectionFailure(IOException ex) {
+        this.listener.onConnectionInterrupted(this.next.getRemoteURI());
+    }
+
+    /**
+     * @return the wrapped AsyncProvider.
+     */
+    public Provider getNext() {
+        return this.next;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
new file mode 100644
index 0000000..1175b34
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * objects don't have to reproduce it.  Provides hooks for the subclasses to initialize
+ * and shutdown.
+ *
+ * @param <R>
+ *        The type of JmsResource that this AmqpResource manages.
+ * @param <E>
+ *        The type of Proton Endpoint that this AmqpResource maps to.
+ */
+public abstract class AbstractAmqpResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpResource.class);
+
+    /** The pending open request, or null when no open is awaiting completion. */
+    protected AsyncResult openRequest;
+
+    /** The pending close request, or null when no close is awaiting completion. */
+    protected AsyncResult closeRequest;
+
+    protected E endpoint;
+    protected R info;
+
+    /**
+     * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
+     * sets the Endpoint to null.
+     *
+     * @param info
+     *        The JmsResource instance that this AmqpResource is managing.
+     */
+    public AbstractAmqpResource(R info) {
+        this(info, null);
+    }
+
+    /**
+     * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
+     * sets the Endpoint to the given value.
+     *
+     * @param info
+     *        The JmsResource instance that this AmqpResource is managing.
+     * @param endpoint
+     *        The Proton Endpoint instance that this object maps to.
+     */
+    public AbstractAmqpResource(R info, E endpoint) {
+        this.info = info;
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Records the open request, runs the subclass open hook and then opens the endpoint.
+     *
+     * The endpoint must be non-null once {@link #doOpen()} returns because it is
+     * dereferenced immediately afterwards.
+     *
+     * @param request
+     *        The request to complete once the open has succeeded or failed.
+     */
+    @Override
+    public void open(AsyncResult request) {
+        this.openRequest = request;
+        doOpen();
+        this.endpoint.setContext(this);
+        this.endpoint.open();
+    }
+
+    /**
+     * @return true if the remote state of the endpoint is ACTIVE.
+     */
+    @Override
+    public boolean isOpen() {
+        return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+    }
+
+    @Override
+    public boolean isAwaitingOpen() {
+        return this.openRequest != null;
+    }
+
+    /**
+     * Signals success to the pending open request, if there is one, and clears it.
+     */
+    @Override
+    public void opened() {
+        if (this.openRequest != null) {
+            this.openRequest.onSuccess();
+            this.openRequest = null;
+        }
+    }
+
+    /**
+     * Records the close request, runs the subclass close hook and then closes the
+     * endpoint.  If the endpoint is already locally closed the request is completed
+     * immediately.
+     *
+     * @param request
+     *        The request to complete once the close has succeeded or failed.
+     */
+    @Override
+    public void close(AsyncResult request) {
+        // If already closed signal success or else the caller might never get notified.
+        if (endpoint.getLocalState() == EndpointState.CLOSED) {
+            request.onSuccess();
+            return;
+        }
+
+        this.closeRequest = request;
+        doClose();
+        this.endpoint.close();
+    }
+
+    /**
+     * @return true if the local state of the endpoint is CLOSED.
+     */
+    @Override
+    public boolean isClosed() {
+        return this.endpoint.getLocalState() == EndpointState.CLOSED;
+    }
+
+    @Override
+    public boolean isAwaitingClose() {
+        return this.closeRequest != null;
+    }
+
+    /**
+     * Signals success to the pending close request, if there is one, then closes and
+     * frees the endpoint.
+     */
+    @Override
+    public void closed() {
+        if (this.closeRequest != null) {
+            this.closeRequest.onSuccess();
+            this.closeRequest = null;
+        }
+
+        this.endpoint.close();
+        this.endpoint.free();
+    }
+
+    /**
+     * Fails any pending open or close request with a generic JMSException.
+     */
+    @Override
+    public void failed() {
+        failed(new JMSException("Remote request failed."));
+    }
+
+    /**
+     * Fails any pending open or close request with the given cause and clears them.
+     *
+     * @param cause
+     *        The error to hand to the pending requests.
+     */
+    @Override
+    public void failed(Exception cause) {
+        if (openRequest != null) {
+            openRequest.onFailure(cause);
+            openRequest = null;
+        }
+
+        if (closeRequest != null) {
+            closeRequest.onFailure(cause);
+            closeRequest = null;
+        }
+    }
+
+    public E getEndpoint() {
+        return this.endpoint;
+    }
+
+    public R getJmsResource() {
+        return this.info;
+    }
+
+    /**
+     * @return the local state of the endpoint, or UNINITIALIZED if no endpoint is set.
+     */
+    public EndpointState getLocalState() {
+        if (endpoint == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return this.endpoint.getLocalState();
+    }
+
+    /**
+     * @return the remote state of the endpoint, or UNINITIALIZED if no endpoint is set.
+     */
+    public EndpointState getRemoteState() {
+        if (endpoint == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return this.endpoint.getRemoteState();
+    }
+
+    /**
+     * Builds an exception describing the error reported by the remote peer.
+     *
+     * @return a JMSSecurityException if the remote condition is unauthorized-access,
+     *         otherwise a JMSException; both carry the remote error message.
+     */
+    @Override
+    public Exception getRemoteError() {
+        String message = getRemoteErrorMessage();
+
+        // The remote peer can close without supplying an error condition, so neither the
+        // condition nor its symbol can be assumed to be present here.
+        ErrorCondition remoteCondition = endpoint.getRemoteCondition();
+        Symbol error = remoteCondition != null ? remoteCondition.getCondition() : null;
+
+        if (AmqpError.UNAUTHORIZED_ACCESS.equals(error)) {
+            return new JMSSecurityException(message);
+        }
+
+        return new JMSException(message);
+    }
+
+    /**
+     * @return the description of the remote error condition if one was given and is
+     *         not empty, otherwise a generic message.
+     */
+    @Override
+    public String getRemoteErrorMessage() {
+        String message = "Received unkown error from remote peer";
+        if (endpoint.getRemoteCondition() != null) {
+            ErrorCondition error = endpoint.getRemoteCondition();
+            if (error.getDescription() != null && !error.getDescription().isEmpty()) {
+                message = error.getDescription();
+            }
+        }
+
+        return message;
+    }
+
+    /**
+     * Reacts to a change in the remote state of the endpoint by completing or failing
+     * the pending open or close request.
+     *
+     * @throws IOException declared for implementations that need it; not thrown here.
+     */
+    @Override
+    public void processStateChange() throws IOException {
+        EndpointState remoteState = endpoint.getRemoteState();
+
+        if (remoteState == EndpointState.ACTIVE) {
+            if (isAwaitingOpen()) {
+                LOG.debug("{} is now open: ", this);
+                opened();
+            }
+
+            // Should not receive an ACTIVE event if not awaiting the open state.
+        } else if (remoteState == EndpointState.CLOSED) {
+            if (isAwaitingClose()) {
+                LOG.debug("{} is now closed: ", this);
+                closed();
+            } else if (isAwaitingOpen()) {
+                // Error on Open, create exception and signal failure.
+                LOG.warn("Open of {} failed: ", this);
+                Exception remoteError = this.getRemoteError();
+                failed(remoteError);
+            } else {
+                // TODO - Handle remote asynchronous close.
+                LOG.warn("{} was closed remotely.", this);
+            }
+        }
+    }
+
+    /**
+     * No-op by default; subclasses that handle deliveries override this.
+     */
+    @Override
+    public void processDeliveryUpdates() throws IOException {
+    }
+
+    /**
+     * No-op by default; subclasses that handle flow changes override this.
+     */
+    @Override
+    public void processFlowUpdates() throws IOException {
+    }
+
+    /**
+     * Hook called from {@link #open(AsyncResult)} before the endpoint is opened.
+     */
+    protected abstract void doOpen();
+
+    /**
+     * Hook called from {@link #close(AsyncResult)} before the endpoint is closed.
+     */
+    protected abstract void doClose();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
new file mode 100644
index 0000000..7da3143
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducer.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsProducerInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AmqpProducer implementation backing anonymous JMS MessageProducers.
+ *
+ * An anonymous producer has no fixed target, so each send is carried out by a short
+ * lived fixed producer: it is opened for the destination of the message, used for the
+ * one send and then closed again once that send has succeeded.
+ */
+public class AmqpAnonymousProducer extends AmqpProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducer.class);
+    private static final IdGenerator producerIdGenerator = new IdGenerator();
+
+    private final String producerIdKey = producerIdGenerator.generateId();
+    private long producerIdCount;
+
+    /**
+     * Creates the Anonymous Producer object.
+     *
+     * @param session
+     *        the session that owns this producer
+     * @param info
+     *        the JmsProducerInfo for this producer.
+     */
+    public AmqpAnonymousProducer(AmqpSession session, JmsProducerInfo info) {
+        super(session, info);
+    }
+
+    /**
+     * Starts the open / send / close chain that performs a single anonymous send.
+     *
+     * @param envelope
+     *        the outbound message and the destination it is sent to.
+     * @param request
+     *        the request that is completed when the chain finishes or fails.
+     *
+     * @return always true.
+     */
+    @Override
+    public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException {
+
+        LOG.trace("Started send chain for anonymous producer: {}", getProducerId());
+
+        // The short lived producer gets its own ProducerInfo, aimed at the destination
+        // of this particular message.
+        JmsProducerInfo sendInfo = new JmsProducerInfo(getNextProducerId());
+        sendInfo.setDestination(envelope.getDestination());
+
+        // Opening the fixed producer starts the chain: its open completion triggers the
+        // send, and the send completion triggers the close which ends the chain.
+        AmqpFixedProducer delegate = new AmqpFixedProducer(session, sendInfo);
+        delegate.setPresettle(isPresettle());
+        delegate.open(new AnonymousOpenRequest(request, delegate, envelope));
+
+        return true;
+    }
+
+    @Override
+    public void open(AsyncResult request) {
+        // Nothing is sent to the Broker until the first send, so the open completes at
+        // once and the client never blocks waiting on it.
+        request.onSuccess();
+    }
+
+    @Override
+    public void close(AsyncResult request) {
+        // The close completes at once; producers that are in the middle of a send track
+        // their own state and close when that send completes or fails.
+        request.onSuccess();
+    }
+
+    @Override
+    protected void doOpen() {
+    }
+
+    @Override
+    protected void doClose() {
+    }
+
+    @Override
+    public boolean isAnonymous() {
+        return true;
+    }
+
+    @Override
+    public EndpointState getLocalState() {
+        return EndpointState.ACTIVE;
+    }
+
+    @Override
+    public EndpointState getRemoteState() {
+        return EndpointState.ACTIVE;
+    }
+
+    /**
+     * @return a new producer id built from this producer's key and the next counter value.
+     */
+    private JmsProducerId getNextProducerId() {
+        return new JmsProducerId(producerIdKey, -1, producerIdCount++);
+    }
+
+    /**
+     * Common base of the steps in the anonymous send chain.  Holds the original send
+     * request, the short lived producer and the message being sent.
+     */
+    private abstract class AnonymousRequest implements AsyncResult {
+
+        protected final AsyncResult sendResult;
+        protected final AmqpProducer producer;
+        protected final JmsOutboundMessageDispatch envelope;
+
+        public AnonymousRequest(AsyncResult original, AmqpProducer delegate, JmsOutboundMessageDispatch message) {
+            this.sendResult = original;
+            this.producer = delegate;
+            this.envelope = message;
+        }
+
+        @Override
+        public boolean isComplete() {
+            return sendResult.isComplete();
+        }
+
+        /**
+         * A failure at any step of the chain fails the original send request.
+         */
+        @Override
+        public void onFailure(Throwable result) {
+            LOG.debug("Send failed during {} step in chain: {}", this.getClass().getName(), getProducerId());
+            sendResult.onFailure(result);
+        }
+    }
+
+    /**
+     * First step of the chain: once the producer is open, perform the send.
+     */
+    private final class AnonymousOpenRequest extends AnonymousRequest {
+
+        public AnonymousOpenRequest(AsyncResult original, AmqpProducer delegate, JmsOutboundMessageDispatch message) {
+            super(original, delegate, message);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Open phase of anonymous send complete: {} ", getProducerId());
+            AnonymousSendRequest nextStep = new AnonymousSendRequest(this);
+            try {
+                producer.send(envelope, nextStep);
+            } catch (Exception error) {
+                sendResult.onFailure(error);
+            }
+        }
+    }
+
+    /**
+     * Second step of the chain: once the message is sent, close the producer.
+     */
+    private final class AnonymousSendRequest extends AnonymousRequest {
+
+        public AnonymousSendRequest(AnonymousOpenRequest previous) {
+            super(previous.sendResult, previous.producer, previous.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Send phase of anonymous send complete: {} ", getProducerId());
+            producer.close(new AnonymousCloseRequest(this));
+        }
+    }
+
+    /**
+     * Last step of the chain: once the producer is closed, complete the original send.
+     */
+    private final class AnonymousCloseRequest extends AnonymousRequest {
+
+        public AnonymousCloseRequest(AnonymousSendRequest previous) {
+            super(previous.sendResult, previous.producer, previous.envelope);
+        }
+
+        @Override
+        public void onSuccess() {
+            LOG.trace("Close phase of anonymous send complete: {} ", getProducerId());
+            sendResult.onSuccess();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org