You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2006/01/05 21:39:47 UTC
svn commit: r366274 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/broker/region/policy/
test/java/org/apache/activemq/broker/
Author: chirino
Date: Thu Jan 5 12:39:41 2006
New Revision: 366274
URL: http://svn.apache.org/viewcvs?rev=366274&view=rev
Log:
We now optimize out persisting persistent messages to a topic if it does not have any durable consumers.
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=366274&r1=366273&r2=366274&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Thu Jan 5 12:39:41 2006
@@ -120,7 +120,7 @@
recovered=true;
for (Iterator iter = destinations.iterator(); iter.hasNext();) {
Topic topic = (Topic) iter.next();
- topic.recover(this, false);
+ topic.recover(context, this, false);
}
} else {
if( !isFull() ) {
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=366274&r1=366273&r2=366274&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Thu Jan 5 12:39:41 2006
@@ -21,9 +21,9 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
-import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.DispatchPolicy;
-import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.broker.region.policy.SimpleDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQDestination;
@@ -43,6 +43,7 @@
import org.apache.activemq.util.SubscriptionKey;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* The Topic is a destination that sends a copy of a message to every active
@@ -60,10 +61,11 @@
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
- private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new LastImageSubscriptionRecoveryPolicy();
+ private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy = new FixedCountSubscriptionRecoveryPolicy();
private boolean sendAdvisoryIfNoConsumers;
private DeadLetterStrategy deadLetterStrategy = new SharedDeadLetterStrategy();
-
+ private AtomicInteger durableSubscriberCounter = new AtomicInteger();
+
public Topic(ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
@@ -90,19 +92,54 @@
destinationStatistics.getConsumers().increment();
sub.add(context, this);
if (sub.getConsumerInfo().isDurable()) {
- recover((DurableTopicSubscription) sub, true);
+ recover(context, (DurableTopicSubscription) sub, true);
}
else {
- if (sub.getConsumerInfo().isRetroactive()) {
+ recover(context, sub);
+ }
+ }
+
+ /**
+ * Used to recover the message list non durable subscriptions. Recovery only happens if the consumer is
+ * retroactive.
+ *
+ * @param context
+ * @param sub
+ * @throws Throwable
+ */
+ private void recover(ConnectionContext context, final Subscription sub) throws Throwable {
+ if (sub.getConsumerInfo().isRetroactive()) {
+
+ // synchronize with dispatch method so that no new messages are sent
+ // while we are recovering a subscription to avoid out of order messages.
+ dispatchValve.turnOff();
+ try {
+
+ synchronized(consumers) {
+ consumers.add(sub);
+ }
subscriptionRecoveryPolicy.recover(context, this, sub);
+
+ } finally {
+ dispatchValve.turnOn();
}
+
+ } else {
synchronized(consumers) {
consumers.add(sub);
}
}
}
- public void recover(final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
+ /**
+ * Used to recover the message list for a durable subscription.
+ *
+ * @param context
+ * @param sub
+ * @param initialActivation
+ * @throws Throwable
+ */
+ public void recover(ConnectionContext context, final DurableTopicSubscription sub, boolean initialActivation) throws Throwable {
// synchronize with dispatch method so that no new messages are sent
// while
@@ -110,9 +147,11 @@
dispatchValve.turnOff();
try {
+ boolean persistenceWasOptimized = canOptimizeOutPersistence();
if (initialActivation) {
- synchronized(consumers) {
+ synchronized(consumers) {
consumers.add(sub);
+ durableSubscriberCounter.incrementAndGet();
}
}
@@ -160,6 +199,16 @@
throw new RuntimeException("Should not be called.");
}
});
+
+ if( initialActivation && sub.getConsumerInfo().isRetroactive() ) {
+ // Then use the subscriptionRecoveryPolicy since there will not be any messages in the persistent store.
+ if( persistenceWasOptimized ) {
+ subscriptionRecoveryPolicy.recover(context, this, sub);
+ } else {
+ // TODO: implement something like
+ // subscriptionRecoveryPolicy.recoverNonPersistent(context, this, sub);
+ }
+ }
}
}
@@ -173,6 +222,9 @@
destinationStatistics.getConsumers().decrement();
synchronized(consumers) {
consumers.remove(sub);
+ if( sub.getConsumerInfo().isDurable() ) {
+ durableSubscriberCounter.decrementAndGet();
+ }
}
sub.remove(context, this);
}
@@ -184,7 +236,7 @@
message.setRegionDestination(this);
- if (store != null && message.isPersistent())
+ if (store != null && message.isPersistent() && !canOptimizeOutPersistence() )
store.addMessage(context, message);
message.incrementReferenceCount();
@@ -207,6 +259,10 @@
message.decrementReferenceCount();
}
+ }
+
+ private boolean canOptimizeOutPersistence() {
+ return durableSubscriberCounter.get()==0;
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java?rev=366274&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/FixedCountSubscriptionRecoveryPolicy.java Thu Jan 5 12:39:41 2006
@@ -0,0 +1,96 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.policy;
+
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.Subscription;
+import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.filter.MessageEvaluationContext;
+
+/**
+ * This implementation of {@link SubscriptionRecoveryPolicy} will only keep
+ * the last message.
+ *
+ * @org.xbean.XBean
+ *
+ * @version $Revision$
+ */
+public class FixedCountSubscriptionRecoveryPolicy implements SubscriptionRecoveryPolicy {
+
+ volatile private MessageReference messages[];
+ private int maximumSize=100;
+ private int tail=0;
+
+ synchronized public boolean add(ConnectionContext context, MessageReference node) throws Throwable {
+ messages[tail++] = node;
+ if( tail >= messages.length )
+ tail = 0;
+ return true;
+ }
+
+ synchronized public void recover(ConnectionContext context, Topic topic, Subscription sub) throws Throwable {
+ // Re-dispatch the last message seen.
+ int t = tail;
+ // The buffer may not have rolled over yet..., start from the front
+ if( messages[t]==null )
+ t=0;
+ // Well the buffer is really empty then.
+ if( messages[t]==null )
+ return;
+
+ // Keep dispatching until t hit's tail again.
+ MessageEvaluationContext msgContext = context.getMessageEvaluationContext();
+ do {
+ MessageReference node = messages[t];
+ try {
+ msgContext.setDestination(node.getRegionDestination().getActiveMQDestination());
+ msgContext.setMessageReference(node);
+ if (sub.matches(node, msgContext)) {
+ sub.add(node);
+ }
+ } finally {
+ msgContext.clear();
+ }
+ t++;
+ if( t >= messages.length )
+ t = 0;
+ } while( t!=tail );
+
+ }
+
+ public void start() throws Exception {
+ messages = new MessageReference[maximumSize];
+ }
+
+ public void stop() throws Exception {
+ messages = null;
+ }
+
+ public int getMaximumSize() {
+ return maximumSize;
+ }
+
+ /**
+ * Sets the maximum number of messages that this destination will hold around in RAM
+ */
+ public void setMaximumSize(int maximumSize) {
+ this.maximumSize = maximumSize;
+ }
+
+
+}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java?rev=366274&r1=366273&r2=366274&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/BrokerTest.java Thu Jan 5 12:39:41 2006
@@ -518,27 +518,19 @@
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
connection1.send(createMessage(producerInfo1, destination, deliveryMode));
+
+ // the behavior is VERY dependent on the recovery policy used.
+ // But the default broker settings try to make it as consistent as possible
- if( deliveryMode == DeliveryMode.NON_PERSISTENT && durableConsumer ) {
- // Durable subs don't keep non persistent messages around!
- for( int i=0; i < 2 ; i++ ) {
- Message m2 = receiveMessage(connection1);
- assertNotNull(m2);
- }
-
- } else {
-
- // Subscription should see all messages sent.
- Message m2 = receiveMessage(connection1);
+ // Subscription should see all messages sent.
+ Message m2 = receiveMessage(connection1);
+ assertNotNull(m2);
+ assertEquals(m.getMessageId(), m2.getMessageId());
+ for( int i=0; i < 2 ; i++ ) {
+ m2 = receiveMessage(connection1);
assertNotNull(m2);
- assertEquals(m.getMessageId(), m2.getMessageId());
- for( int i=0; i < 2 ; i++ ) {
- m2 = receiveMessage(connection1);
- assertNotNull(m2);
- }
-
}
-
+
assertNoMessagesLeft(connection1);
}