You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/10/17 00:55:29 UTC
[1/3] git commit: Tests for stat exception thrown when connection is
closed.
Repository: qpid-jms
Updated Branches:
refs/heads/master 0b4311225 -> ec13a0d50
Tests for stat exception thrown when connection is closed.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/49db0e5e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/49db0e5e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/49db0e5e
Branch: refs/heads/master
Commit: 49db0e5e84f11274c55c7dfb7ee6f30bacbb1d4f
Parents: 0b43112
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 16 14:51:09 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 16 14:51:09 2014 -0400
----------------------------------------------------------------------
.../org/apache/qpid/jms/JmsConnectionTest.java | 68 ++++++++++++++++++++
1 file changed, 68 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/49db0e5e/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 841fd51..4f50a37 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.URI;
import javax.jms.JMSException;
+import javax.jms.Session;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
@@ -134,4 +135,71 @@ public class JmsConnectionTest {
connection.start();
assertTrue(connection.isConnected());
}
+
+ //---------- Test methods fail after connection closed -------------------//
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testSetClientIdAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.setClientID("test-Id");
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testStartCalledAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.start();
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testStopCalledAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.stop();
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testSetExceptionListenerAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.setExceptionListener(null);
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testFetExceptionListenerAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.getExceptionListener();
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testCreateConnectionConsumerForTopicAfterClose() throws JMSException {
+ JmsDestination destination = new JmsTopic("test");
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.createConnectionConsumer(destination, null, null, 0);
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testCreateConnectionConsumerForQueueAfterClose() throws JMSException {
+ JmsDestination destination = new JmsQueue("test");
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.createConnectionConsumer(destination, null, null, 0);
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testCreateTopicSessionAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ @Test(expected=javax.jms.IllegalStateException.class)
+ public void testCreateQueueSessionAfterClose() throws JMSException {
+ JmsConnection connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+ connection.close();
+ connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] git commit: Do some cleanup and renaming to make things more
consistent.
Posted by ta...@apache.org.
Do some cleanup and renaming to make things more consistent.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/ec13a0d5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/ec13a0d5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/ec13a0d5
Branch: refs/heads/master
Commit: ec13a0d50f121a71280f9f074a1a6e94e61bbfd1
Parents: 1ec8e2c
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 16 18:52:38 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 16 18:52:38 2014 -0400
----------------------------------------------------------------------
.../jms/provider/amqp/AbstractAmqpResource.java | 242 -------------------
.../jms/provider/amqp/AmqpAbstractResource.java | 240 ++++++++++++++++++
.../qpid/jms/provider/amqp/AmqpConnection.java | 14 +-
.../qpid/jms/provider/amqp/AmqpConsumer.java | 40 +--
.../jms/provider/amqp/AmqpFixedProducer.java | 6 +-
.../qpid/jms/provider/amqp/AmqpProducer.java | 6 +-
.../jms/provider/amqp/AmqpQueueBrowser.java | 6 +-
.../qpid/jms/provider/amqp/AmqpSession.java | 18 +-
.../provider/amqp/AmqpTemporaryDestination.java | 20 +-
.../provider/amqp/AmqpTransactionContext.java | 6 +-
10 files changed, 298 insertions(+), 300 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
deleted file mode 100644
index 1175b34..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AbstractAmqpResource.java
+++ /dev/null
@@ -1,242 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider.amqp;
-
-import java.io.IOException;
-
-import javax.jms.JMSException;
-import javax.jms.JMSSecurityException;
-
-import org.apache.qpid.jms.meta.JmsResource;
-import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.proton.amqp.Symbol;
-import org.apache.qpid.proton.amqp.transport.AmqpError;
-import org.apache.qpid.proton.amqp.transport.ErrorCondition;
-import org.apache.qpid.proton.engine.Endpoint;
-import org.apache.qpid.proton.engine.EndpointState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Abstract base for all AmqpResource implementations to extend.
- *
- * This abstract class wraps up the basic state management bits so that the concrete
- * object don't have to reproduce it. Provides hooks for the subclasses to initialize
- * and shutdown.
- */
-public abstract class AbstractAmqpResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
-
- private static final Logger LOG = LoggerFactory.getLogger(AbstractAmqpResource.class);
-
- protected AsyncResult openRequest;
- protected AsyncResult closeRequest;
-
- protected E endpoint;
- protected R info;
-
- /**
- * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
- * sets the Endpoint to null.
- *
- * @param info
- * The JmsResource instance that this AmqpResource is managing.
- */
- public AbstractAmqpResource(R info) {
- this(info, null);
- }
-
- /**
- * Creates a new AbstractAmqpResource instance with the JmsResource provided, and
- * sets the Endpoint to the given value.
- *
- * @param info
- * The JmsResource instance that this AmqpResource is managing.
- * @param endpoint
- * The Proton Endpoint instance that this object maps to.
- */
- public AbstractAmqpResource(R info, E endpoint) {
- this.info = info;
- this.endpoint = endpoint;
- }
-
- @Override
- public void open(AsyncResult request) {
- this.openRequest = request;
- doOpen();
- this.endpoint.setContext(this);
- this.endpoint.open();
- }
-
- @Override
- public boolean isOpen() {
- return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
- }
-
- @Override
- public boolean isAwaitingOpen() {
- return this.openRequest != null;
- }
-
- @Override
- public void opened() {
- if (this.openRequest != null) {
- this.openRequest.onSuccess();
- this.openRequest = null;
- }
- }
-
- @Override
- public void close(AsyncResult request) {
- // If already closed signal success or else the caller might never get notified.
- if (endpoint.getLocalState() == EndpointState.CLOSED) {
- request.onSuccess();
- return;
- }
-
- this.closeRequest = request;
- doClose();
- this.endpoint.close();
- }
-
- @Override
- public boolean isClosed() {
- return this.endpoint.getLocalState() == EndpointState.CLOSED;
- }
-
- @Override
- public boolean isAwaitingClose() {
- return this.closeRequest != null;
- }
-
- @Override
- public void closed() {
- if (this.closeRequest != null) {
- this.closeRequest.onSuccess();
- this.closeRequest = null;
- }
-
- this.endpoint.close();
- this.endpoint.free();
- }
-
- @Override
- public void failed() {
- failed(new JMSException("Remote request failed."));
- }
-
- @Override
- public void failed(Exception cause) {
- if (openRequest != null) {
- openRequest.onFailure(cause);
- openRequest = null;
- }
-
- if (closeRequest != null) {
- closeRequest.onFailure(cause);
- closeRequest = null;
- }
- }
-
- public E getEndpoint() {
- return this.endpoint;
- }
-
- public R getJmsResource() {
- return this.info;
- }
-
- public EndpointState getLocalState() {
- if (endpoint == null) {
- return EndpointState.UNINITIALIZED;
- }
- return this.endpoint.getLocalState();
- }
-
- public EndpointState getRemoteState() {
- if (endpoint == null) {
- return EndpointState.UNINITIALIZED;
- }
- return this.endpoint.getRemoteState();
- }
-
- @Override
- public Exception getRemoteError() {
- String message = getRemoteErrorMessage();
- Exception remoteError = null;
- Symbol error = endpoint.getRemoteCondition().getCondition();
- if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
- remoteError = new JMSSecurityException(message);
- } else {
- remoteError = new JMSException(message);
- }
-
- return remoteError;
- }
-
- @Override
- public String getRemoteErrorMessage() {
- String message = "Received unkown error from remote peer";
- if (endpoint.getRemoteCondition() != null) {
- ErrorCondition error = endpoint.getRemoteCondition();
- if (error.getDescription() != null && !error.getDescription().isEmpty()) {
- message = error.getDescription();
- }
- }
-
- return message;
- }
-
- @Override
- public void processStateChange() throws IOException {
- EndpointState remoteState = endpoint.getRemoteState();
-
- if (remoteState == EndpointState.ACTIVE) {
- if (isAwaitingOpen()) {
- LOG.debug("{} is now open: ", this);
- opened();
- }
-
- // Should not receive an ACTIVE event if not awaiting the open state.
- } else if (remoteState == EndpointState.CLOSED) {
- if (isAwaitingClose()) {
- LOG.debug("{} is now closed: ", this);
- closed();
- } else if (isAwaitingOpen()) {
- // Error on Open, create exception and signal failure.
- LOG.warn("Open of {} failed: ", this);
- Exception remoteError = this.getRemoteError();
- failed(remoteError);
- } else {
- // TODO - Handle remote asynchronous close.
- LOG.warn("{} was closed remotely.", this);
- }
- }
- }
-
- @Override
- public void processDeliveryUpdates() throws IOException {
- }
-
- @Override
- public void processFlowUpdates() throws IOException {
- }
-
- protected abstract void doOpen();
-
- protected abstract void doClose();
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
new file mode 100644
index 0000000..83249c2
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -0,0 +1,240 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+import javax.jms.JMSSecurityException;
+
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.transport.AmqpError;
+import org.apache.qpid.proton.amqp.transport.ErrorCondition;
+import org.apache.qpid.proton.engine.Endpoint;
+import org.apache.qpid.proton.engine.EndpointState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract base for all AmqpResource implementations to extend.
+ *
+ * This abstract class wraps up the basic state management bits so that the concrete
+ * object don't have to reproduce it. Provides hooks for the subclasses to initialize
+ * and shutdown.
+ */
+public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endpoint> implements AmqpResource {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpAbstractResource.class);
+
+ protected AsyncResult openRequest;
+ protected AsyncResult closeRequest;
+
+ protected E endpoint;
+ protected R resource;
+
+ /**
+ * Creates a new instance with the JmsResource provided, and sets the Endpoint to null.
+ *
+ * @param resource
+ * The JmsResource instance that this AmqpResource is managing.
+ */
+ public AmqpAbstractResource(R resource) {
+ this(resource, null);
+ }
+
+ /**
+ * Creates a new instance with the JmsResource provided, and sets the Endpoint to the given value.
+ *
+ * @param resource
+ * The JmsResource instance that this AmqpResource is managing.
+ * @param endpoint
+ * The Proton Endpoint instance that this object maps to.
+ */
+ public AmqpAbstractResource(R resource, E endpoint) {
+ this.resource = resource;
+ this.endpoint = endpoint;
+ }
+
+ @Override
+ public void open(AsyncResult request) {
+ this.openRequest = request;
+ doOpen();
+ this.endpoint.setContext(this);
+ this.endpoint.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return this.endpoint.getRemoteState() == EndpointState.ACTIVE;
+ }
+
+ @Override
+ public boolean isAwaitingOpen() {
+ return this.openRequest != null;
+ }
+
+ @Override
+ public void opened() {
+ if (this.openRequest != null) {
+ this.openRequest.onSuccess();
+ this.openRequest = null;
+ }
+ }
+
+ @Override
+ public void close(AsyncResult request) {
+ // If already closed signal success or else the caller might never get notified.
+ if (endpoint.getLocalState() == EndpointState.CLOSED) {
+ request.onSuccess();
+ return;
+ }
+
+ this.closeRequest = request;
+ doClose();
+ this.endpoint.close();
+ }
+
+ @Override
+ public boolean isClosed() {
+ return this.endpoint.getLocalState() == EndpointState.CLOSED;
+ }
+
+ @Override
+ public boolean isAwaitingClose() {
+ return this.closeRequest != null;
+ }
+
+ @Override
+ public void closed() {
+ if (this.closeRequest != null) {
+ this.closeRequest.onSuccess();
+ this.closeRequest = null;
+ }
+
+ this.endpoint.close();
+ this.endpoint.free();
+ }
+
+ @Override
+ public void failed() {
+ failed(new JMSException("Remote request failed."));
+ }
+
+ @Override
+ public void failed(Exception cause) {
+ if (openRequest != null) {
+ openRequest.onFailure(cause);
+ openRequest = null;
+ }
+
+ if (closeRequest != null) {
+ closeRequest.onFailure(cause);
+ closeRequest = null;
+ }
+ }
+
+ public E getEndpoint() {
+ return this.endpoint;
+ }
+
+ public R getJmsResource() {
+ return this.resource;
+ }
+
+ public EndpointState getLocalState() {
+ if (endpoint == null) {
+ return EndpointState.UNINITIALIZED;
+ }
+ return this.endpoint.getLocalState();
+ }
+
+ public EndpointState getRemoteState() {
+ if (endpoint == null) {
+ return EndpointState.UNINITIALIZED;
+ }
+ return this.endpoint.getRemoteState();
+ }
+
+ @Override
+ public Exception getRemoteError() {
+ String message = getRemoteErrorMessage();
+ Exception remoteError = null;
+ Symbol error = endpoint.getRemoteCondition().getCondition();
+ if (error.equals(AmqpError.UNAUTHORIZED_ACCESS)) {
+ remoteError = new JMSSecurityException(message);
+ } else {
+ remoteError = new JMSException(message);
+ }
+
+ return remoteError;
+ }
+
+ @Override
+ public String getRemoteErrorMessage() {
+ String message = "Received unkown error from remote peer";
+ if (endpoint.getRemoteCondition() != null) {
+ ErrorCondition error = endpoint.getRemoteCondition();
+ if (error.getDescription() != null && !error.getDescription().isEmpty()) {
+ message = error.getDescription();
+ }
+ }
+
+ return message;
+ }
+
+ @Override
+ public void processStateChange() throws IOException {
+ EndpointState remoteState = endpoint.getRemoteState();
+
+ if (remoteState == EndpointState.ACTIVE) {
+ if (isAwaitingOpen()) {
+ LOG.debug("{} is now open: ", this);
+ opened();
+ }
+
+ // Should not receive an ACTIVE event if not awaiting the open state.
+ } else if (remoteState == EndpointState.CLOSED) {
+ if (isAwaitingClose()) {
+ LOG.debug("{} is now closed: ", this);
+ closed();
+ } else if (isAwaitingOpen()) {
+ // Error on Open, create exception and signal failure.
+ LOG.warn("Open of {} failed: ", this);
+ Exception remoteError = this.getRemoteError();
+ failed(remoteError);
+ } else {
+ // TODO - Handle remote asynchronous close.
+ LOG.warn("{} was closed remotely.", this);
+ }
+ }
+ }
+
+ @Override
+ public void processDeliveryUpdates() throws IOException {
+ }
+
+ @Override
+ public void processFlowUpdates() throws IOException {
+ }
+
+ protected abstract void doOpen();
+
+ protected abstract void doClose();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index f3d09b0..d9a52da 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -36,7 +36,7 @@ import org.apache.qpid.proton.engine.Sasl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Connection> {
+public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Connection> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnection.class);
@@ -71,7 +71,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
this.authenticator = new AmqpSaslAuthenticator(sasl, info);
}
- this.info.getConnectionId().setProviderHint(this);
+ this.resource.getConnectionId().setProviderHint(this);
this.queuePrefix = info.getQueuePrefix();
this.topicPrefix = info.getTopicPrefix();
@@ -80,7 +80,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
// Create a Session for this connection that is used for Temporary Destinations
// and perhaps later on management and advisory monitoring.
- JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1);
+ JmsSessionInfo sessionInfo = new JmsSessionInfo(this.resource, -1);
sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
this.connectionSession = new AmqpConnectionSession(this, sessionInfo);
@@ -88,7 +88,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
@Override
protected void doOpen() {
- this.endpoint.setContainer(info.getClientId());
+ this.endpoint.setContainer(resource.getClientId());
this.endpoint.setHostname(remoteURI.getHost());
}
@@ -199,7 +199,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
}
public JmsConnectionInfo getConnectionInfo() {
- return this.info;
+ return this.resource;
}
public Connection getProtonConnection() {
@@ -211,11 +211,11 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
}
public String getUsername() {
- return this.info.getUsername();
+ return this.resource.getUsername();
}
public String getPassword() {
- return this.info.getPassword();
+ return this.resource.getPassword();
}
public AmqpProvider getProvider() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 0af029b..3caec3c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -58,7 +58,7 @@ import org.slf4j.LoggerFactory;
/**
* AMQP Consumer object that is used to manage JMS MessageConsumer semantics.
*/
-public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver> {
+public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConsumer.class);
@@ -81,20 +81,20 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
this.session = session;
// Add a shortcut back to this Consumer for quicker lookups
- this.info.getConsumerId().setProviderHint(this);
+ this.resource.getConsumerId().setProviderHint(this);
}
/**
* Starts the consumer by setting the link credit to the given prefetch value.
*/
public void start(AsyncResult request) {
- this.endpoint.flow(info.getPrefetchSize());
+ this.endpoint.flow(resource.getPrefetchSize());
request.onSuccess();
}
@Override
protected void doOpen() {
- JmsDestination destination = info.getDestination();
+ JmsDestination destination = resource.getDestination();
String subscription = session.getQualifiedName(destination);
Source source = new Source();
@@ -104,10 +104,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
configureSource(source);
String receiverName = getConsumerId() + ":" + subscription;
- if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) {
+ if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) {
// In the case of Durable Topic Subscriptions the client must use the same
// receiver name which is derived from the subscription name property.
- receiverName = info.getSubscriptionName();
+ receiverName = resource.getSubscriptionName();
}
endpoint = session.getProtonSession().receiver(receiverName);
@@ -136,7 +136,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
protected void configureSource(Source source) {
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
- if (info.getSubscriptionName() != null && !info.getSubscriptionName().isEmpty()) {
+ if (resource.getSubscriptionName() != null && !resource.getSubscriptionName().isEmpty()) {
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setDistributionMode(COPY);
@@ -145,12 +145,12 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
}
- if (info.isNoLocal()) {
+ if (resource.isNoLocal()) {
filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
}
- if (info.getSelector() != null && !info.getSelector().trim().equals("")) {
- filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(info.getSelector()));
+ if (resource.getSelector() != null && !resource.getSelector().trim().equals("")) {
+ filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resource.getSelector()));
}
if (!filters.isEmpty()) {
@@ -168,7 +168,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
* would already have been given for these so we just need to settle them.
*/
public void acknowledge() {
- LOG.trace("Session Acknowledge for consumer: {}", info.getConsumerId());
+ LOG.trace("Session Acknowledge for consumer: {}", resource.getConsumerId());
for (Delivery delivery : delivered.values()) {
delivery.disposition(Accepted.getInstance());
delivery.settle();
@@ -246,13 +246,13 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
* then we open the window back up to full prefetch size.
*/
private void sendFlowIfNeeded() {
- if (info.getPrefetchSize() == 0) {
+ if (resource.getPrefetchSize() == 0) {
return;
}
int currentCredit = endpoint.getCredit();
- if (currentCredit <= info.getPrefetchSize() * 0.2) {
- endpoint.flow(info.getPrefetchSize() - currentCredit);
+ if (currentCredit <= resource.getPrefetchSize() * 0.2) {
+ endpoint.flow(resource.getPrefetchSize() - currentCredit);
}
}
@@ -262,7 +262,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
* @throws Exception if an error occurs while performing the recover.
*/
public void recover() throws Exception {
- LOG.debug("Session Recover for consumer: {}", info.getConsumerId());
+ LOG.debug("Session Recover for consumer: {}", resource.getConsumerId());
for (Delivery delivery : delivered.values()) {
// TODO - increment redelivery counter and apply connection redelivery policy
// to those messages that are past max redlivery.
@@ -281,7 +281,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
* @param timeout
*/
public void pull(long timeout) {
- if (info.getPrefetchSize() == 0 && endpoint.getCredit() == 0) {
+ if (resource.getPrefetchSize() == 0 && endpoint.getCredit() == 0) {
// expand the credit window by one.
endpoint.flow(1);
}
@@ -329,7 +329,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
envelope.setMessage(message);
- envelope.setConsumerId(info.getConsumerId());
+ envelope.setConsumerId(resource.getConsumerId());
// Store link to delivery in the hint for use in acknowledge requests.
envelope.setProviderHint(incoming);
envelope.setMessageId(message.getFacade().getProviderMessageIdObject());
@@ -357,11 +357,11 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
}
public JmsConsumerId getConsumerId() {
- return this.info.getConsumerId();
+ return this.resource.getConsumerId();
}
public JmsDestination getDestination() {
- return this.info.getDestination();
+ return this.resource.getDestination();
}
public Receiver getProtonReceiver() {
@@ -382,7 +382,7 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
@Override
public String toString() {
- return "AmqpConsumer { " + this.info.getConsumerId() + " }";
+ return "AmqpConsumer { " + this.resource.getConsumerId() + " }";
}
protected void deliveryFailed(Delivery incoming, boolean expandCredit) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 3afc492..56ed79f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -235,8 +235,8 @@ public class AmqpFixedProducer extends AmqpProducer {
protected void doOpen() {
String targetAddress;
- if (info.getDestination() != null) {
- JmsDestination destination = info.getDestination();
+ if (resource.getDestination() != null) {
+ JmsDestination destination = resource.getDestination();
targetAddress = session.getQualifiedName(destination);
} else {
targetAddress = connection.getProperties().getAnonymousRelayName();
@@ -274,7 +274,7 @@ public class AmqpFixedProducer extends AmqpProducer {
@Override
public boolean isAnonymous() {
- return this.info.getDestination() == null;
+ return this.resource.getDestination() == null;
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
index 81ebf9e..473f184 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProducer.java
@@ -29,7 +29,7 @@ import org.apache.qpid.proton.engine.Sender;
/**
* Base class for Producer instances.
*/
-public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo, Sender> {
+public abstract class AmqpProducer extends AmqpAbstractResource<JmsProducerInfo, Sender> {
protected final AmqpSession session;
protected final AmqpConnection connection;
@@ -41,7 +41,7 @@ public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo,
this.connection = session.getConnection();
// Add a shortcut back to this Producer for quicker lookup.
- this.info.getProducerId().setProviderHint(this);
+ this.resource.getProducerId().setProviderHint(this);
}
/**
@@ -69,7 +69,7 @@ public abstract class AmqpProducer extends AbstractAmqpResource<JmsProducerInfo,
* @return the JmsProducerId that was assigned to this AmqpProducer.
*/
public JmsProducerId getProducerId() {
- return this.info.getProducerId();
+ return this.resource.getProducerId();
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
index 10e165b..6434bbd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpQueueBrowser.java
@@ -46,7 +46,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
*/
@Override
public void start(AsyncResult request) {
- this.endpoint.flow(info.getPrefetchSize());
+ this.endpoint.flow(resource.getPrefetchSize());
request.onSuccess();
}
@@ -66,7 +66,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
public void pull(long timeout) {
if (!endpoint.getDrain() && endpoint.current() == null && endpoint.getUnsettled() == 0) {
LOG.trace("QueueBrowser {} will try to drain remote.", getConsumerId());
- this.endpoint.drain(info.getPrefetchSize());
+ this.endpoint.drain(resource.getPrefetchSize());
} else {
endpoint.setDrain(false);
}
@@ -113,7 +113,7 @@ public class AmqpQueueBrowser extends AmqpConsumer {
@Override
protected void configureSource(Source source) {
- if (info.isBrowser()) {
+ if (resource.isBrowser()) {
source.setDistributionMode(COPY);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
index 10519f3..a671ffc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java
@@ -34,7 +34,7 @@ import org.apache.qpid.proton.engine.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
+public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpSession.class);
@@ -48,8 +48,8 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
super(info, connection.getProtonConnection().session());
this.connection = connection;
- this.info.getSessionId().setProviderHint(this);
- if (this.info.isTransacted()) {
+ this.resource.getSessionId().setProviderHint(this);
+ if (this.resource.isTransacted()) {
txContext = new AmqpTransactionContext(this);
} else {
txContext = null;
@@ -165,7 +165,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
* @throws Exception if an error occurs while performing the operation.
*/
public void begin(JmsTransactionId txId, AsyncResult request) throws Exception {
- if (!this.info.isTransacted()) {
+ if (!this.resource.isTransacted()) {
throw new IllegalStateException("Non-transacted Session cannot start a TX.");
}
@@ -181,7 +181,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
* @throws Exception if an error occurs while performing the operation.
*/
public void commit(AsyncResult request) throws Exception {
- if (!this.info.isTransacted()) {
+ if (!this.resource.isTransacted()) {
throw new IllegalStateException("Non-transacted Session cannot start a TX.");
}
@@ -197,7 +197,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
* @throws Exception if an error occurs while performing the operation.
*/
public void rollback(AsyncResult request) throws Exception {
- if (!this.info.isTransacted()) {
+ if (!this.resource.isTransacted()) {
throw new IllegalStateException("Non-transacted Session cannot start a TX.");
}
@@ -256,7 +256,7 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
}
public JmsSessionId getSessionId() {
- return this.info.getSessionId();
+ return this.resource.getSessionId();
}
public Session getProtonSession() {
@@ -264,11 +264,11 @@ public class AmqpSession extends AbstractAmqpResource<JmsSessionInfo, Session> {
}
boolean isTransacted() {
- return this.info.isTransacted();
+ return this.resource.isTransacted();
}
boolean isAsyncAck() {
- return this.info.isSendAcksAsync() || isTransacted();
+ return this.resource.isSendAcksAsync() || isTransacted();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
index 4107e3e..1376874 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTemporaryDestination.java
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* the broker in the case where the user does not have authorization to access temporary
* destinations.
*/
-public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestination, Sender> {
+public class AmqpTemporaryDestination extends AmqpAbstractResource<JmsDestination, Sender> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTemporaryDestination.class);
@@ -58,10 +58,10 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
EndpointState remoteState = endpoint.getRemoteState();
if (remoteState == EndpointState.ACTIVE) {
- LOG.trace("Temporary Destination: {} is now open", this.info);
+ LOG.trace("Temporary Destination: {} is now open", this.resource);
opened();
} else if (remoteState == EndpointState.CLOSED) {
- LOG.trace("Temporary Destination: {} is now closed", this.info);
+ LOG.trace("Temporary Destination: {} is now closed", this.resource);
closed();
}
}
@@ -70,12 +70,12 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
public void opened() {
// Once our producer is opened we can read the updated name from the target address.
- String oldDestinationName = info.getName();
+ String oldDestinationName = resource.getName();
String destinationName = this.endpoint.getRemoteTarget().getAddress();
- this.info.setName(destinationName);
+ this.resource.setName(destinationName);
- LOG.trace("Updated temp destination to: {} from: {}", info, oldDestinationName);
+ LOG.trace("Updated temp destination to: {} from: {}", resource, oldDestinationName);
super.opened();
}
@@ -83,8 +83,8 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
@Override
protected void doOpen() {
- String sourceAddress = info.getName();
- if (info.isQueue()) {
+ String sourceAddress = resource.getName();
+ if (resource.isQueue()) {
sourceAddress = connection.getTempQueuePrefix() + sourceAddress;
} else {
// TODO - AMQ doesn't support temp topics so we make everything a temp queue for now
@@ -123,11 +123,11 @@ public class AmqpTemporaryDestination extends AbstractAmqpResource<JmsDestinatio
}
public JmsDestination getJmsDestination() {
- return this.info;
+ return this.resource;
}
@Override
public String toString() {
- return getClass().getSimpleName() + " { " + info + "}";
+ return getClass().getSimpleName() + " { " + resource + "}";
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/ec13a0d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
index 1cc6fd1..6bfbaa5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java
@@ -52,7 +52,7 @@ import org.slf4j.LoggerFactory;
* The Transaction will carry a JmsTransactionId while the Transaction is open, once a
* transaction has been committed or rolled back the Transaction Id is cleared.
*/
-public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo, Sender> {
+public class AmqpTransactionContext extends AmqpAbstractResource<JmsSessionInfo, Sender> {
private static final Logger LOG = LoggerFactory.getLogger(AmqpTransactionContext.class);
@@ -72,7 +72,7 @@ public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo,
*
* @param session
* The session that owns this transaction
- * @param info
+ * @param resource
* The JmsTransactionInfo that defines this Transaction.
*/
public AmqpTransactionContext(AmqpSession session) {
@@ -134,7 +134,7 @@ public class AmqpTransactionContext extends AbstractAmqpResource<JmsSessionInfo,
coordinator.setCapabilities(TxnCapability.LOCAL_TXN, TxnCapability.MULTI_TXNS_PER_SSN);
Source source = new Source();
- String coordinatorName = info.getSessionId().toString();
+ String coordinatorName = resource.getSessionId().toString();
endpoint = session.getProtonSession().sender(coordinatorName);
endpoint.setSource(source);
endpoint.setTarget(coordinator);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] git commit: Create a specialized session object used soley by
the connection to provide a place for things like durable unsubscribe, etc.
Posted by ta...@apache.org.
Create a specialized session object used soley by the connection to
provide a place for things like durable unsubscribe, etc.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/1ec8e2ca
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/1ec8e2ca
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/1ec8e2ca
Branch: refs/heads/master
Commit: 1ec8e2ca8521819b982b1ba853859a12b32014a5
Parents: 49db0e5
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Oct 16 16:51:25 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Oct 16 16:51:25 2014 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConnection.java | 8 ++-
.../provider/amqp/AmqpConnectionSession.java | 51 ++++++++++++++++++++
.../qpid/jms/provider/amqp/AmqpProvider.java | 2 +-
3 files changed, 58 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1ec8e2ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 97118d7..f3d09b0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -48,7 +48,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
private final AmqpProvider provider;
private boolean connected;
private AmqpSaslAuthenticator authenticator;
- private final AmqpSession connectionSession;
+ private final AmqpConnectionSession connectionSession;
private AmqpConnectionProperties properties;
private String queuePrefix;
@@ -83,7 +83,7 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
JmsSessionInfo sessionInfo = new JmsSessionInfo(this.info, -1);
sessionInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
- this.connectionSession = new AmqpSession(this, sessionInfo);
+ this.connectionSession = new AmqpConnectionSession(this, sessionInfo);
}
@Override
@@ -106,6 +106,10 @@ public class AmqpConnection extends AbstractAmqpResource<JmsConnectionInfo, Conn
return temporary;
}
+ public void unsubscribe(String subscriptionName, AsyncResult request) {
+ connectionSession.unsubscribe(subscriptionName, request);
+ }
+
/**
* Called on receiving an event from Proton indicating a state change on the remote
* side of the Connection.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1ec8e2ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
new file mode 100644
index 0000000..341893e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.AsyncResult;
+
+/**
+ * Subclass of the standard session object used solely by AmqpConnection to
+ * aid in managing connection resources that require a persistent session.
+ */
+public class AmqpConnectionSession extends AmqpSession {
+
+ /**
+ * Create a new instance of a Connection owned Session object.
+ *
+ * @param connection
+ * the connection that owns this session.
+ * @param info
+ * the <code>JmsSessionInfo</code> for the Session to create.
+ */
+ public AmqpConnectionSession(AmqpConnection connection, JmsSessionInfo info) {
+ super(connection, info);
+ }
+
+ /**
+ * Used to remove an existing durable topic subscription from the remote broker.
+ *
+ * @param subscriptionName
+ * the subscription name that is to be removed.
+ * @param request
+ * the request that awaits the completion of this action.
+ */
+ public void unsubscribe(String subscriptionName, AsyncResult request) {
+ request.onSuccess();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/1ec8e2ca/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 3928b03..e5b5f7a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -524,8 +524,8 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
public void run() {
try {
checkClosed();
+ connection.unsubscribe(subscription, request);
pumpToProtonTransport();
- request.onSuccess();
} catch (Exception error) {
request.onFailure(error);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org