You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/02/23 22:45:22 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5229 -
implement ability to pause/resume dispatch of message to all consumers of a
queue
Repository: activemq
Updated Branches:
refs/heads/master 1406d40ac -> 85b9c81a3
https://issues.apache.org/jira/browse/AMQ-5229 - implement ability to pause/resume dispatch of message to all consumers of a queue
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/85b9c81a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/85b9c81a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/85b9c81a
Branch: refs/heads/master
Commit: 85b9c81a3f2431b8272c19acf4e4b1cddeb25c5e
Parents: 1406d40
Author: gtully <ga...@gmail.com>
Authored: Mon Feb 23 21:41:10 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Feb 23 21:41:33 2015 +0000
----------------------------------------------------------------------
.../apache/activemq/broker/jmx/QueueView.java | 18 ++++++
.../activemq/broker/jmx/QueueViewMBean.java | 11 ++++
.../apache/activemq/broker/region/Queue.java | 14 ++++
.../broker/region/QueueDispatchSelector.java | 19 ++++--
.../apache/activemq/broker/jmx/MBeanTest.java | 67 +++++++++++++++++++-
5 files changed, 123 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
index 76d82a3..076b4fc 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java
@@ -226,4 +226,22 @@ public class QueueView extends DestinationView implements QueueViewMBean {
Queue queue = (Queue) destination;
queue.getMessageGroupOwners().removeAll();
}
+
+ @Override
+ public void pause() {
+ Queue queue = (Queue) destination;
+ queue.pauseDispatch();
+ }
+
+ @Override
+ public void resume() {
+ Queue queue = (Queue) destination;
+ queue.resumeDispatch();
+ }
+
+ @Override
+ public boolean isPaused() {
+ Queue queue = (Queue) destination;
+ return queue.isDispatchPaused();
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
index 3f99162..27ef61c 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java
@@ -209,4 +209,15 @@ public interface QueueViewMBean extends DestinationViewMBean {
*/
@MBeanInfo("emove all the message groups - will rebalance all message groups across consumers")
void removeAllMessageGroups();
+
+ @MBeanInfo("pause dispatch to consumers")
+ void pause();
+
+ @MBeanInfo("resume dispatch to consumers if paused")
+ void resume();
+
+ @MBeanInfo("Dispatch to consumers is paused")
+ boolean isPaused();
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c0a237f..67b9119 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -147,6 +147,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
private final Object iteratingMutex = new Object();
+
+
class TimeoutMessage implements Delayed {
Message message;
@@ -1649,6 +1651,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
}
}
+ public void pauseDispatch() {
+ dispatchSelector.pause();
+ }
+
+ public void resumeDispatch() {
+ dispatchSelector.resume();
+ }
+
+ public boolean isDispatchPaused() {
+ return dispatchSelector.isPaused();
+ }
+
protected MessageReferenceFilter createMessageIdFilter(final String messageId) {
return new MessageReferenceFilter() {
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
index c73d960..56f6076 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
@@ -30,8 +30,8 @@ import org.slf4j.LoggerFactory;
public class QueueDispatchSelector extends SimpleDispatchSelector {
private static final Logger LOG = LoggerFactory.getLogger(QueueDispatchSelector.class);
private Subscription exclusiveConsumer;
-
-
+ private volatile boolean paused; // volatile: set by pause()/resume(), read unsynchronized in canSelect()/isPaused()
+
/**
* @param destination
*/
@@ -54,11 +54,22 @@ public class QueueDispatchSelector extends SimpleDispatchSelector {
public boolean canSelect(Subscription subscription,
MessageReference m) throws Exception {
- boolean result = super.canDispatch(subscription, m);
+ boolean result = !paused && super.canDispatch(subscription, m);
if (result && !subscription.isBrowser()) {
result = exclusiveConsumer == null || exclusiveConsumer == subscription;
}
return result;
}
-
+
+ public void pause() {
+ paused = true;
+ }
+
+ public void resume() {
+ paused = false;
+ }
+
+ public boolean isPaused() {
+ return paused;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/85b9c81a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index e2b0c51..95b7ff0 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -818,13 +818,13 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
return answer;
}
- protected void useConnection(Connection connection) throws Exception {
+ protected void useConnection(Connection connection, int numToSend) throws Exception {
connection.setClientID(clientID);
connection.start();
Session session = connection.createSession(transacted, authMode);
destination = createDestination();
MessageProducer producer = session.createProducer(destination);
- for (int i = 0; i < MESSAGE_COUNT; i++) {
+ for (int i = 0; i < numToSend; i++) {
Message message = session.createTextMessage("Message: " + i);
message.setIntProperty("counter", i);
message.setJMSCorrelationID("MyCorrelationID");
@@ -836,6 +836,10 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
Thread.sleep(1000);
}
+ protected void useConnection(Connection connection) throws Exception {
+ useConnection(connection, MESSAGE_COUNT);
+ }
+
protected void useConnectionWithBlobMessage(Connection connection) throws Exception {
connection.setClientID(clientID);
connection.start();
@@ -1505,4 +1509,63 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals(mbeans.size(), 1);
sub.close();
}
+
+ public void testQueuePauseResume() throws Exception {
+ connection = connectionFactory.createConnection();
+ final int numToSend = 20;
+ useConnection(connection, numToSend);
+ ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString());
+
+ QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
+
+ CompositeData[] compdatalist = queue.browse();
+ int initialQueueSize = compdatalist.length;
+ assertEquals("expected", numToSend, initialQueueSize);
+
+
+ echo("Attempting to consume 5 bytes messages from: " + destination);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(destination);
+ for (int i=0; i<5; i++) {
+ assertNotNull("Message: " + i, consumer.receive(5000));
+ }
+ consumer.close();
+ session.close();
+
+ compdatalist = queue.browse();
+ assertEquals("expected", numToSend -5, compdatalist.length);
+
+ echo("pause");
+ queue.pause();
+
+ assertTrue("queue is paused", queue.isPaused());
+
+ // verify no consume while paused
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createConsumer(destination);
+ assertNull("cannot get message while paused", consumer.receive(2000));
+ consumer.close();
+ session.close();
+ connection.close();
+
+ // verify send while paused
+ connection = connectionFactory.createConnection();
+ useConnection(connection, numToSend);
+
+ // verify browse
+ compdatalist = queue.browse();
+ assertEquals("expected browse", (2*numToSend)-5, compdatalist.length);
+ assertEquals("expected message count", compdatalist.length, queue.getQueueSize());
+
+ echo("resume");
+ queue.resume();
+
+ assertFalse("queue is not paused", queue.isPaused());
+
+ session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ consumer = session.createConsumer(destination);
+ for (int i = 0; i < compdatalist.length; i++) {
+ assertNotNull("Message: " + i, consumer.receive(5000));
+ }
+ }
}