You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/09/27 21:04:24 UTC
qpid-jms git commit: QPIDJMS-X Implement JMS ConnectionConsumer
functionality
Repository: qpid-jms
Updated Branches:
refs/heads/connection-consumer-2 [created] f0042a6ba
QPIDJMS-X Implement JMS ConnectionConsumer functionality
Add support for JMS ConnectionConsumer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/f0042a6b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/f0042a6b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/f0042a6b
Branch: refs/heads/connection-consumer-2
Commit: f0042a6bac8d29b8730f86d043f09b208a2eed4a
Parents: f4fcfab
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Sep 14 17:12:48 2017 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 27 17:04:01 2017 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 205 +++++--
.../apache/qpid/jms/JmsConnectionConsumer.java | 289 ++++++++++
.../org/apache/qpid/jms/JmsMessageConsumer.java | 15 +-
.../java/org/apache/qpid/jms/JmsSession.java | 103 +++-
.../jms/message/JmsInboundMessageDispatch.java | 10 +
.../apache/qpid/jms/meta/JmsConsumerInfo.java | 20 +
.../qpid/jms/policy/JmsPrefetchPolicy.java | 2 +-
.../qpid/jms/provider/amqp/AmqpConnection.java | 9 +
.../provider/amqp/AmqpConnectionSession.java | 16 +
.../qpid/jms/provider/amqp/AmqpProvider.java | 9 +-
.../org/apache/qpid/jms/JmsConnectionTest.java | 38 --
.../org/apache/qpid/jms/JmsSessionTest.java | 5 -
.../ConnectionConsumerIntegrationTest.java | 540 +++++++++++++++++++
.../failover/FailoverIntegrationTest.java | 71 +++
14 files changed, 1217 insertions(+), 115 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index d3a77c7..cbb9b72 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -55,6 +55,7 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.meta.JmsConnectionId;
import org.apache.qpid.jms.meta.JmsConnectionInfo;
@@ -80,6 +81,9 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.ProviderSynchronization;
+import org.apache.qpid.jms.util.FifoMessageQueue;
+import org.apache.qpid.jms.util.MessageQueue;
+import org.apache.qpid.jms.util.PriorityMessageQueue;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
import org.apache.qpid.jms.util.ThreadPoolUtils;
import org.slf4j.Logger;
@@ -92,7 +96,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class);
- private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<JmsSessionId, JmsSession>();
+ private final Map<JmsSessionId, JmsSession> sessions = new ConcurrentHashMap<>();
+ private final Map<JmsConsumerId, JmsConnectionConsumer> connectionConsumers = new ConcurrentHashMap<>();
private final AtomicBoolean connected = new AtomicBoolean();
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean closing = new AtomicBoolean();
@@ -105,15 +110,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
private JmsMessageFactory messageFactory;
private Provider provider;
- private final Set<JmsConnectionListener> connectionListeners =
- new CopyOnWriteArraySet<JmsConnectionListener>();
- private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations =
- new ConcurrentHashMap<JmsTemporaryDestination, JmsTemporaryDestination>();
+ private final Set<JmsConnectionListener> connectionListeners = new CopyOnWriteArraySet<>();
+ private final Map<JmsTemporaryDestination, JmsTemporaryDestination> tempDestinations = new ConcurrentHashMap<>();
private final AtomicLong sessionIdGenerator = new AtomicLong();
private final AtomicLong tempDestIdGenerator = new AtomicLong();
private final AtomicLong transactionIdGenerator = new AtomicLong();
-
- private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<AsyncResult, AsyncResult>();
+ private final AtomicLong connectionConsumerIdGenerator = new AtomicLong();
+ private final Map<AsyncResult, AsyncResult> requests = new ConcurrentHashMap<>();
protected JmsConnection(final JmsConnectionInfo connectionInfo, Provider provider) throws JMSException {
@@ -203,6 +206,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
session.shutdown();
}
+ for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+ connectionConsumer.shutdown();
+ }
+
if (isConnected() && !isFailed()) {
ProviderFuture request = new ProviderFuture();
requests.put(request, request);
@@ -272,6 +279,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
session.shutdown(cause);
}
+ for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+ connectionConsumer.shutdown();
+ }
+
if (isConnected() && !isFailed() && !closing.get()) {
destroyResource(connectionInfo);
}
@@ -343,9 +354,13 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
createJmsConnection();
if (started.compareAndSet(false, true)) {
try {
- for (JmsSession s : sessions.values()) {
- s.start();
+ for (JmsSession session : sessions.values()) {
+ session.start();
}
+
+ for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+ connectionConsumer.start();
+ }
} catch (Exception e) {
throw JmsExceptionSupport.create(e);
}
@@ -377,9 +392,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
if (started.compareAndSet(true, false)) {
- synchronized(sessions) {
- for (JmsSession s : sessions.values()) {
- s.stop();
+ synchronized (sessions) {
+ for (JmsSession session : sessions.values()) {
+ session.stop();
+ }
+ }
+
+ synchronized (connectionConsumers) {
+ for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+ connectionConsumer.stop();
}
}
}
@@ -389,48 +410,95 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
createJmsConnection();
- throw new JMSException("Not supported");
+
+ return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, false, true);
}
@Override
public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
createJmsConnection();
- throw new JMSException("Not supported");
+
+ return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, true);
}
@Override
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
- String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
createJmsConnection();
- throw new JMSException("Not supported");
+
+ return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, subscriptionName, true, false);
}
@Override
- public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
createJmsConnection();
- throw new JMSException("Not supported");
+
+ return createConnectionConsumer(destination, messageSelector, sessionPool, maxMessages, null, false, false);
}
@Override
- public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
createJmsConnection();
- throw new JMSException("Not supported");
+
+ return createConnectionConsumer(topic, messageSelector, sessionPool, maxMessages, null, false, false);
}
@Override
- public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
- ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
createJmsConnection();
- throw new JMSException("Not supported");
+
+ return createConnectionConsumer(queue, messageSelector, sessionPool, maxMessages, null, false, false);
}
+ private ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector, ServerSessionPool sessionPool, int maxMessages, String subscriptionName, boolean durable, boolean shared) throws JMSException {
+ JmsDestination jmsDestination = JmsMessageTransformation.transformDestination(this, destination);
+
+ int configuredPrefetch = getPrefetchPolicy().getConfiguredPrefetch((JmsSession) null, jmsDestination, durable, false);
+
+ final MessageQueue messageQueue;
+
+ if (isLocalMessagePriority()) {
+ messageQueue = new PriorityMessageQueue();
+ } else {
+ messageQueue = new FifoMessageQueue(configuredPrefetch);
+ }
+
+ JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue);
+ consumerInfo.setExplicitClientID(isExplicitClientID());
+ consumerInfo.setSelector(messageSelector);
+ consumerInfo.setDurable(durable);
+ consumerInfo.setSubscriptionName(subscriptionName);
+ consumerInfo.setShared(shared);
+ consumerInfo.setDestination(jmsDestination);
+ consumerInfo.setAcknowledgementMode(Session.AUTO_ACKNOWLEDGE);
+ consumerInfo.setNoLocal(false);
+ consumerInfo.setBrowser(false);
+ consumerInfo.setPrefetchSize(configuredPrefetch);
+ consumerInfo.setRedeliveryPolicy(getRedeliveryPolicy().copy());
+ consumerInfo.setLocalMessageExpiry(isLocalMessageExpiry());
+ consumerInfo.setPresettle(false);
+ consumerInfo.setDeserializationPolicy(getDeserializationPolicy().copy());
+ consumerInfo.setMaxMessages(maxMessages);
+ consumerInfo.setConnectionConsumer(true);
+
+ JmsConnectionConsumer consumer = new JmsConnectionConsumer(this, consumerInfo, messageQueue, sessionPool);
+
+ try {
+ consumer.init();
+ if (started.get()) {
+ consumer.start();
+ }
+ return consumer;
+ } catch (JMSException jmsEx) {
+ consumer.close();
+ throw jmsEx;
+ }
+ }
+
@Override
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
@@ -491,6 +559,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
sessions.put(sessionInfo.getId(), session);
}
+ protected void removeConnectionConsumer(JmsConsumerInfo consumerInfo) throws JMSException {
+ connectionConsumers.remove(consumerInfo.getId());
+ }
+
+ protected void addConnectionConsumer(JmsConsumerInfo consumerInfo, JmsConnectionConsumer consumer) {
+ connectionConsumers.put(consumerInfo.getId(), consumer);
+ }
+
private void createJmsConnection() throws JMSException {
if (isConnected() || closed.get()) {
return;
@@ -583,6 +659,10 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
return new JmsTransactionId(connectionInfo.getId(), transactionIdGenerator.incrementAndGet());
}
+ protected JmsConsumerId getNextConnectionConsumerId() {
+ return new JmsConsumerId(connectionInfo.getId().toString(), -1, connectionConsumerIdGenerator.incrementAndGet());
+ }
+
protected synchronized boolean isExplicitClientID() {
return connectionInfo.isExplicitClientID();
}
@@ -1100,6 +1180,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
JmsMessageDispatcher dispatcher = sessions.get(envelope.getConsumerId().getParentId());
if (dispatcher != null) {
dispatcher.onInboundMessage(envelope);
+ } else {
+ dispatcher = connectionConsumers.get(envelope.getConsumerId());
+ if (dispatcher != null) {
+ dispatcher.onInboundMessage(envelope);
+ }
}
// Run the application callbacks on the connection executor to allow the provider to
@@ -1170,6 +1255,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
request.sync();
}
+ for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+ JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
+ if (consumerInfo.isOpen()) {
+ request = new ProviderFuture();
+ provider.create(consumerInfo, request);
+ request.sync();
+ }
+ }
+
for (JmsSession session : sessions.values()) {
session.onConnectionRecovery(provider);
}
@@ -1182,6 +1276,15 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
setMessageFactory(provider.getMessageFactory());
connectionInfo.setConnectedURI(provider.getRemoteURI());
+ for (JmsConnectionConsumer connectionConsumer : connectionConsumers.values()) {
+ JmsConsumerInfo consumerInfo = connectionConsumer.getConsumerInfo();
+ if (consumerInfo.isOpen()) {
+ ProviderFuture request = new ProviderFuture();
+ provider.start(consumerInfo, request);
+ request.sync();
+ }
+ }
+
for (JmsSession session : sessions.values()) {
session.onConnectionRecovered(provider);
}
@@ -1311,14 +1414,22 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
}
} else if (resource instanceof JmsConsumerInfo) {
- JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
- JmsSession session = sessions.get(parentId);
- if (session != null) {
- JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
+ JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource;
+ if (consumerInfo.isConnectionConsumer()) {
+ JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId());
if (consumer != null) {
consumer.setFailureCause(cause);
}
- }
+ } else {
+ JmsSessionId parentId = consumerInfo.getParentId();
+ JmsSession session = sessions.get(parentId);
+ if (session != null) {
+ JmsMessageConsumer consumer = session.lookup((JmsConsumerId) resource.getId());
+ if (consumer != null) {
+ consumer.setFailureCause(cause);
+ }
+ }
+ }
}
executor.execute(new Runnable() {
@@ -1345,16 +1456,32 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
}
} else if (resource instanceof JmsConsumerInfo) {
- JmsSessionId parentId = ((JmsConsumerInfo) resource).getParentId();
- JmsSession session = sessions.get(parentId);
- if (session != null) {
- JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
- if (consumer != null) {
- for (JmsConnectionListener listener : connectionListeners) {
- listener.onConsumerClosed(consumer, cause);
+ JmsConsumerInfo consumerInfo = (JmsConsumerInfo) resource;
+ if (consumerInfo.isConnectionConsumer()) {
+ JmsConnectionConsumer consumer = connectionConsumers.get(consumerInfo.getId());
+ if (consumer != null) {
+ try {
+ if (consumer != null) {
+ consumer.shutdown(cause);
+ }
+ } catch (Throwable error) {
+ LOG.trace("Ignoring exception thrown during cleanup of closed connection consumer", error);
}
+
+ onAsyncException(new JMSException("Connection Consumer remotely closed").initCause(cause));
}
- }
+ } else {
+ JmsSessionId parentId = consumerInfo.getParentId();
+ JmsSession session = sessions.get(parentId);
+ if (session != null) {
+ JmsMessageConsumer consumer = session.consumerClosed((JmsConsumerInfo) resource, cause);
+ if (consumer != null) {
+ for (JmsConnectionListener listener : connectionListeners) {
+ listener.onConsumerClosed(consumer, cause);
+ }
+ }
+ }
+ }
} else {
LOG.info("A JMS resource has been remotely closed: {}", resource);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
new file mode 100644
index 0000000..862b959
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionConsumer.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
+import org.apache.qpid.jms.meta.JmsResource.ResourceState;
+import org.apache.qpid.jms.util.MessageQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMS Connection Consumer implementation.
+ */
+public class JmsConnectionConsumer implements ConnectionConsumer, JmsMessageDispatcher {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionConsumer.class);
+
+ private static final long DEFAULT_DISPATCH_RETRY_DELAY = 1000;
+
+ private final JmsConnection connection;
+ private final JmsConsumerInfo consumerInfo;
+ private final ServerSessionPool sessionPool;
+ private final MessageQueue messageQueue;
+
+ private final Lock stateLock = new ReentrantLock();
+ private final Lock dispatchLock = new ReentrantLock();
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicReference<Throwable> failureCause = new AtomicReference<>();
+ private final ScheduledThreadPoolExecutor dispatcher;
+
+ public JmsConnectionConsumer(JmsConnection connection, JmsConsumerInfo consumerInfo, MessageQueue messageQueue, ServerSessionPool sessionPool) throws JMSException {
+ this.connection = connection;
+ this.consumerInfo = consumerInfo;
+ this.sessionPool = sessionPool;
+ this.messageQueue = messageQueue;
+ this.dispatcher = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable runner) {
+ Thread serial = new Thread(runner);
+ serial.setDaemon(true);
+ serial.setName(this.getClass().getSimpleName() + ":(" + consumerInfo.getId() + ")");
+ return serial;
+ }
+ });
+
+ // Ensure a timely shutdown for consumer close.
+ dispatcher.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
+ dispatcher.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
+
+ connection.addConnectionConsumer(consumerInfo, this);
+ try {
+ connection.createResource(consumerInfo);
+ } catch (JMSException jmse) {
+ connection.removeConnectionConsumer(consumerInfo);
+ throw jmse;
+ }
+ }
+
+ public JmsConnectionConsumer init() throws JMSException {
+ getConnection().startResource(consumerInfo);
+ return this;
+ }
+
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ envelope.setConsumerInfo(consumerInfo);
+
+ stateLock.lock();
+ try {
+ if (envelope.isEnqueueFirst()) {
+ this.messageQueue.enqueueFirst(envelope);
+ } else {
+ this.messageQueue.enqueue(envelope);
+ }
+
+ if (messageQueue.isRunning()) {
+ try {
+ dispatcher.execute(() -> deliverNextPending());
+ } catch (RejectedExecutionException rje) {
+ LOG.debug("Rejected on attempt to queue message dispatch", rje);
+ }
+ }
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ @Override
+ public void close() throws JMSException {
+ if (!closed.get()) {
+ doClose();
+ }
+ }
+
+ /**
+ * Called to initiate shutdown of consumer resources and request that the remote
+ * peer remove the registered producer.
+ *
+ * @throws JMSException if an error occurs during the consumer close operation.
+ */
+ protected void doClose() throws JMSException {
+ shutdown();
+ this.connection.destroyResource(consumerInfo);
+ }
+
+ protected void shutdown() throws JMSException {
+ shutdown(null);
+ }
+
+ protected void shutdown(Throwable cause) throws JMSException {
+ if (closed.compareAndSet(false, true)) {
+ dispatchLock.lock();
+ try {
+ failureCause.set(cause);
+ consumerInfo.setState(ResourceState.CLOSED);
+ connection.removeConnectionConsumer(consumerInfo);
+ stop(true);
+ dispatcher.shutdown();
+ try {
+ dispatcher.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ LOG.trace("ConnectionConsumer shutdown of dispatcher was interupted");
+ }
+ } finally {
+ dispatchLock.unlock();
+ }
+ }
+ }
+
+ public void start() {
+ stateLock.lock();
+ try {
+ if (!messageQueue.isRunning()) {
+ this.messageQueue.start();
+ this.dispatcher.execute(new BoundedMessageDeliverTask(messageQueue.size()));
+ }
+ } finally {
+ stateLock.unlock();
+ }
+ }
+
+ public void stop() {
+ stop(false);
+ }
+
+ private void stop(boolean closeMessageQueue) {
+ dispatchLock.lock();
+ stateLock.lock();
+ try {
+ if (closeMessageQueue) {
+ this.messageQueue.close();
+ } else {
+ this.messageQueue.stop();
+ }
+ } finally {
+ stateLock.unlock();
+ dispatchLock.unlock();
+ }
+ }
+
+ @Override
+ public ServerSessionPool getServerSessionPool() throws JMSException {
+ checkClosed();
+ return sessionPool;
+ }
+
+ JmsConnection getConnection() {
+ return connection;
+ }
+
+ JmsConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+ void setFailureCause(Throwable failureCause) {
+ this.failureCause.set(failureCause);
+ }
+
+ Throwable getFailureCause() {
+ return failureCause.get();
+ }
+
+ @Override
+ public String toString() {
+ return "JmsConnectionConsumer { id=" + consumerInfo.getId() + " }";
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (closed.get()) {
+ IllegalStateException jmsEx = null;
+
+ if (getFailureCause() == null) {
+ jmsEx = new IllegalStateException("The ConnectionConsumer is closed");
+ } else {
+ jmsEx = new IllegalStateException("The ConnectionConsumer was closed due to an unrecoverable error.");
+ jmsEx.initCause(getFailureCause());
+ }
+
+ throw jmsEx;
+ }
+ }
+
+ private boolean deliverNextPending() {
+ if (messageQueue.isRunning() && !messageQueue.isEmpty()) {
+ dispatchLock.lock();
+
+ try {
+ ServerSession serverSession = getServerSessionPool().getServerSession();
+ if (serverSession == null) {
+ // There might not be an available session so queue a task to try again
+ // and hope that by then one is available in the pool.
+ dispatcher.schedule(new BoundedMessageDeliverTask(messageQueue.size()), DEFAULT_DISPATCH_RETRY_DELAY, TimeUnit.MILLISECONDS);
+ return false;
+ }
+
+ Session session = serverSession.getSession();
+
+ JmsInboundMessageDispatch envelope = messageQueue.dequeueNoWait();
+
+ if (session instanceof JmsSession) {
+ ((JmsSession) session).enqueueInSession(envelope);
+ } else {
+ LOG.warn("ServerSession provided an onknown JMS Session type to this connection consumer: {}", session);
+ }
+
+ serverSession.start();
+ } catch (JMSException e) {
+ connection.onAsyncException(e);
+ stop(true);
+ } finally {
+ dispatchLock.unlock();
+ }
+ }
+
+ return !messageQueue.isEmpty();
+ }
+
+ private final class BoundedMessageDeliverTask implements Runnable {
+
+ private final int deliveryCount;
+
+ public BoundedMessageDeliverTask(int deliveryCount) {
+ this.deliveryCount = deliveryCount;
+ }
+
+ @Override
+ public void run() {
+ int current = 0;
+
+ while (messageQueue.isRunning() && current++ < deliveryCount) {
+ if (!deliverNextPending()) {
+ return; // Another task already drained the queue.
+ }
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index bd09e77..40ff378 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -328,7 +328,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
timeout = Math.max(deadline - System.currentTimeMillis(), 0);
}
performPullIfRequired(timeout, false);
- } else if (redeliveryExceeded(envelope)) {
+ } else if (session.redeliveryExceeded(envelope)) {
LOG.debug("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
applyRedeliveryPolicyOutcome(envelope);
if (timeout > 0) {
@@ -356,15 +356,6 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
return false;
}
- protected boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
- LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount());
-
- JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
- return redeliveryPolicy != null &&
- redeliveryPolicy.getMaxRedeliveries(getDestination()) >= 0 &&
- redeliveryPolicy.getMaxRedeliveries(getDestination()) < envelope.getRedeliveryCount();
- }
-
protected void checkClosed() throws IllegalStateException {
if (closed.get()) {
IllegalStateException jmsEx = null;
@@ -470,6 +461,8 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
*/
@Override
public void onInboundMessage(final JmsInboundMessageDispatch envelope) {
+ envelope.setConsumerInfo(consumerInfo);
+
lock.lock();
try {
if (acknowledgementMode == Session.CLIENT_ACKNOWLEDGE) {
@@ -718,7 +711,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
if (consumeExpiredMessage(envelope)) {
LOG.trace("{} filtered expired message: {}", getConsumerId(), envelope);
doAckExpired(envelope);
- } else if (redeliveryExceeded(envelope)) {
+ } else if (session.redeliveryExceeded(envelope)) {
LOG.trace("{} filtered message with excessive redelivery count: {}", getConsumerId(), envelope);
applyRedeliveryPolicyOutcome(envelope);
} else {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 6654d70..a1020b3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms;
+import static org.apache.qpid.jms.message.JmsMessageSupport.lookupAckTypeForDisposition;
+
import java.io.Serializable;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
@@ -92,6 +94,8 @@ import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.selector.SelectorParser;
import org.apache.qpid.jms.selector.filter.FilterException;
+import org.apache.qpid.jms.util.FifoMessageQueue;
+import org.apache.qpid.jms.util.MessageQueue;
import org.apache.qpid.jms.util.NoOpExecutor;
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
import org.slf4j.Logger;
@@ -109,6 +113,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final Map<JmsProducerId, JmsMessageProducer> producers = new ConcurrentHashMap<JmsProducerId, JmsMessageProducer>();
private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
private MessageListener messageListener;
+ private final MessageQueue sessionQueue = new FifoMessageQueue(16);
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean started = new AtomicBoolean();
private final JmsSessionInfo sessionInfo;
@@ -189,7 +194,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
@Override
public void setMessageListener(MessageListener listener) throws JMSException {
- checkClosed();
+ if (listener != null) {
+ checkClosed();
+ }
+
this.messageListener = listener;
}
@@ -249,17 +257,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
@Override
- public void run() {
- try {
- checkClosed();
- } catch (IllegalStateException e) {
- throw new RuntimeException(e);
- }
-
- throw new UnsupportedOperationException();
- }
-
- @Override
public void close() throws JMSException {
checkIsDeliveryThread();
checkIsCompletionThread();
@@ -714,6 +711,55 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return connection.createTemporaryTopic();
}
+ //----- Session dispatch support -----------------------------------------//
+
+ @Override
+ public void run() {
+ try {
+ checkClosed();
+ } catch (IllegalStateException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ JmsInboundMessageDispatch envelope = null;
+ while ((envelope = sessionQueue.dequeueNoWait()) != null) {
+ try {
+ JmsMessage copy = null;
+
+ if (envelope.getMessage().isExpired()) {
+ LOG.trace("{} filtered expired message: {}", envelope.getConsumerId(), envelope);
+ acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+ } else if (redeliveryExceeded(envelope)) {
+ LOG.trace("{} filtered message with excessive redelivery count: {}", envelope.getConsumerId(), envelope);
+ JmsRedeliveryPolicy redeliveryPolicy = envelope.getConsumerInfo().getRedeliveryPolicy();
+ acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(envelope.getConsumerInfo().getDestination())));
+ } else {
+ boolean deliveryFailed = false;
+
+ copy = acknowledge(envelope, ACK_TYPE.DELIVERED).getMessage().copy();
+
+ clearSessionRecovered();
+
+ try {
+ messageListener.onMessage(copy);
+ } catch (RuntimeException rte) {
+ deliveryFailed = true;
+ }
+
+ if (!isSessionRecovered()) {
+ if (!deliveryFailed) {
+ acknowledge(envelope, ACK_TYPE.ACCEPTED);
+ } else {
+ acknowledge(envelope, ACK_TYPE.RELEASED);
+ }
+ }
+ }
+ } catch (Exception e) {
+ getConnection().onException(e);
+ }
+ }
+ }
+
//----- Session Implementation methods -----------------------------------//
protected void add(JmsMessageConsumer consumer) throws JMSException {
@@ -914,8 +960,9 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
- void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+ JmsInboundMessageDispatch acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
transactionContext.acknowledge(connection, envelope, ackType);
+ return envelope;
}
/**
@@ -1016,6 +1063,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
for (JmsMessageConsumer consumer : consumers.values()) {
consumer.start();
}
+
+ sessionQueue.start();
}
}
@@ -1026,6 +1075,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
consumer.stop();
}
+ sessionQueue.stop();
+
synchronized (sessionInfo) {
if (deliveryExecutor != null) {
deliveryExecutor.shutdown();
@@ -1232,6 +1283,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
+ boolean redeliveryExceeded(JmsInboundMessageDispatch envelope) {
+ LOG.trace("checking envelope with {} redeliveries", envelope.getRedeliveryCount());
+
+ JmsConsumerInfo consumerInfo = envelope.getConsumerInfo();
+
+ JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
+ return redeliveryPolicy != null &&
+ redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination()) >= 0 &&
+ redeliveryPolicy.getMaxRedeliveries(consumerInfo.getDestination()) < envelope.getRedeliveryCount();
+ }
+
//----- Event handlers ---------------------------------------------------//
@Override
@@ -1306,16 +1368,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
if (id == null) {
this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage()));
}
- if (messageListener != null) {
- messageListener.onMessage(envelope.getMessage());
- } else {
- JmsMessageConsumer consumer = consumers.get(id);
- if (consumer != null) {
- consumer.onInboundMessage(envelope);
- }
+
+ JmsMessageConsumer consumer = consumers.get(id);
+ if (consumer != null) {
+ consumer.onInboundMessage(envelope);
}
}
+ void enqueueInSession(JmsInboundMessageDispatch envelope) {
+ sessionQueue.enqueue(envelope);
+ }
+
//----- Asynchronous Send Helpers ----------------------------------------//
private final class FailOrCompleteAsyncCompletionsTask implements Runnable {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
index fbb5a1d..e55426f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsInboundMessageDispatch.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.message;
import org.apache.qpid.jms.meta.JmsAbstractResourceId;
import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsConsumerInfo;
/**
* Envelope used to deliver incoming messages to their targeted consumer.
@@ -31,6 +32,7 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
private boolean enqueueFirst;
private boolean delivered;
+ private transient JmsConsumerInfo consumerInfo;
private transient String stringView;
public JmsInboundMessageDispatch(long sequence) {
@@ -83,6 +85,14 @@ public class JmsInboundMessageDispatch extends JmsAbstractResourceId {
return redeliveryCount;
}
+ public JmsConsumerInfo getConsumerInfo() {
+ return consumerInfo;
+ }
+
+ public void setConsumerInfo(JmsConsumerInfo consumerInfo) {
+ this.consumerInfo = consumerInfo;
+ }
+
@Override
public String toString() {
if (stringView == null) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index 74256ff..b2c1d07 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -38,6 +38,8 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
private int acknowledgementMode;
private boolean localMessageExpiry;
private boolean presettle;
+ private boolean connectionConsumer;
+ private int maxMessages;
private volatile boolean listener;
private final MessageQueue messageQueue;
@@ -76,6 +78,8 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
info.redeliveryPolicy = getRedeliveryPolicy().copy();
info.deserializationPolicy = getDeserializationPolicy().copy();
info.listener = listener;
+ info.connectionConsumer = connectionConsumer;
+ info.maxMessages = maxMessages;
}
public int getPrefetchedMessageCount() {
@@ -225,6 +229,22 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
this.presettle = presettle;
}
+ public boolean isConnectionConsumer() {
+ return connectionConsumer;
+ }
+
+ public void setConnectionConsumer(boolean connectionConsumer) {
+ this.connectionConsumer = connectionConsumer;
+ }
+
+ public int getMaxMessages() {
+ return maxMessages;
+ }
+
+ public void setMaxMessages(int maxMessages) {
+ this.maxMessages = maxMessages;
+ }
+
@Override
public String toString() {
return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
index efc49de..2834c60 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsPrefetchPolicy.java
@@ -36,7 +36,7 @@ public interface JmsPrefetchPolicy {
* Returns the prefetch value to use when creating a MessageConsumer instance.
*
* @param session
- * the Session that own the MessageConsumer being created.
+ * the Session that own the MessageConsumer being created. (null for a ConnectionConsumer).
* @param destination
* the Destination that the consumer will be subscribed to.
* @param durable
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index c93107b..0c43e43 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -177,6 +177,15 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
}
/**
+ * Retrieves the AmqpConnectionSession owned by this AmqpConnection.
+ *
+ * @return the AmqpConnectionSession owned by this AmqpConnection.
+ */
+ public AmqpConnectionSession getConnectionSession() {
+ return connectionSession;
+ }
+
+ /**
* @return true if anonymous producers should be cached or closed on send complete.
*/
public boolean isAnonymousProducerCache() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
index 78ca63a..1ffca30 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionSession.java
@@ -84,11 +84,27 @@ public class AmqpConnectionSession extends AmqpSession {
}
@Override
+ public void addChildResource(AmqpResource resource) {
+ // When a Connection Consumer is created the Connection is doing so
+ // without a known session to associate it with, we link up the consumer
+ // to this session by adding this session as the provider hint on the
+ // consumer's parent session ID.
+ if (resource instanceof AmqpConsumer) {
+ AmqpConsumer consumer = (AmqpConsumer) resource;
+ consumer.getConsumerId().getParentId().setProviderHint(this);
+ }
+
+ super.addChildResource(resource);
+ }
+
+ @Override
public void handleResourceClosure(AmqpProvider provider, Throwable cause) {
List<AsyncResult> pending = new ArrayList<>(pendingUnsubs.values());
for (AsyncResult unsubscribeRequest : pending) {
unsubscribeRequest.onFailure(cause);
}
+
+ super.handleResourceClosure(provider, cause);
}
private static final class DurableSubscriptionReattach extends AmqpAbstractResource<JmsSessionInfo, Receiver> {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 88deb58..1011312 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -340,7 +340,14 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
@Override
public void processConsumerInfo(JmsConsumerInfo consumerInfo) throws Exception {
- AmqpSession session = connection.getSession(consumerInfo.getParentId());
+ final AmqpSession session;
+
+ if (consumerInfo.isConnectionConsumer()) {
+ session = connection.getConnectionSession();
+ } else {
+ session = connection.getSession(consumerInfo.getParentId());
+ }
+
session.createConsumer(consumerInfo, request);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 9cf74c4..7577338 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -305,42 +305,4 @@ public class JmsConnectionTest {
int minor = metaData.getProviderMinorVersion();
assertTrue("Expected non-zero provider major(" + major + ") / minor(" + minor +") version.", (major + minor) != 0);
}
-
- //----- Currently these are unimplemented, these will fail after that ----//
-
- @Test(timeout=30000, expected=JMSException.class)
- public void testCreateConnectionConsumer() throws Exception {
- connection = new JmsConnection(connectionInfo, provider);
- connection.createConnectionConsumer((JmsDestination) new JmsTopic(), "", null, 1);
- }
-
- @Test(timeout=30000, expected=JMSException.class)
- public void testCreateConnectionTopicConsumer() throws Exception {
- connection = new JmsConnection(connectionInfo, provider);
- connection.createConnectionConsumer(new JmsTopic(), "", null, 1);
- }
-
- @Test(timeout=30000, expected=JMSException.class)
- public void testCreateConnectionQueueConsumer() throws Exception {
- connection = new JmsConnection(connectionInfo, provider);
- connection.createConnectionConsumer(new JmsQueue(), "", null, 1);
- }
-
- @Test(timeout=30000, expected=JMSException.class)
- public void testCreateDurableConnectionQueueConsumer() throws Exception {
- connection = new JmsConnection(connectionInfo, provider);
- connection.createDurableConnectionConsumer(new JmsTopic(), "", "", null, 1);
- }
-
- @Test(timeout=30000, expected=JMSException.class)
- public void testCreateSharedConnectionConsumer() throws Exception {
- connection = new JmsConnection(connectionInfo, provider);
- connection.createSharedConnectionConsumer(new JmsTopic(), "id", "", null, 1);
- }
-
- @Test(timeout=30000, expected=JMSException.class)
- public void testCreateSharedDurableConnectionConsumer() throws Exception {
- connection = new JmsConnection(connectionInfo, provider);
- connection.createSharedDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
index e12592e..d33463d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsSessionTest.java
@@ -318,11 +318,6 @@ public class JmsSessionTest extends JmsConnectionTestSupport {
public void testSessionRunFailsWhenSessionIsClosed() throws Exception {
JmsSession session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- try {
- session.run();
- fail("Not implemented");
- } catch (UnsupportedOperationException usoe) {}
-
session.close();
try {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
new file mode 100644
index 0000000..934b385
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionConsumerIntegrationTest.java
@@ -0,0 +1,540 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.ServerSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.JmsDefaultConnectionListener;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test for expected behaviors of JMS Connection Consumer implementation.
+ */
+public class ConnectionConsumerIntegrationTest extends QpidJmsTestCase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectionConsumerIntegrationTest.class);
+
+ private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+ @Test(timeout = 20000)
+ public void testCreateConnectionConsumer() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool();
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ // No additional Begin calls as there's no Session created for a Connection Consumer
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testConnectionConsumerDispatchesToSessionConnectionSratedBeforeCreate() throws Exception {
+ doTestConnectionConsumerDispatchesToSession(true);
+ }
+
+ @Test(timeout = 20000)
+ public void testConnectionConsumerDispatchesToSessionConnectionSratedAfterCreate() throws Exception {
+ doTestConnectionConsumerDispatchesToSession(false);
+ }
+
+ private void doTestConnectionConsumerDispatchesToSession(boolean startBeforeCreate) throws Exception {
+ final CountDownLatch messageArrived = new CountDownLatch(1);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ if (startBeforeCreate) {
+ connection.start();
+ }
+
+ testPeer.expectBegin();
+
+ // Create a session for our ServerSessionPool to use
+ Session session = connection.createSession();
+ session.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ messageArrived.countDown();
+ }
+ });
+ JmsServerSession serverSession = new JmsServerSession(session);
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ if (!startBeforeCreate) {
+ connection.start();
+ }
+
+ assertTrue("Message didn't arrive in time", messageArrived.await(10, TimeUnit.SECONDS));
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testNonStartedConnectionConsumerDoesNotDispatch() throws Exception {
+ final CountDownLatch messageArrived = new CountDownLatch(1);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ testPeer.expectBegin();
+
+ // Create a session for our ServerSessionPool to use
+ Session session = connection.createSession();
+ session.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ messageArrived.countDown();
+ }
+ });
+ JmsServerSession serverSession = new JmsServerSession(session);
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ assertFalse("Message Arrived unexpectedly", messageArrived.await(500, TimeUnit.MILLISECONDS));
+
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectDispositionThatIsReleasedAndSettled();
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testQueuedMessagesAreDrainedToServerSession() throws Exception {
+ final int MESSAGE_COUNT = 10;
+ final CountDownLatch messagesDispatched = new CountDownLatch(MESSAGE_COUNT);
+ final CountDownLatch messagesArrived = new CountDownLatch(MESSAGE_COUNT);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ messagesDispatched.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+
+ // Create a session for our ServerSessionPool to use
+ Session session = connection.createSession();
+ session.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ messagesArrived.countDown();
+ }
+ });
+
+ JmsServerSession serverSession = new JmsServerSession(session);
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool(serverSession);
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, MESSAGE_COUNT);
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ }
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ assertTrue("Message didn't arrive in time", messagesDispatched.await(10, TimeUnit.SECONDS));
+ assertEquals(MESSAGE_COUNT, messagesArrived.getCount());
+
+ connection.start();
+
+ assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testConsumerRecoversAfterSessionPoolReturnsNullSession() throws Exception {
+ final int MESSAGE_COUNT = 10;
+ final CountDownLatch messagesDispatched = new CountDownLatch(MESSAGE_COUNT);
+ final CountDownLatch messagesArrived = new CountDownLatch(MESSAGE_COUNT);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+
+ @Override
+ public void onInboundMessage(JmsInboundMessageDispatch envelope) {
+ messagesDispatched.countDown();
+ }
+ });
+
+ testPeer.expectBegin();
+
+ // Create a session for our ServerSessionPool to use
+ Session session = connection.createSession();
+ session.setMessageListener(new MessageListener() {
+
+ @Override
+ public void onMessage(Message message) {
+ messagesArrived.countDown();
+ }
+ });
+
+ JmsServerSession serverSession = new JmsServerSession(session);
+ JmsServerSessionPoolFirstAttemptGetsNull sessionPool = new JmsServerSessionPoolFirstAttemptGetsNull(serverSession);
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, MESSAGE_COUNT);
+
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ testPeer.expectDispositionThatIsAcceptedAndSettled();
+ }
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ assertTrue("Message didn't arrive in time", messagesDispatched.await(10, TimeUnit.SECONDS));
+ assertEquals(MESSAGE_COUNT, messagesArrived.getCount());
+
+ connection.start();
+
+ assertTrue("Message didn't arrive in time", messagesArrived.await(10, TimeUnit.SECONDS));
+
+ testPeer.expectDetach(true, true, true);
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testRemotelyCloseConnectionConsumer() throws Exception {
+ final String BREAD_CRUMB = "ErrorMessage";
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final CountDownLatch connectionError = new CountDownLatch(1);
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool();
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ connectionError.countDown();
+ }
+ });
+
+ // Create a consumer, then remotely end it afterwards.
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlow();
+ testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, BREAD_CRUMB);
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ // Verify the consumer gets marked closed
+ testPeer.waitForAllHandlersToComplete(1000);
+ assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ try {
+ consumer.getServerSessionPool();
+ } catch (IllegalStateException jmsise) {
+ LOG.debug("Error reported from consumer.getServerSessionPool()", jmsise);
+ if (jmsise.getCause() != null) {
+ String message = jmsise.getCause().getMessage();
+ return message.contains(AmqpError.RESOURCE_DELETED.toString()) &&
+ message.contains(BREAD_CRUMB);
+ } else {
+ return false;
+ }
+ }
+ return false;
+ }
+ }, 10000, 10));
+
+ assertTrue("Consumer closed callback didn't trigger", connectionError.await(5, TimeUnit.SECONDS));
+
+ // Try closing it explicitly, should effectively no-op in client.
+ // The test peer will throw during close if it sends anything.
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testOnExceptionFiredOnSessionPoolFailure() throws Exception {
+ final CountDownLatch exceptionFired = new CountDownLatch(1);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ exceptionFired.countDown();
+ }
+ });
+
+ connection.start();
+
+ JmsFailingServerSessionPool sessionPool = new JmsFailingServerSessionPool();
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS));
+
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectDispositionThatIsReleasedAndSettled();
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testOnExceptionFiredOnServerSessionFailure() throws Exception {
+ final CountDownLatch exceptionFired = new CountDownLatch(1);
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ Connection connection = testFixture.establishConnecton(testPeer);
+ connection.setExceptionListener(new ExceptionListener() {
+
+ @Override
+ public void onException(JMSException exception) {
+ exceptionFired.countDown();
+ }
+ });
+
+ connection.start();
+
+ JmsServerSessionPool sessionPool = new JmsServerSessionPool(new JmsFailingServerSession());
+
+ // Now the Connection consumer arrives and we give it a message
+ // to be dispatched to the server session.
+ DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+
+ Queue queue = new JmsQueue("myQueue");
+ ConnectionConsumer consumer = connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ assertTrue("Exception should have been fired", exceptionFired.await(5, TimeUnit.SECONDS));
+
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectDispositionThatIsReleasedAndSettled();
+ consumer.close();
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ //----- Internal ServerSessionPool ---------------------------------------//
+
+ private class JmsFailingServerSessionPool implements ServerSessionPool {
+
+ public JmsFailingServerSessionPool() {
+ }
+
+ @Override
+ public ServerSession getServerSession() throws JMSException {
+ throw new JMSException("Something is wrong with me");
+ }
+ }
+
+ private class JmsServerSessionPool implements ServerSessionPool {
+
+ private JmsServerSession serverSession;
+
+ public JmsServerSessionPool() {
+ this.serverSession = new JmsServerSession();
+ }
+
+ public JmsServerSessionPool(JmsServerSession serverSession) {
+ this.serverSession = serverSession;
+ }
+
+ @Override
+ public ServerSession getServerSession() throws JMSException {
+ return serverSession;
+ }
+ }
+
+ private class JmsServerSessionPoolFirstAttemptGetsNull implements ServerSessionPool {
+
+ private volatile boolean firstAttempt = true;
+ private JmsServerSession serverSession;
+
+ public JmsServerSessionPoolFirstAttemptGetsNull(JmsServerSession serverSession) {
+ this.serverSession = serverSession;
+ }
+
+ @Override
+ public ServerSession getServerSession() throws JMSException {
+ if (firstAttempt) {
+ firstAttempt = false;
+ return null;
+ } else {
+ return serverSession;
+ }
+ }
+ }
+
+ private class JmsServerSession implements ServerSession {
+
+ private final Session session;
+ private final ExecutorService runner = Executors.newSingleThreadExecutor();
+
+ public JmsServerSession() {
+ this.session = null;
+ }
+
+ public JmsServerSession(Session session) {
+ this.session = session;
+ }
+
+ @Override
+ public Session getSession() throws JMSException {
+ return session;
+ }
+
+ @Override
+ public void start() throws JMSException {
+ runner.execute(() -> {
+ session.run();
+ });
+ }
+ }
+
+ private class JmsFailingServerSession extends JmsServerSession {
+
+ public JmsFailingServerSession() {
+ }
+
+ @Override
+ public Session getSession() throws JMSException {
+ throw new JMSException("Something is wrong with me");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/f0042a6b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 8174eea..6952c3d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -43,6 +43,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
+import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
@@ -52,6 +53,7 @@ import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsDefaultConnectionListener;
import org.apache.qpid.jms.JmsOperationTimedOutException;
+import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsSendTimedOutException;
import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy;
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -71,6 +73,7 @@ import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.junit.Test;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1551,6 +1554,74 @@ public class FailoverIntegrationTest extends QpidJmsTestCase {
}
}
+ @Test(timeout = 20000)
+ public void testConnectionConsumerRecreatedAfterReconnect() throws Exception {
+ try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+ TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+ ServerSessionPool sessionPool = Mockito.mock(ServerSessionPool.class);
+
+ final CountDownLatch originalConnected = new CountDownLatch(1);
+ final CountDownLatch finalConnected = new CountDownLatch(1);
+
+ // Create a peer to connect to, then one to reconnect to
+ final String originalURI = createPeerURI(originalPeer);
+ final String finalURI = createPeerURI(finalPeer);
+
+ LOG.info("Original peer is at: {}", originalURI);
+ LOG.info("Final peer is at: {}", finalURI);
+
+ // Connect to the first peer
+ originalPeer.expectSaslAnonymous();
+ originalPeer.expectOpen();
+ originalPeer.expectBegin();
+ originalPeer.expectReceiverAttach();
+ originalPeer.expectLinkFlow();
+ originalPeer.dropAfterLastHandler();
+
+ final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer);
+ connection.addConnectionListener(new JmsDefaultConnectionListener() {
+ @Override
+ public void onConnectionEstablished(URI remoteURI) {
+ LOG.info("Connection Established: {}", remoteURI);
+ if (originalURI.equals(remoteURI.toString())) {
+ originalConnected.countDown();
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ LOG.info("Connection Restored: {}", remoteURI);
+ if (finalURI.equals(remoteURI.toString())) {
+ finalConnected.countDown();
+ }
+ }
+ });
+ connection.start();
+
+ Queue queue = new JmsQueue("myQueue");
+ connection.createConnectionConsumer(queue, null, sessionPool, 100);
+
+ assertTrue("Should connect to original peer", originalConnected.await(5, TimeUnit.SECONDS));
+
+ // --- Post Failover Expectations of FinalPeer --- //
+
+ finalPeer.expectSaslAnonymous();
+ finalPeer.expectOpen();
+ finalPeer.expectBegin();
+ finalPeer.expectReceiverAttach();
+ finalPeer.expectLinkFlow();
+ finalPeer.expectClose();
+
+ assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS));
+
+ // Shut it down
+ connection.close();
+
+ finalPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
private JmsConnection establishAnonymousConnecton(TestAmqpPeer... peers) throws JMSException {
return establishAnonymousConnecton(null, null, peers);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org