You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/22 00:23:50 UTC

git commit: isolate cursor storeHasMessage logic into durable topic sub cursor b/c only durable sub cursors have selectors that won't match, otherwise we should always read a page if the store has messages

Repository: activemq
Updated Branches:
  refs/heads/trunk 74d2c2425 -> 8b8f63008


isolate cursor storeHasMessage logic into durable topic sub cursor b/c only durable sub cursors have selectors that won't match, otherwise we should always read a page if the store has messages


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8b8f6300
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8b8f6300
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8b8f6300

Branch: refs/heads/trunk
Commit: 8b8f6300801f318ce0ce365c4ae600fce3548244
Parents: 74d2c24
Author: gtully <ga...@gmail.com>
Authored: Tue Oct 21 23:21:45 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue Oct 21 23:23:15 2014 +0100

----------------------------------------------------------------------
 .../region/cursors/AbstractStoreCursor.java     | 17 ++++-------------
 .../region/cursors/TopicStorePrefetch.java      | 20 +++++++++++++++++---
 2 files changed, 21 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/8b8f6300/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index f2df96e..07d4351 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -42,7 +42,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     protected final PendingList batchList;
     private Iterator<MessageReference> iterator = null;
     protected boolean batchResetNeeded = false;
-    private boolean storeHasMessages = false;
     protected int size;
     private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
     private static int SYNC_ADD = 0;
@@ -66,13 +65,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             super.start();
             resetBatch();
             resetSize();
-            setCacheEnabled(!this.storeHasMessages&&useCache);
+            setCacheEnabled(size==0&&useCache);
         } 
     }
 
     protected void resetSize() {
         this.size = getStoreSize();
-        this.storeHasMessages=this.size > 0;
     }
 
     @Override
@@ -93,7 +91,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
 
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
         boolean recovered = false;
-        storeHasMessages = true;
         if (recordUniqueId(message.getMessageId())) {
             if (!cached) {
                 message.setRegionDestination(regionDestination);
@@ -202,7 +199,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         return result;
     }
     
-    public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
+    public synchronized boolean addMessageLast(MessageReference node) throws Exception {
         boolean disableCache = false;
         if (hasSpace()) {
             if (!isCacheEnabled() && size==0 && isStarted() && useCache) {
@@ -230,7 +227,6 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             syncWithStore(node.getMessage());
             setCacheEnabled(false);
         }
-        this.storeHasMessages = true;
         size++;
         return true;
     }
@@ -380,18 +376,13 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             resetBatch();
             this.batchResetNeeded = false;
         }
-        if (this.batchList.isEmpty() && this.storeHasMessages && this.size >0) {
-            // avoid repeated  trips to the store if there is nothing of interest
-            this.storeHasMessages = false;
+        if (this.batchList.isEmpty() && this.size >0) {
             try {
                 doFillBatch();
             } catch (Exception e) {
                 LOG.error("{} - Failed to fill batch", this, e);
                 throw new RuntimeException(e);
             }
-            if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
-                this.storeHasMessages = true;
-            }
         }
     }
     
@@ -417,7 +408,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     @Override
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
-                    + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
+                    + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
                     + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
                     + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
                     + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");

http://git-wip-us.apache.org/repos/asf/activemq/blob/8b8f6300/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
index 9e02e4e..811531e 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/TopicStorePrefetch.java
@@ -40,6 +40,8 @@ class TopicStorePrefetch extends AbstractStoreCursor {
     private final String subscriberName;
     private final Subscription subscription;
     private byte lastRecoveredPriority = 9;
+    private boolean storeHasMessages = false;
+
     /**
      * @param topic
      * @param clientId
@@ -54,6 +56,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         this.maxProducersToAudit=32;
         this.maxAuditDepth=10000;
         resetSize();
+        this.storeHasMessages=this.size > 0;
     }
 
     public boolean recoverMessageReference(MessageId messageReference) throws Exception {
@@ -65,8 +68,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
         batchList.addMessageFirst(node);
         size++;
     }
-    
-        
+
+    @Override
+    public final synchronized boolean addMessageLast(MessageReference node) throws Exception {
+        this.storeHasMessages = super.addMessageLast(node);
+        return this.storeHasMessages;
+    }
+
     @Override
     public synchronized boolean recoverMessage(Message message, boolean cached) throws Exception {
         LOG.trace("{} recover: {}, priority: {}", this, message.getMessageId(), message.getPriority());
@@ -78,6 +86,7 @@ class TopicStorePrefetch extends AbstractStoreCursor {
             if (recovered && !cached) {
                 lastRecoveredPriority = message.getPriority();
             }
+            storeHasMessages = true;
         }
         return recovered;      
     }
@@ -110,8 +119,13 @@ class TopicStorePrefetch extends AbstractStoreCursor {
 
     @Override
     protected void doFillBatch() throws Exception {
+        // avoid repeated  trips to the store if there is nothing of interest
+        this.storeHasMessages = false;
         this.store.recoverNextMessages(clientId, subscriberName,
                 maxBatchSize, this);
+        if (!this.storeHasMessages && (!this.batchList.isEmpty() || !hadSpace)) {
+            this.storeHasMessages = true;
+        }
     }
 
     public byte getLastRecoveredPriority() {
@@ -129,6 +143,6 @@ class TopicStorePrefetch extends AbstractStoreCursor {
 
     @Override
     public String toString() {
-        return "TopicStorePrefetch(" + clientId + "," + subscriberName + ") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
+        return "TopicStorePrefetch(" + clientId + "," + subscriberName + ",storeHasMessages=" + this.storeHasMessages +") " + this.subscription.getConsumerInfo().getConsumerId() + " - " + super.toString();
     }
 }