You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2009/02/18 10:44:31 UTC
svn commit: r745456 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
test/java/org/apache/activemq/broker/
Author: gtully
Date: Wed Feb 18 09:44:31 2009
New Revision: 745456
URL: http://svn.apache.org/viewvc?rev=745456&view=rev
Log:
Small refactor of recovery dispatch, since it is now only used for browser dispatch.
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Wed Feb 18 09:44:31 2009
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -214,12 +215,37 @@
}
}
- class RecoveryDispatch {
- public ArrayList<QueueMessageReference> messages;
- public Subscription subscription;
+ /*
+ * Holder for subscription and pagedInMessages as a browser
+ * needs access to existing messages in the queue that have
+ * already been dispatched
+ */
+ class BrowserDispatch {
+ ArrayList<QueueMessageReference> messages;
+ QueueBrowserSubscription browser;
+
+ public BrowserDispatch(QueueBrowserSubscription browserSubscription,
+ Collection<QueueMessageReference> values) {
+
+ messages = new ArrayList<QueueMessageReference>(values);
+ browser = browserSubscription;
+ browser.incrementQueueRef();
+ }
+
+ void done() {
+ try {
+ browser.decrementQueueRef();
+ } catch (Exception e) {
+ LOG.warn("decrement ref on browser: " + browser, e);
+ }
+ }
+
+ public QueueBrowserSubscription getBrowser() {
+ return browser;
+ }
}
- LinkedList<RecoveryDispatch> recoveries = new LinkedList<RecoveryDispatch>();
+ LinkedList<BrowserDispatch> browserDispatches = new LinkedList<BrowserDispatch>();
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception {
// synchronize with dispatch method so that no new messages are sent
@@ -257,19 +283,18 @@
}
}
- // do recovery dispatch only if it is a browser subscription
- if(sub instanceof QueueBrowserSubscription ) {
- // any newly paged in messages that are not dispatched are added to pagedInPending in iterate()
- doPageIn(false);
+ if (sub instanceof QueueBrowserSubscription ) {
+ QueueBrowserSubscription browserSubscription = (QueueBrowserSubscription) sub;
+
+ // do again in iterate to ensure new messages are dispatched
+ doPageIn(false);
synchronized (pagedInMessages) {
- RecoveryDispatch rd = new RecoveryDispatch();
- rd.messages = new ArrayList<QueueMessageReference>(pagedInMessages.values());
- rd.subscription = sub;
- recoveries.addLast(rd);
+ if (!pagedInMessages.isEmpty()) {
+ BrowserDispatch browserDispatch = new BrowserDispatch(browserSubscription, pagedInMessages.values());
+ browserDispatches.addLast(browserDispatch);
+ }
}
-
- ((QueueBrowserSubscription)sub).incrementQueueRef();
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
@@ -971,64 +996,45 @@
return movedCounter;
}
- RecoveryDispatch getNextRecoveryDispatch() {
+ BrowserDispatch getNextBrowserDispatch() {
synchronized (pagedInMessages) {
- if( recoveries.isEmpty() ) {
+ if( browserDispatches.isEmpty() ) {
return null;
}
- return recoveries.removeFirst();
+ return browserDispatches.removeFirst();
}
}
- protected boolean isRecoveryDispatchEmpty() {
- synchronized (pagedInMessages) {
- return recoveries.isEmpty();
- }
- }
/**
* @return true if we would like to iterate again
* @see org.apache.activemq.thread.Task#iterate()
*/
public boolean iterate() {
+ boolean pageInMoreMessages = false;
synchronized(iteratingMutex) {
- RecoveryDispatch rd;
- while ((rd = getNextRecoveryDispatch()) != null) {
+ BrowserDispatch rd;
+ while ((rd = getNextBrowserDispatch()) != null) {
+ pageInMoreMessages = true;
+
try {
MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
msgContext.setDestination(destination);
+ QueueBrowserSubscription browser = rd.getBrowser();
for (QueueMessageReference node : rd.messages) {
- if (!node.isDropped() && !node.isAcked() && (!node.isDropped() || rd.subscription.getConsumerInfo().isBrowser())) {
+ if (!node.isAcked()) {
msgContext.setMessageReference(node);
- if (rd.subscription.matches(node, msgContext)) {
- // Log showing message dispatching
- if (LOG.isDebugEnabled()) {
- LOG.debug(destination.getQualifiedName() + " - Recovery - Message pushed '" + node.hashCode() + " - " + node + "' to subscription: '" + rd.subscription + "'");
- }
- rd.subscription.add(node);
- } else {
- // make sure it gets queued for dispatched again
- dispatchLock.lock();
- try {
- synchronized(pagedInPendingDispatch) {
- if (!pagedInPendingDispatch.contains(node)) {
- pagedInPendingDispatch.add(node);
- }
- }
- } finally {
- dispatchLock.unlock();
- }
+ if (browser.matches(node, msgContext)) {
+ browser.add(node);
}
}
}
-
- if( rd.subscription instanceof QueueBrowserSubscription ) {
- ((QueueBrowserSubscription)rd.subscription).decrementQueueRef();
- }
-
+
+ rd.done();
+
} catch (Exception e) {
- e.printStackTrace();
+ LOG.warn("exception on dispatch to browser: " + rd.getBrowser(), e);
}
}
@@ -1061,7 +1067,6 @@
}
}
- boolean pageInMoreMessages = false;
synchronized (messages) {
pageInMoreMessages = !messages.isEmpty();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueDispatchSelector.java Wed Feb 18 09:44:31 2009
@@ -62,12 +62,9 @@
public boolean canSelect(Subscription subscription,
MessageReference m) throws Exception {
- if (subscription.isBrowser() && super.canDispatch(subscription, m)) {
- return true;
- }
- boolean result = super.canDispatch(subscription, m) ;
- if (result) {
+ boolean result = super.canDispatch(subscription, m);
+ if (result && !subscription.isBrowser()) {
result = exclusiveConsumer == null
|| exclusiveConsumer == subscription;
if (result) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueue.java Wed Feb 18 09:44:31 2009
@@ -72,27 +72,5 @@
LOG.debug(" changed ownership of " + this + " to "+ tempDest.getConnectionId());
}
super.addSubscription(context, sub);
- }
-
- public void xwakeup() {
- boolean result = false;
- synchronized (messages) {
- result = !messages.isEmpty();
- }
- if (result) {
- try {
- pageInMessages(false);
-
- } catch (Throwable e) {
- LOG.error("Failed to page in more queue messages ", e);
- }
- }
- if (!messagesWaitingForSpace.isEmpty() || !isRecoveryDispatchEmpty()) {
- try {
- taskRunner.wakeup();
- } catch (InterruptedException e) {
- LOG.warn("Task Runner failed to wakeup ", e);
- }
- }
}
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?rev=745456&r1=745455&r2=745456&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Wed Feb 18 09:44:31 2009
@@ -161,6 +161,124 @@
assertNoMessagesLeft(connection2);
}
+
+ /*
+ * change the order of the above test
+ */
+ public void testQueueBrowserWith2ConsumersBrowseFirst() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+ deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+
+ // Setup a second connection with a queue browser.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+ consumerInfo2.setPrefetchSize(10);
+ consumerInfo2.setBrowser(true);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.request(consumerInfo2);
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(10);
+ connection1.request(consumerInfo1);
+
+ // Send the messages
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ //as the messages are sent async - need to synchronize the last
+ //one to ensure they arrive in the order we want
+ connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+
+ List<Message> messages = new ArrayList<Message>();
+
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ messages.add(m1);
+ }
+
+ // no messages present in queue browser as there were no messages when it
+ // was created
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+ }
+
+ public void testQueueBrowserWith2ConsumersInterleaved() throws Exception {
+
+ ActiveMQDestination destination = new ActiveMQQueue("TEST");
+ deliveryMode = DeliveryMode.NON_PERSISTENT;
+
+ // Setup a first connection
+ StubConnection connection1 = createConnection();
+ ConnectionInfo connectionInfo1 = createConnectionInfo();
+ SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+ ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+ connection1.send(connectionInfo1);
+ connection1.send(sessionInfo1);
+ connection1.send(producerInfo);
+
+ ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, destination);
+ consumerInfo1.setPrefetchSize(10);
+ connection1.request(consumerInfo1);
+
+ // Send the messages
+ connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+ // Setup a second connection with a queue browser.
+ StubConnection connection2 = createConnection();
+ ConnectionInfo connectionInfo2 = createConnectionInfo();
+ SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+ ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
+ consumerInfo2.setPrefetchSize(1);
+ consumerInfo2.setBrowser(true);
+ connection2.send(connectionInfo2);
+ connection2.send(sessionInfo2);
+ connection2.request(consumerInfo2);
+
+
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ connection1.send(createMessage(producerInfo, destination, deliveryMode));
+ //as the messages are sent async - need to synchronize the last
+ //one to ensure they arrive in the order we want
+ connection1.request(createMessage(producerInfo, destination, deliveryMode));
+
+
+ List<Message> messages = new ArrayList<Message>();
+
+ for (int i = 0; i < 4; i++) {
+ Message m1 = receiveMessage(connection1);
+ assertNotNull("m1 is null for index: " + i, m1);
+ messages.add(m1);
+ }
+
+ for (int i = 0; i < 1; i++) {
+ Message m1 = messages.get(i);
+ Message m2 = receiveMessage(connection2);
+ assertNotNull("m2 is null for index: " + i, m2);
+ assertEquals(m1.getMessageId(), m2.getMessageId());
+ connection2.send(createAck(consumerInfo2, m2, 1, MessageAck.DELIVERED_ACK_TYPE));
+ }
+
+ assertNoMessagesLeft(connection1);
+ assertNoMessagesLeft(connection2);
+ }
+
+
public void initCombosForTestConsumerPrefetchAndStandardAck() {
addCombinationValues("deliveryMode", new Object[] {
// Integer.valueOf(DeliveryMode.NON_PERSISTENT),