You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/10/17 00:55:29 UTC

[1/3] git commit: Tests for stat exception thrown when connection is closed.

Repository: qpid-jms
Updated Branches:
  refs/heads/master 0b4311225 -> ec13a0d50


Tests for stat exception thrown when connection is closed.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/49db0e5e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/49db0e5e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/49db0e5e

Branch: refs/heads/master
Commit: 49db0e5e84f11274c55c7dfb7ee6f30bacbb1d4f
Parents: 0b43112
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 16 14:51:09 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 16 14:51:09 2014 -0400

----------------------------------------------------------------------
 .../org/apache/qpid/jms/JmsConnectionTest.java  | 68 ++++++++++++++++++++
 1 file changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/49db0e5e/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 841fd51..4f50a37 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.net.URI;
 
 import javax.jms.JMSException;
+import javax.jms.Session;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
@@ -134,4 +135,71 @@ public class JmsConnectionTest {
         connection.start();
         assertTrue(connection.isConnected());
     }
+
+    //---------- Test methods fail after connection closed -------------------//
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testSetClientIdAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.setClientID("test-Id");
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testStartCalledAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.start();
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testStopCalledAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.stop();
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testSetExceptionListenerAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.setExceptionListener(null);
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testFetExceptionListenerAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.getExceptionListener();
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testCreateConnectionConsumerForTopicAfterClose() throws JMSException {
+        JmsDestination destination = new JmsTopic("test");
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.createConnectionConsumer(destination, null, null, 0);
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testCreateConnectionConsumerForQueueAfterClose() throws JMSException {
+        JmsDestination destination = new JmsQueue("test");
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.createConnectionConsumer(destination, null, null, 0);
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testCreateTopicSessionAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(expected=javax.jms.IllegalStateException.class)
+    public void testCreateQueueSessionAfterClose() throws JMSException {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.close();
+        connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] git commit: Do some cleanup and renaming to make things more consistent.

Posted by ta...@apache.org.
Do some cleanup and renaming to make things more consistent.  

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ec13a0d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ec13a0d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ec13a0d5

Branch: refs/heads/master
Commit: ec13a0d50f121a71280f9f074a1a6e94e61bbfd1
Parents: 1ec8e2c
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 16 18:52:38 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 16 18:52:38 2014 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AbstractAmqpResource.java | 242 -------------------
 .../jms/provider/amqp/AmqpAbstractResource.java | 240 ++++++++++++++++++
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  14 +-
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |  40 +--
 .../jms/provider/amqp/AmqpFixedProducer.java    |   6 +-
 .../qpid/jms/provider/amqp/AmqpProducer.java    |   6 +-
 .../jms/provider/amqp/AmqpQueueBrowser.java     |   6 +-
 .../qpid/jms/provider/amqp/AmqpSession.java     |  18 +-
 .../provider/amqp/AmqpTemporaryDestination.java |  20 +-
 .../provider/amqp/AmqpTransactionContext.java   |   6 +-
 10 files changed, 298 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
deleted file mode 100644
index 1175b34..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.io.IOException;
-
-import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-
-import org.apache.qpid.jms.meta.JmsResource;
-import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract base for all AmqpResource implementations to extend.
- *
- * This abstract class wraps up the basic state management bits so that the concrete
- * object don't have to reproduce it.  Provides hooks for the subclasses to initialize
- * and shutdown.
- */
-public abstract class AbstractAmqpResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpResource.class);
-
-    protected AsyncResult openRequest;
-    protected AsyncResult closeRequest;
-
-    protected E endpoint;
-    protected R info;
-
-    /**
-     * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
-     * sets the Endpoint to null.
-     *
-     * @param info
-     *        The JmsResource instance that this AmqpResource is managing.
-     */
-    public AbstractAmqpResource(R info) {
-        this(info, null);
-    }
-
-    /**
-     * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
-     * sets the Endpoint to the given value.
-     *
-     * @param info
-     *        The JmsResource instance that this AmqpResource is managing.
-     * @param endpoint
-     *        The Proton Endpoint instance that this object maps to.
-     */
-    public AbstractAmqpResource(R info, E endpoint) {
-        this.info = info;
-        this.endpoint = endpoint;
-    }
-
-    @Override
-    public void open(AsyncResult request) {
-        this.openRequest = request;
-        doOpen();
-        this.endpoint.setContext(this);
-        this.endpoint.open();
-    }
-
-    @Override
-    public boolean isOpen() {
-        return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
-    }
-
-    @Override
-    public boolean isAwaitingOpen() {
-        return this.openRequest != null;
-    }
-
-    @Override
-    public void opened() {
-        if (this.openRequest != null) {
-            this.openRequest.onSuccess();
-            this.openRequest = null;
-        }
-    }
-
-    @Override
-    public void close(AsyncResult request) {
-        // If already closed signal success or else the caller might never get notified.
-        if (endpoint.getLocalState() == EndpointState.CLOSED) {
-            request.onSuccess();
-            return;
-        }
-
-        this.closeRequest = request;
-        doClose();
-        this.endpoint.close();
-    }
-
-    @Override
-    public boolean isClosed() {
-        return this.endpoint.getLocalState() == EndpointState.CLOSED;
-    }
-
-    @Override
-    public boolean isAwaitingClose() {
-        return this.closeRequest != null;
-    }
-
-    @Override
-    public void closed() {
-        if (this.closeRequest != null) {
-            this.closeRequest.onSuccess();
-            this.closeRequest = null;
-        }
-
-        this.endpoint.close();
-        this.endpoint.free();
-    }
-
-    @Override
-    public void failed() {
-        failed(new JMSException("Remote request failed."));
-    }
-
-    @Override
-    public void failed(Exception cause) {
-        if (openRequest != null) {
-            openRequest.onFailure(cause);
-            openRequest = null;
-        }
-
-        if (closeRequest != null) {
-            closeRequest.onFailure(cause);
-            closeRequest = null;
-        }
-    }
-
-    public E getEndpoint() {
-        return this.endpoint;
-    }
-
-    public R getJmsResource() {
-        return this.info;
-    }
-
-    public EndpointState getLocalState() {
-        if (endpoint == null) {
-            return EndpointState.UNINITIALIZED;
-        }
-        return this.endpoint.getLocalState();
-    }
-
-    public EndpointState getRemoteState() {
-        if (endpoint == null) {
-            return EndpointState.UNINITIALIZED;
-        }
-        return this.endpoint.getRemoteState();
-    }
-
-    @Override
-    public Exception getRemoteError() {
-        String message = getRemoteErrorMessage();
-        Exception remoteError = null;
-        Symbol error = endpoint.getRemoteCondition().getCondition();
-        if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
-            remoteError = new JMSSecurityException(message);
-        } else {
-            remoteError = new JMSException(message);
-        }
-
-        return remoteError;
-    }
-
-    @Override
-    public String getRemoteErrorMessage() {
-        String message = "Received unkown error from remote peer";
-        if (endpoint.getRemoteCondition() != null) {
-            ErrorCondition error = endpoint.getRemoteCondition();
-            if (error.getDescription() != null && !error.getDescription().isEmpty()) {
-                message = error.getDescription();
-            }
-        }
-
-        return message;
-    }
-
-    @Override
-    public void processStateChange() throws IOException {
-        EndpointState remoteState = endpoint.getRemoteState();
-
-        if (remoteState == EndpointState.ACTIVE) {
-            if (isAwaitingOpen()) {
-                LOG.debug("{} is now open: ", this);
-                opened();
-            }
-
-            // Should not receive an ACTIVE event if not awaiting the open state.
-        } else if (remoteState == EndpointState.CLOSED) {
-            if (isAwaitingClose()) {
-                LOG.debug("{} is now closed: ", this);
-                closed();
-            } else if (isAwaitingOpen()) {
-                // Error on Open, create exception and signal failure.
-                LOG.warn("Open of {} failed: ", this);
-                Exception remoteError = this.getRemoteError();
-                failed(remoteError);
-            } else {
-                // TODO - Handle remote asynchronous close.
-                LOG.warn("{} was closed remotely.", this);
-            }
-        }
-    }
-
-    @Override
-    public void processDeliveryUpdates() throws IOException {
-    }
-
-    @Override
-    public void processFlowUpdates() throws IOException {
-    }
-
-    protected abstract void doOpen();
-
-    protected abstract void doClose();
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
new file mode 100644
index 0000000..83249c2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * object don't have to reproduce it.  Provides hooks for the subclasses to initialize
+ * and shutdown.
+ */
+public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
+
+    protected AsyncResult openRequest;
+    protected AsyncResult closeRequest;
+
+    protected E endpoint;
+    protected R resource;
+
+    /**
+     * Creates a new instance with the JmsResource provided, and sets the Endpoint to null.
+     *
+     * @param resource
+     *        The JmsResource instance that this AmqpResource is managing.
+     */
+    public AmqpAbstractResource(R resource) {
+        this(resource, null);
+    }
+
+    /**
+     * Creates a new instance with the JmsResource provided, and sets the Endpoint to the given value.
+     *
+     * @param resource
+     *        The JmsResource instance that this AmqpResource is managing.
+     * @param endpoint
+     *        The Proton Endpoint instance that this object maps to.
+     */
+    public AmqpAbstractResource(R resource, E endpoint) {
+        this.resource = resource;
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    public void open(AsyncResult request) {
+        this.openRequest = request;
+        doOpen();
+        this.endpoint.setContext(this);
+        this.endpoint.open();
+    }
+
+    @Override
+    public boolean isOpen() {
+        return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+    }
+
+    @Override
+    public boolean isAwaitingOpen() {
+        return this.openRequest != null;
+    }
+
+    @Override
+    public void opened() {
+        if (this.openRequest != null) {
+            this.openRequest.onSuccess();
+            this.openRequest = null;
+        }
+    }
+
+    @Override
+    public void close(AsyncResult request) {
+        // If already closed signal success or else the caller might never get notified.
+        if (endpoint.getLocalState() == EndpointState.CLOSED) {
+            request.onSuccess();
+            return;
+        }
+
+        this.closeRequest = request;
+        doClose();
+        this.endpoint.close();
+    }
+
+    @Override
+    public boolean isClosed() {
+        return this.endpoint.getLocalState() == EndpointState.CLOSED;
+    }
+
+    @Override
+    public boolean isAwaitingClose() {
+        return this.closeRequest != null;
+    }
+
+    @Override
+    public void closed() {
+        if (this.closeRequest != null) {
+            this.closeRequest.onSuccess();
+            this.closeRequest = null;
+        }
+
+        this.endpoint.close();
+        this.endpoint.free();
+    }
+
+    @Override
+    public void failed() {
+        failed(new JMSException("Remote request failed."));
+    }
+
+    @Override
+    public void failed(Exception cause) {
+        if (openRequest != null) {
+            openRequest.onFailure(cause);
+            openRequest = null;
+        }
+
+        if (closeRequest != null) {
+            closeRequest.onFailure(cause);
+            closeRequest = null;
+        }
+    }
+
+    public E getEndpoint() {
+        return this.endpoint;
+    }
+
+    public R getJmsResource() {
+        return this.resource;
+    }
+
+    public EndpointState getLocalState() {
+        if (endpoint == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return this.endpoint.getLocalState();
+    }
+
+    public EndpointState getRemoteState() {
+        if (endpoint == null) {
+            return EndpointState.UNINITIALIZED;
+        }
+        return this.endpoint.getRemoteState();
+    }
+
+    @Override
+    public Exception getRemoteError() {
+        String message = getRemoteErrorMessage();
+        Exception remoteError = null;
+        Symbol error = endpoint.getRemoteCondition().getCondition();
+        if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+            remoteError = new JMSSecurityException(message);
+        } else {
+            remoteError = new JMSException(message);
+        }
+
+        return remoteError;
+    }
+
+    @Override
+    public String getRemoteErrorMessage() {
+        String message = "Received unkown error from remote peer";
+        if (endpoint.getRemoteCondition() != null) {
+            ErrorCondition error = endpoint.getRemoteCondition();
+            if (error.getDescription() != null && !error.getDescription().isEmpty()) {
+                message = error.getDescription();
+            }
+        }
+
+        return message;
+    }
+
+    @Override
+    public void processStateChange() throws IOException {
+        EndpointState remoteState = endpoint.getRemoteState();
+
+        if (remoteState == EndpointState.ACTIVE) {
+            if (isAwaitingOpen()) {
+                LOG.debug("{} is now open: ", this);
+                opened();
+            }
+
+            // Should not receive an ACTIVE event if not awaiting the open state.
+        } else if (remoteState == EndpointState.CLOSED) {
+            if (isAwaitingClose()) {
+                LOG.debug("{} is now closed: ", this);
+                closed();
+            } else if (isAwaitingOpen()) {
+                // Error on Open, create exception and signal failure.
+                LOG.warn("Open of {} failed: ", this);
+                Exception remoteError = this.getRemoteError();
+                failed(remoteError);
+            } else {
+                // TODO - Handle remote asynchronous close.
+                LOG.warn("{} was closed remotely.", this);
+            }
+        }
+    }
+
+    @Override
+    public void processDeliveryUpdates() throws IOException {
+    }
+
+    @Override
+    public void processFlowUpdates() throws IOException {
+    }
+
+    protected abstract void doOpen();
+
+    protected abstract void doClose();
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index f3d09b0..d9a52da 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -36,7 +36,7 @@ import org.apache.qpid.proton.engine.Sasl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Connection> {
+public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Connection> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
 
@@ -71,7 +71,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
             this.authenticator = new AmqpSaslAuthenticator(sasl, info);
         }
 
-        this.info.getConnectionId().setProviderHint(this);
+        this.resource.getConnectionId().setProviderHint(this);
 
         this.queuePrefix = info.getQueuePrefix();
         this.topicPrefix = info.getTopicPrefix();
@@ -80,7 +80,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
 
         // Create a Session for this connection that is used for Temporary Destinations
         // and perhaps later on management and advisory monitoring.
-        JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1);
+        JmsSessionInfo sessionInfo = new JmsSessionInfo(this.resource, -1);
         sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
 
         this.connectionSession = new AmqpConnectionSession(this, sessionInfo);
@@ -88,7 +88,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
 
     @Override
     protected void doOpen() {
-        this.endpoint.setContainer(info.getClientId());
+        this.endpoint.setContainer(resource.getClientId());
         this.endpoint.setHostname(remoteURI.getHost());
     }
 
@@ -199,7 +199,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
     }
 
     public JmsConnectionInfo getConnectionInfo() {
-        return this.info;
+        return this.resource;
     }
 
     public Connection getProtonConnection() {
@@ -211,11 +211,11 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
     }
 
     public String getUsername() {
-        return this.info.getUsername();
+        return this.resource.getUsername();
     }
 
     public String getPassword() {
-        return this.info.getPassword();
+        return this.resource.getPassword();
     }
 
     public AmqpProvider getProvider() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 0af029b..3caec3c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
 /**
  * AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
  */
-public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver> {
+public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
 
@@ -81,20 +81,20 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         this.session = session;
 
         // Add a shortcut back to this Consumer for quicker lookups
-        this.info.getConsumerId().setProviderHint(this);
+        this.resource.getConsumerId().setProviderHint(this);
     }
 
     /**
      * Starts the consumer by setting the link credit to the given prefetch value.
      */
     public void start(AsyncResult request) {
-        this.endpoint.flow(info.getPrefetchSize());
+        this.endpoint.flow(resource.getPrefetchSize());
         request.onSuccess();
     }
 
     @Override
     protected void doOpen() {
-        JmsDestination destination  = info.getDestination();
+        JmsDestination destination  = resource.getDestination();
         String subscription = session.getQualifiedName(destination);
 
         Source source = new Source();
@@ -104,10 +104,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         configureSource(source);
 
         String receiverName = getConsumerId() + ":" + subscription;
-        if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) {
+        if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) {
             // In the case of Durable Topic Subscriptions the client must use the same
             // receiver name which is derived from the subscription name property.
-            receiverName = info.getSubscriptionName();
+            receiverName = resource.getSubscriptionName();
         }
 
         endpoint = session.getProtonSession().receiver(receiverName);
@@ -136,7 +136,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     protected void configureSource(Source source) {
         Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
 
-        if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) {
+        if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) {
             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
             source.setDurable(TerminusDurability.UNSETTLED_STATE);
             source.setDistributionMode(COPY);
@@ -145,12 +145,12 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
             source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
         }
 
-        if (info.isNoLocal()) {
+        if (resource.isNoLocal()) {
             filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
         }
 
-        if (info.getSelector() != null && !info.getSelector().trim().equals("")) {
-            filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(info.getSelector()));
+        if (resource.getSelector() != null && !resource.getSelector().trim().equals("")) {
+            filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resource.getSelector()));
         }
 
         if (!filters.isEmpty()) {
@@ -168,7 +168,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
      * would already have been given for these so we just need to settle them.
      */
     public void acknowledge() {
-        LOG.trace("Session Acknowledge for consumer: {}", info.getConsumerId());
+        LOG.trace("Session Acknowledge for consumer: {}", resource.getConsumerId());
         for (Delivery delivery : delivered.values()) {
             delivery.disposition(Accepted.getInstance());
             delivery.settle();
@@ -246,13 +246,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
      * then we open the window back up to full prefetch size.
      */
     private void sendFlowIfNeeded() {
-        if (info.getPrefetchSize() == 0) {
+        if (resource.getPrefetchSize() == 0) {
             return;
         }
 
         int currentCredit = endpoint.getCredit();
-        if (currentCredit <= info.getPrefetchSize() * 0.2) {
-            endpoint.flow(info.getPrefetchSize() - currentCredit);
+        if (currentCredit <= resource.getPrefetchSize() * 0.2) {
+            endpoint.flow(resource.getPrefetchSize() - currentCredit);
         }
     }
 
@@ -262,7 +262,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
      * @throws Exception if an error occurs while performing the recover.
      */
     public void recover() throws Exception {
-        LOG.debug("Session Recover for consumer: {}", info.getConsumerId());
+        LOG.debug("Session Recover for consumer: {}", resource.getConsumerId());
         for (Delivery delivery : delivered.values()) {
             // TODO - increment redelivery counter and apply connection redelivery policy
             //        to those messages that are past max redlivery.
@@ -281,7 +281,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
      * @param timeout
      */
     public void pull(long timeout) {
-        if (info.getPrefetchSize() == 0 && endpoint.getCredit() == 0) {
+        if (resource.getPrefetchSize() == 0 && endpoint.getCredit() == 0) {
             // expand the credit window by one.
             endpoint.flow(1);
         }
@@ -329,7 +329,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
 
         JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
         envelope.setMessage(message);
-        envelope.setConsumerId(info.getConsumerId());
+        envelope.setConsumerId(resource.getConsumerId());
         // Store link to delivery in the hint for use in acknowledge requests.
         envelope.setProviderHint(incoming);
         envelope.setMessageId(message.getFacade().getProviderMessageIdObject());
@@ -357,11 +357,11 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     }
 
     public JmsConsumerId getConsumerId() {
-        return this.info.getConsumerId();
+        return this.resource.getConsumerId();
     }
 
     public JmsDestination getDestination() {
-        return this.info.getDestination();
+        return this.resource.getDestination();
     }
 
     public Receiver getProtonReceiver() {
@@ -382,7 +382,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
 
     @Override
     public String toString() {
-        return "AmqpConsumer { " + this.info.getConsumerId() + " }";
+        return "AmqpConsumer { " + this.resource.getConsumerId() + " }";
     }
 
     protected void deliveryFailed(Delivery incoming, boolean expandCredit) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 3afc492..56ed79f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -235,8 +235,8 @@ public class AmqpFixedProducer extends AmqpProducer {
     protected void doOpen() {
         String targetAddress;
 
-        if (info.getDestination() != null) {
-            JmsDestination destination = info.getDestination();
+        if (resource.getDestination() != null) {
+            JmsDestination destination = resource.getDestination();
             targetAddress = session.getQualifiedName(destination);
         } else {
             targetAddress = connection.getProperties().getAnonymousRelayName();
@@ -274,7 +274,7 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     @Override
     public boolean isAnonymous() {
-        return this.info.getDestination() == null;
+        return this.resource.getDestination() == null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index 81ebf9e..473f184 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -29,7 +29,7 @@ import org.apache.qpid.proton.engine.Sender;
 /**
  * Base class for Producer instances.
  */
-public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, Sender> {
+public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo, Sender> {
 
     protected final AmqpSession session;
     protected final AmqpConnection connection;
@@ -41,7 +41,7 @@ public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo,
         this.connection = session.getConnection();
 
         // Add a shortcut back to this Producer for quicker lookup.
-        this.info.getProducerId().setProviderHint(this);
+        this.resource.getProducerId().setProviderHint(this);
     }
 
     /**
@@ -69,7 +69,7 @@ public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo,
      * @return the JmsProducerId that was assigned to this AmqpProducer.
      */
     public JmsProducerId getProducerId() {
-        return this.info.getProducerId();
+        return this.resource.getProducerId();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index 10e165b..6434bbd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -46,7 +46,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
      */
     @Override
     public void start(AsyncResult request) {
-        this.endpoint.flow(info.getPrefetchSize());
+        this.endpoint.flow(resource.getPrefetchSize());
         request.onSuccess();
     }
 
@@ -66,7 +66,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
     public void pull(long timeout) {
         if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) {
             LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
-            this.endpoint.drain(info.getPrefetchSize());
+            this.endpoint.drain(resource.getPrefetchSize());
         } else {
             endpoint.setDrain(false);
         }
@@ -113,7 +113,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
 
     @Override
     protected void configureSource(Source source) {
-        if (info.isBrowser()) {
+        if (resource.isBrowser()) {
             source.setDistributionMode(COPY);
         }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 10519f3..a671ffc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -34,7 +34,7 @@ import org.apache.qpid.proton.engine.Session;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
+public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
 
@@ -48,8 +48,8 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
         super(info, connection.getProtonConnection().session());
         this.connection = connection;
 
-        this.info.getSessionId().setProviderHint(this);
-        if (this.info.isTransacted()) {
+        this.resource.getSessionId().setProviderHint(this);
+        if (this.resource.isTransacted()) {
             txContext = new AmqpTransactionContext(this);
         } else {
             txContext = null;
@@ -165,7 +165,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
      * @throws Exception if an error occurs while performing the operation.
      */
     public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
-        if (!this.info.isTransacted()) {
+        if (!this.resource.isTransacted()) {
             throw new IllegalStateException("Non-transacted Session cannot start a TX.");
         }
 
@@ -181,7 +181,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
      * @throws Exception if an error occurs while performing the operation.
      */
     public void commit(AsyncResult request) throws Exception {
-        if (!this.info.isTransacted()) {
+        if (!this.resource.isTransacted()) {
             throw new IllegalStateException("Non-transacted Session cannot start a TX.");
         }
 
@@ -197,7 +197,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
      * @throws Exception if an error occurs while performing the operation.
      */
     public void rollback(AsyncResult request) throws Exception {
-        if (!this.info.isTransacted()) {
+        if (!this.resource.isTransacted()) {
             throw new IllegalStateException("Non-transacted Session cannot start a TX.");
         }
 
@@ -256,7 +256,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
     }
 
     public JmsSessionId getSessionId() {
-        return this.info.getSessionId();
+        return this.resource.getSessionId();
     }
 
     public Session getProtonSession() {
@@ -264,11 +264,11 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
     }
 
     boolean isTransacted() {
-        return this.info.isTransacted();
+        return this.resource.isTransacted();
     }
 
     boolean isAsyncAck() {
-        return this.info.isSendAcksAsync() || isTransacted();
+        return this.resource.isSendAcksAsync() || isTransacted();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 4107e3e..1376874 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
  * the broker in the case where the user does not have authorization to access temporary
  * destinations.
  */
-public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestination, Sender> {
+public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestination, Sender> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestination.class);
 
@@ -58,10 +58,10 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
 
         EndpointState remoteState = endpoint.getRemoteState();
         if (remoteState == EndpointState.ACTIVE) {
-            LOG.trace("Temporary Destination: {} is now open", this.info);
+            LOG.trace("Temporary Destination: {} is now open", this.resource);
             opened();
         } else if (remoteState == EndpointState.CLOSED) {
-            LOG.trace("Temporary Destination: {} is now closed", this.info);
+            LOG.trace("Temporary Destination: {} is now closed", this.resource);
             closed();
         }
     }
@@ -70,12 +70,12 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
     public void opened() {
 
         // Once our producer is opened we can read the updated name from the target address.
-        String oldDestinationName = info.getName();
+        String oldDestinationName = resource.getName();
         String destinationName = this.endpoint.getRemoteTarget().getAddress();
 
-        this.info.setName(destinationName);
+        this.resource.setName(destinationName);
 
-        LOG.trace("Updated temp destination to: {} from: {}", info, oldDestinationName);
+        LOG.trace("Updated temp destination to: {} from: {}", resource, oldDestinationName);
 
         super.opened();
     }
@@ -83,8 +83,8 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
     @Override
     protected void doOpen() {
 
-        String sourceAddress = info.getName();
-        if (info.isQueue()) {
+        String sourceAddress = resource.getName();
+        if (resource.isQueue()) {
             sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
         } else {
             // TODO - AMQ doesn't support temp topics so we make everything a temp queue for now
@@ -123,11 +123,11 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
     }
 
     public JmsDestination getJmsDestination() {
-        return this.info;
+        return this.resource;
     }
 
     @Override
     public String toString() {
-        return getClass().getSimpleName() + " { " + info + "}";
+        return getClass().getSimpleName() + " { " + resource + "}";
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 1cc6fd1..6bfbaa5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
  * The Transaction will carry a JmsTransactionId while the Transaction is open, once a
  * transaction has been committed or rolled back the Transaction Id is cleared.
  */
-public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, Sender> {
+public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Sender> {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
 
@@ -72,7 +72,7 @@ public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo,
      *
      * @param session
      *        The session that owns this transaction
-     * @param info
+     * @param resource
      *        The JmsTransactionInfo that defines this Transaction.
      */
     public AmqpTransactionContext(AmqpSession session) {
@@ -134,7 +134,7 @@ public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo,
         coordinator.setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
         Source source = new Source();
 
-        String coordinatorName = info.getSessionId().toString();
+        String coordinatorName = resource.getSessionId().toString();
         endpoint = session.getProtonSession().sender(coordinatorName);
         endpoint.setSource(source);
         endpoint.setTarget(coordinator);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] git commit: Create a specialized session object used soley by the connection to provide a place for things like durable unsubscribe, etc.

Posted by ta...@apache.org.
Create a specialized session object used soley by the connection to
provide a place for things like durable unsubscribe, etc. 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1ec8e2ca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1ec8e2ca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1ec8e2ca

Branch: refs/heads/master
Commit: 1ec8e2ca8521819b982b1ba853859a12b32014a5
Parents: 49db0e5
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 16 16:51:25 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 16 16:51:25 2014 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConnection.java  |  8 ++-
 .../provider/amqp/AmqpConnectionSession.java    | 51 ++++++++++++++++++++
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  2 +-
 3 files changed, 58 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1ec8e2ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 97118d7..f3d09b0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -48,7 +48,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
     private final AmqpProvider provider;
     private boolean connected;
     private AmqpSaslAuthenticator authenticator;
-    private final AmqpSession connectionSession;
+    private final AmqpConnectionSession connectionSession;
     private AmqpConnectionProperties properties;
 
     private String queuePrefix;
@@ -83,7 +83,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
         JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1);
         sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
 
-        this.connectionSession = new AmqpSession(this, sessionInfo);
+        this.connectionSession = new AmqpConnectionSession(this, sessionInfo);
     }
 
     @Override
@@ -106,6 +106,10 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
         return temporary;
     }
 
+    public void unsubscribe(String subscriptionName, AsyncResult request) {
+        connectionSession.unsubscribe(subscriptionName, request);
+    }
+
     /**
      * Called on receiving an event from Proton indicating a state change on the remote
      * side of the Connection.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1ec8e2ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
new file mode 100644
index 0000000..341893e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+
+/**
+ * Subclass of the standard session object used solely by AmqpConnection to
+ * aid in managing connection resources that require a persistent session.
+ */
+public class AmqpConnectionSession extends AmqpSession {
+
+    /**
+     * Create a new instance of a Connection owned Session object.
+     *
+     * @param connection
+     *        the connection that owns this session.
+     * @param info
+     *        the <code>JmsSessionInfo</code> for the Session to create.
+     */
+    public AmqpConnectionSession(AmqpConnection connection, JmsSessionInfo info) {
+        super(connection, info);
+    }
+
+    /**
+     * Used to remove an existing durable topic subscription from the remote broker.
+     *
+     * @param subscriptionName
+     *        the subscription name that is to be removed.
+     * @param request
+     *        the request that awaits the completion of this action.
+     */
+    public void unsubscribe(String subscriptionName, AsyncResult request) {
+        request.onSuccess();
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1ec8e2ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3928b03..e5b5f7a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -524,8 +524,8 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
             public void run() {
                 try {
                     checkClosed();
+                    connection.unsubscribe(subscription, request);
                     pumpToProtonTransport();
-                    request.onSuccess();
                 } catch (Exception error) {
                     request.onFailure(error);
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org