You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/04/27 00:51:50 UTC
svn commit: r1476433 - in
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region:
DurableTopicSubscription.java PrefetchSubscription.java Topic.java
Author: tabish
Date: Fri Apr 26 22:51:50 2013
New Revision: 1476433
URL: http://svn.apache.org/r1476433
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4351
Ensure the destination statistics are updated on durable sub deactivate.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1476433&r1=1476432&r2=1476433&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Fri Apr 26 22:51:50 2013
@@ -19,6 +19,7 @@ package org.apache.activemq.broker.regio
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -191,6 +192,8 @@ public class DurableTopicSubscription ex
this.usageManager.getMemoryUsage().removeUsageListener(this);
ArrayList<Topic> topicsToDeactivate = new ArrayList<Topic>();
+ List<MessageReference> savedDispateched = null;
+
synchronized (pendingLock) {
pending.stop();
@@ -224,6 +227,9 @@ public class DurableTopicSubscription ex
}
}
+ if (!topicsToDeactivate.isEmpty()) {
+ savedDispateched = new ArrayList<MessageReference>(dispatched);
+ }
dispatched.clear();
}
if (!keepDurableSubsActive && pending.isTransient()) {
@@ -240,7 +246,7 @@ public class DurableTopicSubscription ex
}
}
for(Topic topic: topicsToDeactivate) {
- topic.deactivate(context, this);
+ topic.deactivate(context, this, savedDispateched);
}
prefetchExtension.set(0);
}
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1476433&r1=1476432&r2=1476433&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Fri Apr 26 22:51:50 2013
@@ -582,7 +582,7 @@ public abstract class PrefetchSubscripti
}
}
- @Override
+ @Override
public void add(ConnectionContext context, Destination destination) throws Exception {
synchronized(pendingLock) {
super.add(context, destination);
@@ -592,6 +592,10 @@ public abstract class PrefetchSubscripti
@Override
public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
+ return remove(context, destination, dispatched);
+ }
+
+ public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception {
List<MessageReference> rc = new ArrayList<MessageReference>();
synchronized(pendingLock) {
super.remove(context, destination);
@@ -600,23 +604,35 @@ public abstract class PrefetchSubscripti
// Except if each commit or rollback callback action comes before remove of subscriber.
rc.addAll(pending.remove(context, destination));
- // Synchronized to DispatchLock
- synchronized(dispatchLock) {
- ArrayList<MessageReference> references = new ArrayList<MessageReference>();
- for (MessageReference r : dispatched) {
- if( r.getRegionDestination() == destination) {
- references.add(r);
- }
+ if (dispatched == null) {
+ return rc;
+ }
+
+ // Synchronized to DispatchLock if necessary
+ if (dispatched == this.dispatched) {
+ synchronized(dispatchLock) {
+ updateDestinationStats(rc, destination, dispatched);
}
- rc.addAll(references);
- destination.getDestinationStatistics().getDispatched().subtract(references.size());
- destination.getDestinationStatistics().getInflight().subtract(references.size());
- dispatched.removeAll(references);
+ } else {
+ updateDestinationStats(rc, destination, dispatched);
}
}
return rc;
}
+ private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) {
+ ArrayList<MessageReference> references = new ArrayList<MessageReference>();
+ for (MessageReference r : dispatched) {
+ if (r.getRegionDestination() == destination) {
+ references.add(r);
+ }
+ }
+ rc.addAll(references);
+ destination.getDestinationStatistics().getDispatched().subtract(references.size());
+ destination.getDestinationStatistics().getInflight().subtract(references.size());
+ dispatched.removeAll(references);
+ }
+
protected void dispatchPending() throws IOException {
synchronized(pendingLock) {
try {
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=1476433&r1=1476432&r2=1476433&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Apr 26 22:51:50 2013
@@ -73,6 +73,7 @@ public class Topic extends BaseDestinati
private final TaskRunner taskRunner;
private final LinkedList<Runnable> messagesWaitingForSpace = new LinkedList<Runnable>();
private final Runnable sendMessagesWaitingForSpaceTask = new Runnable() {
+ @Override
public void run() {
try {
Topic.this.taskRunner.wakeup();
@@ -106,6 +107,7 @@ public class Topic extends BaseDestinati
}
}
+ @Override
public List<Subscription> getConsumers() {
synchronized (consumers) {
return new ArrayList<Subscription>(consumers);
@@ -116,6 +118,7 @@ public class Topic extends BaseDestinati
return true;
}
+ @Override
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
@@ -182,6 +185,7 @@ public class Topic extends BaseDestinati
}
}
+ @Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
if (!sub.getConsumerInfo().isDurable()) {
super.removeSubscription(context, sub, lastDeliveredSequenceId);
@@ -228,13 +232,13 @@ public class Topic extends BaseDestinati
topicStore.deleteSubscription(clientId, subscriptionName);
info = null;
synchronized (consumers) {
- consumers.remove(subscription);
+ consumers.remove(subscription);
}
} else {
synchronized (consumers) {
- if (!consumers.contains(subscription)) {
- consumers.add(subscription);
- }
+ if (!consumers.contains(subscription)) {
+ consumers.add(subscription);
+ }
}
}
}
@@ -259,6 +263,7 @@ public class Topic extends BaseDestinati
msgContext.setDestination(destination);
if (subscription.isRecoveryRequired()) {
topicStore.recoverSubscription(clientId, subscriptionName, new MessageRecoveryListener() {
+ @Override
public boolean recoverMessage(Message message) throws Exception {
message.setRegionDestination(Topic.this);
try {
@@ -272,14 +277,17 @@ public class Topic extends BaseDestinati
return true;
}
+ @Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
throw new RuntimeException("Should not be called.");
}
+ @Override
public boolean hasSpace() {
return true;
}
+ @Override
public boolean isDuplicate(MessageId id) {
return false;
}
@@ -290,11 +298,11 @@ public class Topic extends BaseDestinati
}
}
- public void deactivate(ConnectionContext context, DurableTopicSubscription sub) throws Exception {
+ public void deactivate(ConnectionContext context, DurableTopicSubscription sub, List<MessageReference> dispatched) throws Exception {
synchronized (consumers) {
consumers.remove(sub);
}
- sub.remove(context, this);
+ sub.remove(context, this, dispatched);
}
protected void recoverRetroactiveMessages(ConnectionContext context, Subscription subscription) throws Exception {
@@ -303,6 +311,7 @@ public class Topic extends BaseDestinati
}
}
+ @Override
public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
final ConnectionContext context = producerExchange.getConnectionContext();
@@ -348,6 +357,7 @@ public class Topic extends BaseDestinati
if (producerInfo.getWindowSize() > 0 || message.isResponseRequired()) {
synchronized (messagesWaitingForSpace) {
messagesWaitingForSpace.add(new Runnable() {
+ @Override
public void run() {
try {
@@ -377,7 +387,6 @@ public class Topic extends BaseDestinati
context.getConnection().dispatchAsync(response);
}
}
-
}
});
@@ -521,6 +530,7 @@ public class Topic extends BaseDestinati
return "Topic: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size();
}
+ @Override
public void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack,
final MessageReference node) throws IOException {
if (topicStore != null && node.isPersistent()) {
@@ -532,6 +542,7 @@ public class Topic extends BaseDestinati
messageConsumed(context, node);
}
+ @Override
public void gc() {
}
@@ -539,6 +550,7 @@ public class Topic extends BaseDestinati
return topicStore != null ? topicStore.getMessage(messageId) : null;
}
+ @Override
public void start() throws Exception {
this.subscriptionRecoveryPolicy.start();
if (memoryUsage != null) {
@@ -550,6 +562,7 @@ public class Topic extends BaseDestinati
}
}
+ @Override
public void stop() throws Exception {
if (taskRunner != null) {
taskRunner.shutdown();
@@ -565,6 +578,7 @@ public class Topic extends BaseDestinati
scheduler.cancel(expireMessagesTask);
}
+ @Override
public Message[] browse() {
final List<Message> result = new ArrayList<Message>();
doBrowse(result, getMaxBrowsePageSize());
@@ -576,6 +590,7 @@ public class Topic extends BaseDestinati
if (topicStore != null) {
final List<Message> toExpire = new ArrayList<Message>();
topicStore.recover(new MessageRecoveryListener() {
+ @Override
public boolean recoverMessage(Message message) throws Exception {
if (message.isExpired()) {
toExpire.add(message);
@@ -584,14 +599,17 @@ public class Topic extends BaseDestinati
return true;
}
+ @Override
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
return true;
}
+ @Override
public boolean hasSpace() {
return browseList.size() < max;
}
+ @Override
public boolean isDuplicate(MessageId id) {
return false;
}
@@ -616,6 +634,7 @@ public class Topic extends BaseDestinati
}
}
+ @Override
public boolean iterate() {
synchronized (messagesWaitingForSpace) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
@@ -661,6 +680,7 @@ public class Topic extends BaseDestinati
// Implementation methods
// -------------------------------------------------------------------------
+ @Override
public final void wakeup() {
}
@@ -698,12 +718,14 @@ public class Topic extends BaseDestinati
}
private final Runnable expireMessagesTask = new Runnable() {
+ @Override
public void run() {
List<Message> browsedMessages = new InsertionCountList<Message>();
doBrowse(browsedMessages, getMaxExpirePageSize());
}
};
+ @Override
public void messageExpired(ConnectionContext context, Subscription subs, MessageReference reference) {
broker.messageExpired(context, reference, subs);
// AMQ-2586: Better to leave this stat at zero than to give the user
@@ -760,6 +782,7 @@ public class Topic extends BaseDestinati
/**
* force a reread of the store - after transaction recovery completion
*/
+ @Override
public void clearPendingMessages() {
dispatchLock.readLock().lock();
try {