You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/10/13 10:57:07 UTC
svn commit: r703975 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors:
AbstractPendingMessageCursor.java AbstractStoreCursor.java
PendingMessageCursor.java StoreDurableSubscriberCursor.java
TopicStorePrefetch.java
Author: rajdavies
Date: Mon Oct 13 01:57:07 2008
New Revision: 703975
URL: http://svn.apache.org/viewvc?rev=703975&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1971
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java Mon Oct 13 01:57:07 2008
@@ -246,15 +246,7 @@
return false;
}
- /**
- * Mark a message as already dispatched
- * @param message
- */
- public void dispatched(MessageReference message) {
- //add it to the audit
- isDuplicate(message.getMessageId());
- }
-
+
/**
* set the audit
* @param audit
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java Mon Oct 13 01:57:07 2008
@@ -37,11 +37,11 @@
public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor implements MessageRecoveryListener, UsageListener {
private static final Log LOG = LogFactory.getLog(AbstractStoreCursor.class);
protected final Destination regionDestination;
- protected final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
+ private final LinkedHashMap<MessageId,Message> batchList = new LinkedHashMap<MessageId,Message> ();
+ private Iterator<Entry<MessageId, Message>> iterator = null;
protected boolean cacheEnabled=false;
protected boolean batchResetNeeded = true;
protected boolean storeHasMessages = false;
- protected Iterator<Entry<MessageId, Message>> iterator = null;
protected int size;
protected AbstractStoreCursor(Destination destination) {
@@ -84,6 +84,7 @@
}
message.incrementReferenceCount();
batchList.put(message.getMessageId(), message);
+ clearIterator(true);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring batched duplicated from store: " + message);
@@ -102,11 +103,25 @@
throw new RuntimeException(e);
}
}
- this.iterator = this.batchList.entrySet().iterator();
+ clearIterator(true);
}
- public void release() {
+ public synchronized void release() {
+ clearIterator(false);
+ }
+
+ private synchronized void clearIterator(boolean ensureIterator) {
+ boolean haveIterator = this.iterator != null;
this.iterator=null;
+ if(haveIterator&&ensureIterator) {
+ ensureIterator();
+ }
+ }
+
+ private synchronized void ensureIterator() {
+ if(this.iterator==null) {
+ this.iterator=this.batchList.entrySet().iterator();
+ }
}
@@ -117,16 +132,12 @@
if (batchList.isEmpty()) {
try {
fillBatch();
- this.iterator = this.batchList.entrySet().iterator();
} catch (Exception e) {
LOG.error("Failed to fill batch", e);
throw new RuntimeException(e);
}
- }else {
- if (this.iterator==null) {
- this.iterator=this.batchList.entrySet().iterator();
- }
}
+ ensureIterator();
return this.iterator.hasNext();
}
@@ -192,6 +203,7 @@
msg.decrementReferenceCount();
}
batchList.clear();
+ clearIterator(false);
batchResetNeeded = true;
this.cacheEnabled=false;
if (isStarted()) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java Mon Oct 13 01:57:07 2008
@@ -255,11 +255,6 @@
*/
public boolean isTransient();
- /**
- * Mark a message as already dispatched
- * @param message
- */
- public void dispatched(MessageReference message);
/**
* set the audit
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java Mon Oct 13 01:57:07 2008
@@ -300,17 +300,6 @@
}
}
- /**
- * Mark a message as already dispatched
- * @param message
- */
- public synchronized void dispatched(MessageReference message) {
- super.dispatched(message);
- for (PendingMessageCursor cursor : storePrefetches) {
- cursor.dispatched(message);
- }
- }
-
protected synchronized PendingMessageCursor getNextCursor() throws Exception {
if (currentCursor == null || currentCursor.isEmpty()) {
currentCursor = null;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java?rev=703975&r1=703974&r2=703975&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java Mon Oct 13 01:57:07 2008
@@ -64,20 +64,7 @@
throw new RuntimeException("Not supported");
}
- /**
- * Mark a message as already dispatched
- * @param message
- */
- public synchronized void dispatched(MessageReference message) {
- if (this.audit != null) {
- isDuplicate(message.getMessageId());
- Message removed = this.batchList.remove(message.getMessageId());
- if (removed != null) {
- removed.decrementReferenceCount();
- }
- }
- }
-
+
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
MessageEvaluationContext messageEvaluationContext = new NonCachedMessageEvaluationContext();
messageEvaluationContext.setMessageReference(message);