You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/10/09 14:49:07 UTC
[2/2] git commit: https://issues.apache.org/jira/browse/AMQ-4485 -
fix test regression with browse test - AMQ4595Test - reduce replay window
when sync and asnyc cursor updates flip message order -
concurrentStoreAndDispatch=true - https://issues.apache
https://issues.apache.org/jira/browse/AMQ-4485 - fix test regression with browse test - AMQ4595Test - reduce replay window when sync and asnyc cursor updates flip message order - concurrentStoreAndDispatch=true - https://issues.apache.org/jira/browse/AMQ-5266 - increse default audit depth to match async jobs for concurrent store
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97c127d2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97c127d2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97c127d2
Branch: refs/heads/trunk
Commit: 97c127d2d4086709a50a9da6f17773d6b1a5fc33
Parents: a56996d
Author: gtully <ga...@gmail.com>
Authored: Thu Oct 9 13:47:31 2014 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Oct 9 13:47:31 2014 +0100
----------------------------------------------------------------------
.../activemq/broker/region/BaseDestination.java | 2 +-
.../region/cursors/AbstractStoreCursor.java | 107 ++++++++++++++-----
.../region/cursors/QueueStorePrefetch.java | 2 +-
.../activemq/store/kahadb/KahaDBStore.java | 3 +-
.../activemq/bugs/AMQ4485LowLimitTest.java | 2 -
.../org/apache/activemq/bugs/AMQ4595Test.java | 13 +--
6 files changed, 92 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
index 601f59c..5a41df3 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
@@ -57,7 +57,7 @@ public abstract class BaseDestination implements Destination {
public static final long EXPIRE_MESSAGE_PERIOD = 30 * 1000;
public static final long DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC = 60 * 1000;
public static final int MAX_PRODUCERS_TO_AUDIT = 64;
- public static final int MAX_AUDIT_DEPTH = 2048;
+ public static final int MAX_AUDIT_DEPTH = 10000;
protected final ActiveMQDestination destination;
protected final Broker broker;
http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index bef6017..c08b293 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -18,6 +18,7 @@ package org.apache.activemq.broker.region.cursors;
import java.util.Iterator;
import java.util.LinkedList;
+import java.util.ListIterator;
import java.util.concurrent.Future;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@@ -41,7 +42,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
private boolean storeHasMessages = false;
protected int size;
private LinkedList<MessageId> pendingCachedIds = new LinkedList<>();
- MessageId lastCachedId = null;
+ private static int SYNC_ADD = 0;
+ private static int ASYNC_ADD = 1;
+ final MessageId[] lastCachedIds = new MessageId[2];
protected boolean hadSpace = false;
protected AbstractStoreCursor(Destination destination) {
@@ -203,12 +206,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
if (isCacheEnabled()) {
if (recoverMessage(node.getMessage(),true)) {
- if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
- pruneLastCached();
- pendingCachedIds.add(node.getMessageId());
- } else {
- setLastCachedId(node.getMessageId());
- }
+ trackLastCached(node);
} else {
dealWithDuplicates();
return false;
@@ -219,24 +217,78 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
}
if (disableCache && isCacheEnabled()) {
+ LOG.trace("{} - disabling cache on add {} {}", this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong());
setCacheEnabled(false);
- // sync with store on disabling the cache
- if (!pendingCachedIds.isEmpty() || lastCachedId != null) {
- LOG.trace("{} - disabling cache. current Id: {} seq: {}, batchList size: {}",
- new Object[]{this, node.getMessageId(), node.getMessageId().getFutureOrSequenceLong(), batchList.size()});
- pruneLastCached();
- if (lastCachedId != null) {
- setBatch(lastCachedId);
- lastCachedId = null;
- pendingCachedIds.clear();
- }
- }
+ syncWithStore();
}
this.storeHasMessages = true;
size++;
return true;
}
+ private void syncWithStore() throws Exception {
+ if (lastCachedIds[SYNC_ADD] == null) {
+ // only async adds, lets wait on the potential last add and reset from there
+ for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
+ MessageId lastStored = it.previous();
+ Object futureOrLong = lastStored.getFutureOrSequenceLong();
+ if (futureOrLong instanceof Future) {
+ Future future = (Future) futureOrLong;
+ if (future.isCancelled()) {
+ continue;
+ } else {
+ try {
+ future.get();
+ setLastCachedId(ASYNC_ADD, lastStored);
+ } catch (Exception ignored) {}
+ }
+ }
+ }
+ if (lastCachedIds[ASYNC_ADD] != null) {
+ setBatch(lastCachedIds[ASYNC_ADD]);
+ }
+ } else {
+ // mix of async and sync - async can exceed sync only if next in sequence
+ for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
+ MessageId candidate = it.next();
+ final Object futureOrLong = candidate.getFutureOrSequenceLong();
+ if (futureOrLong instanceof Future) {
+ Future future = (Future) futureOrLong;
+ if (future.isCancelled()) {
+ it.remove();
+ } else {
+ try {
+ future.get();
+ long next = 1 + (Long)lastCachedIds[SYNC_ADD].getFutureOrSequenceLong();
+ if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), next) == 0) {
+ setLastCachedId(SYNC_ADD, candidate);
+ } else {
+ // out of sequence, revert to sync state
+ LOG.trace("{} cursor order out of sync at seq {}, audit must suppress potential replay of {} messages from the store", this, next, pendingCachedIds.size());
+ break;
+ }
+ } catch (Exception ignored) {}
+ }
+ }
+ }
+ if (lastCachedIds[SYNC_ADD] != null) {
+ setBatch(lastCachedIds[SYNC_ADD]);
+ }
+
+ }
+ // cleanup
+ lastCachedIds[SYNC_ADD] = lastCachedIds[ASYNC_ADD] = null;
+ pendingCachedIds.clear();
+ }
+
+ private void trackLastCached(MessageReference node) {
+ if (node.getMessageId().getFutureOrSequenceLong() instanceof Future) {
+ pruneLastCached();
+ pendingCachedIds.add(node.getMessageId());
+ } else {
+ setLastCachedId(SYNC_ADD, node.getMessageId());
+ }
+ }
private void pruneLastCached() {
for (Iterator<MessageId> it = pendingCachedIds.iterator(); it.hasNext(); ) {
@@ -247,21 +299,22 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
if (future.isCancelled()) {
it.remove();
} else {
+ // we don't want to wait for work to complete
break;
}
} else {
- // store complete - track via lastCachedId
- setLastCachedId(candidate);
+ // complete
+ setLastCachedId(ASYNC_ADD, candidate);
it.remove();
}
}
}
- private void setLastCachedId(MessageId candidate) {
- if (lastCachedId == null || lastCachedId.getFutureOrSequenceLong() == null) { // possibly null for topics
- lastCachedId = candidate;
- } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedId.getFutureOrSequenceLong())) > 0) {
- lastCachedId = candidate;
+ private void setLastCachedId(final int index, MessageId candidate) {
+ if (lastCachedIds[index] == null || lastCachedIds[index].getFutureOrSequenceLong() == null) { // possibly null for topics
+ lastCachedIds[index] = candidate;
+ } else if (Long.compare(((Long) candidate.getFutureOrSequenceLong()), ((Long) lastCachedIds[index].getFutureOrSequenceLong())) > 0) {
+ lastCachedIds[index] = candidate;
}
}
@@ -351,7 +404,9 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
public String toString() {
return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
+ ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
- + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size() + ",lastCachedId:" + lastCachedId + ",lastCachedId-seq:" + (lastCachedId != null ? lastCachedId.getFutureOrSequenceLong() : "null");
+ + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace() + ",pendingCachedIds.size:" + pendingCachedIds.size()
+ + ",lastSyncCachedId:" + lastCachedIds[SYNC_ADD] + ",lastSyncCachedId-seq:" + (lastCachedIds[SYNC_ADD] != null ? lastCachedIds[SYNC_ADD].getFutureOrSequenceLong() : "null")
+ + ",lastAsyncCachedId:" + lastCachedIds[ASYNC_ADD] + ",lastAsyncCachedId-seq:" + (lastCachedIds[ASYNC_ADD] != null ? lastCachedIds[ASYNC_ADD].getFutureOrSequenceLong() : "null");
}
protected abstract void doFillBatch() throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
index 1f42d57..94dc817 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueStorePrefetch.java
@@ -94,7 +94,7 @@ class QueueStorePrefetch extends AbstractStoreCursor {
@Override
protected void setBatch(MessageId messageId) throws Exception {
- LOG.trace("{} setBatch {} loc: {}", this, messageId, messageId.getEntryLocator());
+ LOG.trace("{} setBatch {} seq: {}, loc: {}", this, messageId, messageId.getFutureOrSequenceLong(), messageId.getEntryLocator());
store.setBatch(messageId);
batchResetNeeded = false;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
index abdf4bf..eb5d1c4 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
@@ -42,6 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@@ -86,7 +87,7 @@ import org.slf4j.LoggerFactory;
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
static final Logger LOG = LoggerFactory.getLogger(KahaDBStore.class);
- private static final int MAX_ASYNC_JOBS = 10000;
+ private static final int MAX_ASYNC_JOBS = BaseDestination.MAX_AUDIT_DEPTH;
public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
index eb46f8f..38c85da 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485LowLimitTest.java
@@ -101,7 +101,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
addNetworkConnector(broker);
}
broker.setSchedulePeriodForDestinationPurge(0);
- //broker.getSystemUsage().setSendFailIfNoSpace(true);
broker.getSystemUsage().getMemoryUsage().setLimit(256 * 1024 * 1024l);
@@ -406,7 +405,6 @@ public class AMQ4485LowLimitTest extends JmsMultipleBrokersTestSupport {
int id = numMessages - val - 1;
ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
- //LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + id + " payload:" + payload);
textMessage.setIntProperty("NUM", id);
producer.send(compositeQ, textMessage);
http://git-wip-us.apache.org/repos/asf/activemq/blob/97c127d2/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
index 0baf5c3..507e52e 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4595Test.java
@@ -16,12 +16,9 @@
*/
package org.apache.activemq.bugs;
-import static org.junit.Assert.assertEquals;
-
import java.net.URI;
import java.util.Date;
import java.util.Enumeration;
-
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
@@ -29,13 +26,10 @@ import javax.jms.MessageProducer;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
-
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
@@ -43,6 +37,9 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+
public class AMQ4595Test {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4595Test.class);
@@ -112,6 +109,8 @@ public class AMQ4595Test {
}
producerConnection.close();
+ LOG.info("Mem usage after producer done: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
+
// Browse the queue.
Connection connection = factory.createConnection();
connection.start();
@@ -131,6 +130,8 @@ public class AMQ4595Test {
session.close();
connection.close();
+ LOG.info("Mem usage after browser closed: " + broker.getSystemUsage().getMemoryUsage().getPercentUsage() + "%");
+
// The number of messages browsed should be equal to the number of messages sent.
assertEquals(messageToSend, browsed);