You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/09 14:49:07 UTC

[2/2] git commit: https://issues.apache.org/jira/browse/AMQ-4485 - fix test regression with browse test - AMQ4595Test - reduce replay window when sync and async cursor updates flip message order - concurrentStoreAndDispatch=true - https://issues.apache

https://issues.apache.org/jira/browse/AMQ-4485  - fix test regression with browse test - AMQ4595Test - reduce replay window when sync and async cursor updates flip message order - concurrentStoreAndDispatch=true -  https://issues.apache.org/jira/browse/AMQ-5266 - increase default audit depth to match async jobs for concurrent store


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97c127d2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97c127d2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97c127d2

Branch: refs/heads/trunk
Commit: 97c127d2d4086709a50a9da6f17773d6b1a5fc33
Parents: a56996d
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 9 13:47:31 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 9 13:47:31 2014 +0100

----------------------------------------------------------------------
 .../activemq/broker/region/BaseDestination.java |   2 +-
 .../region/cursors/AbstractStoreCursor.java     | 107 ++++++++++++++-----
 .../region/cursors/QueueStorePrefetch.java      |   2 +-
 .../activemq/store/kahadb/KahaDBStore.java      |   3 +-
 .../activemq/bugs/AMQ4485LowLimitTest.java      |   2 -
 .../org/apache/activemq/bugs/AMQ4595Test.java   |  13 +--
 6 files changed, 92 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 601f59c..5a41df3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -57,7 +57,7 @@ public abstract class BaseDestination implements Destination {
     public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
     public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
     public static final int MAX_PRODUCERS_TO_AUDIT = 64;
-    public static final int MAX_AUDIT_DEPTH = 2048;
+    public static final int MAX_AUDIT_DEPTH = 10000;
 
     protected final ActiveMQDestination destination;
     protected final Broker broker;

http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index bef6017..c08b293 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors;
 
 import java.util.Iterator;
 import java.util.LinkedList;
+import java.util.ListIterator;
 import java.util.concurrent.Future;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
@@ -41,7 +42,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     private boolean storeHasMessages = false;
     protected int size;
     private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
-    MessageId lastCachedId = null;
+    private static int SYNC_ADD = 0;
+    private static int ASYNC_ADD = 1;
+    final MessageId[] lastCachedIds = new MessageId[2];
     protected boolean hadSpace = false;
 
     protected AbstractStoreCursor(Destination destination) {
@@ -203,12 +206,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             }
             if (isCacheEnabled()) {
                 if (recoverMessage(node.getMessage(),true)) {
-                    if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
-                        pruneLastCached();
-                        pendingCachedIds.add(node.getMessageId());
-                    } else {
-                        setLastCachedId(node.getMessageId());
-                    }
+                    trackLastCached(node);
                 } else {
                     dealWithDuplicates();
                     return false;
@@ -219,24 +217,78 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
         }
 
         if (disableCache && isCacheEnabled()) {
+            LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
             setCacheEnabled(false);
-            // sync with store on disabling the cache
-            if (!pendingCachedIds.isEmpty() || lastCachedId != null) {
-                LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}",
-                            new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()});
-                pruneLastCached();
-                if (lastCachedId != null) {
-                    setBatch(lastCachedId);
-                    lastCachedId = null;
-                    pendingCachedIds.clear();
-                }
-            }
+            syncWithStore();
         }
         this.storeHasMessages = true;
         size++;
         return true;
     }
 
+    private void syncWithStore() throws Exception {
+        if (lastCachedIds[SYNC_ADD] == null) {
+            // only async adds, lets wait on the potential last add and reset from there
+            for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
+                MessageId lastStored = it.previous();
+                Object futureOrLong = lastStored.getFutureOrSequenceLong();
+                if (futureOrLong instanceof Future) {
+                    Future future = (Future) futureOrLong;
+                    if (future.isCancelled()) {
+                        continue;
+                    } else {
+                        try {
+                            future.get();
+                            setLastCachedId(ASYNC_ADD, lastStored);
+                        } catch (Exception ignored) {}
+                    }
+                }
+            }
+            if (lastCachedIds[ASYNC_ADD] != null) {
+                setBatch(lastCachedIds[ASYNC_ADD]);
+            }
+        } else {
+            // mix of async and sync - async can exceed sync only if next in sequence
+            for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
+                MessageId candidate = it.next();
+                final Object futureOrLong = candidate.getFutureOrSequenceLong();
+                if (futureOrLong instanceof Future) {
+                    Future future = (Future) futureOrLong;
+                    if (future.isCancelled()) {
+                        it.remove();
+                    } else {
+                        try {
+                            future.get();
+                            long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
+                            if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) {
+                                setLastCachedId(SYNC_ADD, candidate);
+                            } else {
+                                // out of sequence, revert to sync state
+                                LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
+                                break;
+                            }
+                        } catch (Exception ignored) {}
+                    }
+                }
+            }
+            if (lastCachedIds[SYNC_ADD] != null) {
+                setBatch(lastCachedIds[SYNC_ADD]);
+            }
+
+        }
+        // cleanup
+        lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
+        pendingCachedIds.clear();
+    }
+
+    private void trackLastCached(MessageReference node) {
+        if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
+            pruneLastCached();
+            pendingCachedIds.add(node.getMessageId());
+        } else {
+            setLastCachedId(SYNC_ADD, node.getMessageId());
+        }
+    }
 
     private void pruneLastCached() {
         for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
@@ -247,21 +299,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
                 if (future.isCancelled()) {
                     it.remove();
                 } else {
+                    // we don't want to wait for work to complete
                     break;
                 }
             } else {
-                // store complete - track via lastCachedId
-                setLastCachedId(candidate);
+                // complete
+                setLastCachedId(ASYNC_ADD, candidate);
                 it.remove();
             }
         }
     }
 
-    private void setLastCachedId(MessageId candidate) {
-        if (lastCachedId == null || lastCachedId.getFutureOrSequenceLong() == null) {  // possibly null for topics
-            lastCachedId = candidate;
-        } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) {
-            lastCachedId = candidate;
+    private void setLastCachedId(final int index, MessageId candidate) {
+        if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong() == null) {  // possibly null for topics
+            lastCachedIds[index] = candidate;
+        } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong())) > 0) {
+            lastCachedIds[index] = candidate;
         }
     }
 
@@ -351,7 +404,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
-                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId + ",lastCachedId-seq:" + (lastCachedId != null ? lastCachedId.getFutureOrSequenceLong() : "null");
+                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
+                    + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
+                    + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
     }
     
     protected abstract void doFillBatch() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 1f42d57..94dc817 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -94,7 +94,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
     
     @Override
     protected void setBatch(MessageId messageId) throws Exception {
-        LOG.trace("{}  setBatch {} loc: {}", this, messageId, messageId.getEntryLocator());
+        LOG.trace("{}  setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
         store.setBatch(messageId);
         batchResetNeeded = false;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index abdf4bf..eb5d1c4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.BaseDestination;
 import org.apache.activemq.broker.scheduler.JobSchedulerStore;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
@@ -86,7 +87,7 @@ import org.slf4j.LoggerFactory;
 
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
-    private static final int MAX_ASYNC_JOBS = 10000;
+    private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
 
     public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
     public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(

http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
index eb46f8f..38c85da 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
@@ -101,7 +101,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
             addNetworkConnector(broker);
         }
         broker.setSchedulePeriodForDestinationPurge(0);
-        //broker.getSystemUsage().setSendFailIfNoSpace(true);
         broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);
 
 
@@ -406,7 +405,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
                             int id = numMessages - val - 1;
 
                             ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
-                            //LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
                             Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload);
                             textMessage.setIntProperty("NUM", id);
                             producer.send(compositeQ, textMessage);

http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
index 0baf5c3..507e52e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
@@ -16,12 +16,9 @@
  */
 package org.apache.activemq.bugs;
 
-import static org.junit.Assert.assertEquals;
-
 import java.net.URI;
 import java.util.Date;
 import java.util.Enumeration;
-
 import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
@@ -29,13 +26,10 @@ import javax.jms.MessageProducer;
 import javax.jms.QueueBrowser;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.junit.After;
 import org.junit.Before;
@@ -43,6 +37,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
+import static org.junit.Assert.assertEquals;
+
 public class AMQ4595Test {
 
     private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class);
@@ -112,6 +109,8 @@ public class AMQ4595Test {
         }
         producerConnection.close();
 
+        LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
+
         // Browse the queue.
         Connection connection = factory.createConnection();
         connection.start();
@@ -131,6 +130,8 @@ public class AMQ4595Test {
         session.close();
         connection.close();
 
+        LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
+
         // The number of messages browsed should be equal to the number of messages sent.
         assertEquals(messageToSend, browsed);