You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/14 23:43:16 UTC
svn commit: r1409489 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
Author: tabish
Date: Wed Nov 14 22:43:15 2012
New Revision: 1409489
URL: http://svn.apache.org/viewvc?rev=1409489&view=rev
Log:
Fix for breakage caused by the earlier fix for https://issues.apache.org/jira/browse/AMQ-4062
Only reconfigure the subscription from its destination policy on activation when keepDurableSubsActive=false.
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java?rev=1409489&r1=1409488&r2=1409489&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java Wed Nov 14 22:43:15 2012
@@ -51,8 +51,8 @@ public class DurableTopicSubscription ex
private final ConcurrentHashMap<ActiveMQDestination, Destination> durableDestinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
- private AtomicBoolean active = new AtomicBoolean();
- private AtomicLong offlineTimestamp = new AtomicLong(-1);
+ private final AtomicBoolean active = new AtomicBoolean();
+ private final AtomicLong offlineTimestamp = new AtomicLong(-1);
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws JMSException {
@@ -76,10 +76,12 @@ public class DurableTopicSubscription ex
offlineTimestamp.set(timestamp);
}
+ @Override
public boolean isFull() {
return !active.get() || super.isFull();
}
+ @Override
public void gc() {
}
@@ -87,6 +89,7 @@ public class DurableTopicSubscription ex
* store will have a pending ack for all durables, irrespective of the
* selector so we need to ack if node is un-matched
*/
+ @Override
public void unmatched(MessageReference node) throws IOException {
MessageAck ack = new MessageAck();
ack.setAckType(MessageAck.UNMATCHED_ACK_TYPE);
@@ -100,6 +103,7 @@ public class DurableTopicSubscription ex
// statically configured via maxPageSize
}
+ @Override
public void add(ConnectionContext context, Destination destination) throws Exception {
if (!destinations.contains(destination)) {
super.add(context, destination);
@@ -135,15 +139,6 @@ public class DurableTopicSubscription ex
this.context = context;
this.info = info;
- // On Activation we should update the configuration based on our new consumer info.
- ActiveMQDestination dest = this.info.getDestination();
- if (dest != null && regionBroker.getDestinationPolicy() != null) {
- PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
- if (entry != null) {
- entry.configure(broker, usageManager, this);
- }
- }
-
if (LOG.isDebugEnabled()) {
LOG.debug("Activating " + this);
}
@@ -153,7 +148,17 @@ public class DurableTopicSubscription ex
add(context, topic);
topic.activate(context, this);
}
+
+ // On Activation we should update the configuration based on our new consumer info.
+ ActiveMQDestination dest = this.info.getDestination();
+ if (dest != null && regionBroker.getDestinationPolicy() != null) {
+ PolicyEntry entry = regionBroker.getDestinationPolicy().getEntryFor(dest);
+ if (entry != null) {
+ entry.configure(broker, usageManager, this);
+ }
+ }
}
+
synchronized (pendingLock) {
pending.setSystemUsage(memoryManager);
pending.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
@@ -234,6 +239,7 @@ public class DurableTopicSubscription ex
prefetchExtension.set(0);
}
+ @Override
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
MessageDispatch md = super.createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) {
@@ -245,6 +251,7 @@ public class DurableTopicSubscription ex
return md;
}
+ @Override
public void add(MessageReference node) throws Exception {
if (!active.get() && !keepDurableSubsActive) {
return;
@@ -252,6 +259,7 @@ public class DurableTopicSubscription ex
super.add(node);
}
+ @Override
protected void dispatchPending() throws IOException {
if (isActive()) {
super.dispatchPending();
@@ -262,12 +270,14 @@ public class DurableTopicSubscription ex
pending.remove(node);
}
+ @Override
protected void doAddRecoveredMessage(MessageReference message) throws Exception {
synchronized (pending) {
pending.addRecoveredMessage(message);
}
}
+ @Override
public int getPendingQueueSize() {
if (active.get() || keepDurableSubsActive) {
return super.getPendingQueueSize();
@@ -276,14 +286,17 @@ public class DurableTopicSubscription ex
return 0;
}
+ @Override
public void setSelector(String selector) throws InvalidSelectorException {
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions");
}
+ @Override
protected boolean canDispatch(MessageReference node) {
return isActive();
}
+ @Override
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.acknowledge(context, this, ack, node);
@@ -291,6 +304,7 @@ public class DurableTopicSubscription ex
node.decrementReferenceCount();
}
+ @Override
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
+ durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
@@ -304,6 +318,7 @@ public class DurableTopicSubscription ex
/**
* Release any references that we are holding.
*/
+ @Override
public void destroy() {
synchronized (pendingLock) {
try {
@@ -327,6 +342,7 @@ public class DurableTopicSubscription ex
setSlowConsumer(false);
}
+ @Override
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
try {
@@ -337,6 +353,7 @@ public class DurableTopicSubscription ex
}
}
+ @Override
protected boolean isDropped(MessageReference node) {
return false;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java?rev=1409489&r1=1409488&r2=1409489&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ4062Test.java Wed Nov 14 22:43:15 2012
@@ -96,6 +96,7 @@ public class AMQ4062Test {
service=new BrokerService();
service.setPersistent(true);
service.setUseJmx(false);
+ service.setKeepDurableSubsActive(false);
KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter();
File dataFile=new File("createData");