You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/22 00:23:50 UTC
git commit: isolate cursor storeHasMessages logic into durable topic
sub cursor because only durable sub cursors have selectors that won't match;
otherwise we should always read a page if the store has messages
Repository: activemq
Updated Branches:
refs/heads/trunk 74d2c2425 -> 8b8f63008
isolate cursor storeHasMessages logic into durable topic sub cursor because only durable sub cursors have selectors that won't match; otherwise we should always read a page if the store has messages
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b8f6300
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b8f6300
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b8f6300
Branch: refs/heads/trunk
Commit: 8b8f6300801f318ce0ce365c4ae600fce3548244
Parents: 74d2c24
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 21 23:21:45 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 21 23:23:15 2014 +0100
----------------------------------------------------------------------
.../region/cursors/AbstractStoreCursor.java | 17 ++++-------------
.../region/cursors/TopicStorePrefetch.java | 20 +++++++++++++++++---
2 files changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8b8f6300/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index f2df96e..07d4351 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -42,7 +42,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
protected final PendingList batchList;
private Iterator<MessageReference> iterator = null;
protected boolean batchResetNeeded = false;
- private boolean storeHasMessages = false;
protected int size;
private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
private static int SYNC_ADD = 0;
@@ -66,13 +65,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
super.start();
resetBatch();
resetSize();
- setCacheEnabled(!this.storeHasMessages&&useCache);
+ setCacheEnabled(size==0&&useCache);
}
}
protected void resetSize() {
this.size = getStoreSize();
- this.storeHasMessages=this.size > 0;
}
@Override
@@ -93,7 +91,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
boolean recovered = false;
- storeHasMessages = true;
if (recordUniqueId(message.getMessageId())) {
if (!cached) {
message.setRegionDestination(regionDestination);
@@ -202,7 +199,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return result;
}
- public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
+ public synchronized boolean addMessageLast(MessageReference node) throws Exception {
boolean disableCache = false;
if (hasSpace()) {
if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
@@ -230,7 +227,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
syncWithStore(node.getMessage());
setCacheEnabled(false);
}
- this.storeHasMessages = true;
size++;
return true;
}
@@ -380,18 +376,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
resetBatch();
this.batchResetNeeded = false;
}
- if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
- // avoid repeated trips to the store if there is nothing of interest
- this.storeHasMessages = false;
+ if (this.batchList.isEmpty() && this.size >0) {
try {
doFillBatch();
} catch (Exception e) {
LOG.error("{} - Failed to fill batch", this, e);
throw new RuntimeException(e);
}
- if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
- this.storeHasMessages = true;
- }
}
}
@@ -417,7 +408,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
@Override
public String toString() {
return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
- + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
+ + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
+ ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
+ ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
+ ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
http://git-wip-us.apache.org/repos/asf/activemq/blob/8b8f6300/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 9e02e4e..811531e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -40,6 +40,8 @@ class TopicStorePrefetch extends AbstractStoreCursor {
private final String subscriberName;
private final Subscription subscription;
private byte lastRecoveredPriority = 9;
+ private boolean storeHasMessages = false;
+
/**
* @param topic
* @param clientId
@@ -54,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
this.maxProducersToAudit=32;
this.maxAuditDepth=10000;
resetSize();
+ this.storeHasMessages=this.size > 0;
}
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@@ -65,8 +68,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
batchList.addMessageFirst(node);
size++;
}
-
-
+
+ @Override
+ public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
+ this.storeHasMessages = super.addMessageLast(node);
+ return this.storeHasMessages;
+ }
+
@Override
public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
LOG.trace("{} recover: {}, priority: {}", this, message.getMessageId(), message.getPriority());
@@ -78,6 +86,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
if (recovered && !cached) {
lastRecoveredPriority = message.getPriority();
}
+ storeHasMessages = true;
}
return recovered;
}
@@ -110,8 +119,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
protected void doFillBatch() throws Exception {
+ // avoid repeated trips to the store if there is nothing of interest
+ this.storeHasMessages = false;
this.store.recoverNextMessages(clientId, subscriberName,
maxBatchSize, this);
+ if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
+ this.storeHasMessages = true;
+ }
}
public byte getLastRecoveredPriority() {
@@ -129,6 +143,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
@Override
public String toString() {
- return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
+ return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
}
}