You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/02 19:11:00 UTC
svn commit: r960060 - in /activemq/trunk:
activemq-core/src/main/java/org/apache/activemq/store/kahadb/
activemq-core/src/test/java/org/apache/activemq/bugs/
activemq-core/src/test/java/org/apache/activemq/perf/
kahadb/src/main/java/org/apache/kahadb/j...
Author: gtully
Date: Fri Jul 2 17:11:00 2010
New Revision: 960060
URL: http://svn.apache.org/viewvc?rev=960060&view=rev
Log:
improve async task concurrency, contention over indexMutex for transactions and taskMap, not per destination. Added some metrics enabled by system properties to help further diagnosis of journal usage
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jul 2 17:11:00 2010
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -84,10 +85,16 @@ import org.apache.kahadb.page.Transactio
public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
static final Log LOG = LogFactory.getLog(KahaDBStore.class);
private static final int MAX_ASYNC_JOBS = 10000;
+
+ public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
+ public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
+ public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
+ private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
+
protected ExecutorService queueExecutor;
protected ExecutorService topicExecutor;
- protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
- protected final Map<AsyncJobKey, StoreTopicTask> asyncTopicMap = new HashMap<AsyncJobKey, StoreTopicTask>();
+ protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
+ protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
private final WireFormat wireFormat = new OpenWireFormat();
private SystemUsage usageManager;
private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
@@ -183,7 +190,7 @@ public class KahaDBStore extends Message
this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
- this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
+ this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
@@ -191,7 +198,7 @@ public class KahaDBStore extends Message
return thread;
}
});
- this.topicExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
+ this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
@@ -208,21 +215,29 @@ public class KahaDBStore extends Message
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
}
- synchronized (this.asyncQueueMap) {
- for (StoreQueueTask task : this.asyncQueueMap.values()) {
- task.cancel();
+ synchronized (this.asyncQueueMaps) {
+ for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
+ synchronized (m) {
+ for (StoreTask task : m.values()) {
+ task.cancel();
+ }
+ }
}
- this.asyncQueueMap.clear();
+ this.asyncQueueMaps.clear();
}
LOG.info("Stopping async topic tasks");
if (this.globalTopicSemaphore != null) {
this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
}
- synchronized (this.asyncTopicMap) {
- for (StoreTopicTask task : this.asyncTopicMap.values()) {
- task.cancel();
+ synchronized (this.asyncTopicMaps) {
+ for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
+ synchronized (m) {
+ for (StoreTask task : m.values()) {
+ task.cancel();
+ }
+ }
}
- this.asyncTopicMap.clear();
+ this.asyncTopicMaps.clear();
}
if (this.globalQueueSemaphore != null) {
this.globalQueueSemaphore.drainPermits();
@@ -242,30 +257,30 @@ public class KahaDBStore extends Message
protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
StoreQueueTask task = null;
- synchronized (this.asyncQueueMap) {
- task = this.asyncQueueMap.remove(new AsyncJobKey(id, store.getDestination()));
+ synchronized (store.asyncTaskMap) {
+ task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
- synchronized (this.asyncQueueMap) {
- this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
+ synchronized (store.asyncTaskMap) {
+ store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.queueExecutor.execute(task);
}
- protected StoreTopicTask removeTopicTask(KahaDBMessageStore store, MessageId id) {
+ protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
StoreTopicTask task = null;
- synchronized (this.asyncTopicMap) {
- task = this.asyncTopicMap.remove(new AsyncJobKey(id, store.getDestination()));
+ synchronized (store.asyncTaskMap) {
+ task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
}
return task;
}
- protected void addTopicTask(KahaDBMessageStore store, StoreTopicTask task) throws IOException {
- synchronized (this.asyncTopicMap) {
- this.asyncTopicMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
+ protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
+ synchronized (store.asyncTaskMap) {
+ store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
}
this.topicExecutor.execute(task);
}
@@ -275,9 +290,12 @@ public class KahaDBStore extends Message
}
public class KahaDBMessageStore extends AbstractMessageStore {
+ protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
protected KahaDestination dest;
private final int maxAsyncJobs;
private final Semaphore localDestinationSemaphore;
+
+ double doneTasks, canceledTasks = 0;
public KahaDBMessageStore(ActiveMQDestination destination) {
super(destination);
@@ -309,8 +327,8 @@ public class KahaDBStore extends Message
if (isConcurrentStoreAndDispatchQueues()) {
AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
StoreQueueTask task = null;
- synchronized (asyncQueueMap) {
- task = asyncQueueMap.get(key);
+ synchronized (asyncTaskMap) {
+ task = (StoreQueueTask) asyncTaskMap.get(key);
}
if (task != null) {
if (!task.cancel()) {
@@ -324,8 +342,8 @@ public class KahaDBStore extends Message
}
removeMessage(context, ack);
} else {
- synchronized (asyncQueueMap) {
- asyncQueueMap.remove(key);
+ synchronized (asyncTaskMap) {
+ asyncTaskMap.remove(key);
}
}
} else {
@@ -545,6 +563,7 @@ public class KahaDBStore extends Message
public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
super(destination);
this.subscriptionCount.set(getAllSubscriptions().length);
+ asyncTopicMaps.add(asyncTaskMap);
}
@Override
@@ -566,15 +585,15 @@ public class KahaDBStore extends Message
if (isConcurrentStoreAndDispatchTopics()) {
AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
StoreTopicTask task = null;
- synchronized (asyncTopicMap) {
- task = asyncTopicMap.get(key);
+ synchronized (asyncTaskMap) {
+ task = (StoreTopicTask) asyncTaskMap.get(key);
}
if (task != null) {
if (task.addSubscriptionKey(subscriptionKey)) {
removeTopicTask(this, messageId);
if (task.cancel()) {
- synchronized (asyncTopicMap) {
- asyncTopicMap.remove(key);
+ synchronized (asyncTaskMap) {
+ asyncTaskMap.remove(key);
}
}
}
@@ -994,8 +1013,12 @@ public class KahaDBStore extends Message
return destination.getPhysicalName() + "-" + id;
}
}
-
- class StoreQueueTask implements Runnable {
+
+ interface StoreTask {
+ public boolean cancel();
+ }
+
+ class StoreQueueTask implements Runnable, StoreTask {
protected final Message message;
protected final ConnectionContext context;
protected final KahaDBMessageStore store;
@@ -1044,11 +1067,15 @@ public class KahaDBStore extends Message
}
public void run() {
+ this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.store.addMessage(context, message);
removeQueueTask(this.store, this.message.getMessageId());
this.future.complete();
+ } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
+ System.err.println(this.store.dest.getName() + " cancelled: " + (this.store.canceledTasks/this.store.doneTasks) * 100);
+ this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
this.future.setException(e);
@@ -1128,6 +1155,7 @@ public class KahaDBStore extends Message
@Override
public void run() {
+ this.store.doneTasks++;
try {
if (this.done.compareAndSet(false, true)) {
this.topicStore.addMessage(context, message);
@@ -1140,6 +1168,9 @@ public class KahaDBStore extends Message
}
removeTopicTask(this.topicStore, this.message.getMessageId());
this.future.complete();
+ } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
+ System.err.println(this.store.dest.getName() + " cancelled: " + (this.store.canceledTasks/this.store.doneTasks) * 100);
+ this.store.canceledTasks = this.store.doneTasks = 0;
}
} catch (Exception e) {
this.future.setException(e);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Jul 2 17:11:00 2010
@@ -367,6 +367,7 @@ public class MessageDatabase extends Ser
*/
private Location getFirstInProgressTxLocation() {
Location l = null;
+ synchronized (inflightTransactions) {
if (!inflightTransactions.isEmpty()) {
l = inflightTransactions.values().iterator().next().get(0).getLocation();
}
@@ -376,6 +377,7 @@ public class MessageDatabase extends Ser
l = t;
}
}
+ }
return l;
}
@@ -746,7 +748,7 @@ public class MessageDatabase extends Ser
protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
- synchronized (indexMutex) {
+ synchronized (inflightTransactions) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new AddOpperation(command, location));
}
@@ -763,7 +765,7 @@ public class MessageDatabase extends Ser
protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
if (command.hasTransactionInfo()) {
- synchronized (indexMutex) {
+ synchronized (inflightTransactions) {
ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
inflightTx.add(new RemoveOpperation(command, location));
}
@@ -801,16 +803,19 @@ public class MessageDatabase extends Ser
protected void process(KahaCommitCommand command, Location location) throws IOException {
TransactionId key = key(command.getTransactionInfo());
- synchronized (indexMutex) {
- ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
+ ArrayList<Operation> inflightTx = null;
+ synchronized (inflightTransactions) {
+ inflightTx = inflightTransactions.remove(key);
if (inflightTx == null) {
inflightTx = preparedTransactions.remove(key);
}
- if (inflightTx == null) {
- return;
- }
+ }
+ if (inflightTx == null) {
+ return;
+ }
- final ArrayList<Operation> messagingTx = inflightTx;
+ final ArrayList<Operation> messagingTx = inflightTx;
+ synchronized (indexMutex) {
pageFile.tx().execute(new Transaction.Closure<IOException>() {
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
@@ -822,7 +827,7 @@ public class MessageDatabase extends Ser
}
protected void process(KahaPrepareCommand command, Location location) {
- synchronized (indexMutex) {
+ synchronized (inflightTransactions) {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> tx = inflightTransactions.remove(key);
if (tx != null) {
@@ -832,7 +837,7 @@ public class MessageDatabase extends Ser
}
protected void process(KahaRollbackCommand command, Location location) {
- synchronized (indexMutex) {
+ synchronized (inflightTransactions) {
TransactionId key = key(command.getTransactionInfo());
ArrayList<Operation> tx = inflightTransactions.remove(key);
if (tx == null) {
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Fri Jul 2 17:11:00 2010
@@ -86,8 +86,7 @@ public class AMQ2149Test extends TestCas
public void createBroker(Configurer configurer) throws Exception {
broker = new BrokerService();
- AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
- persistenceFactory.setDataDirectory(dataDirFile);
+ configurePersistenceAdapter(broker);
SystemUsage usage = new SystemUsage();
MemoryUsage memoryUsage = new MemoryUsage();
@@ -95,7 +94,7 @@ public class AMQ2149Test extends TestCas
usage.setMemoryUsage(memoryUsage);
broker.setSystemUsage(usage);
- broker.setPersistenceFactory(persistenceFactory);
+
broker.addConnector(BROKER_CONNECTOR);
broker.setBrokerName(getName());
@@ -106,6 +105,12 @@ public class AMQ2149Test extends TestCas
broker.start();
}
+ protected void configurePersistenceAdapter(BrokerService brokerService) {
+ AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+ persistenceFactory.setDataDirectory(dataDirFile);
+ brokerService.setPersistenceFactory(persistenceFactory);
+ }
+
public void setUp() throws Exception {
dataDirFile = new File("target/"+ getName());
numtoSend = DEFAULT_NUM_TO_SEND;
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java Fri Jul 2 17:11:00 2010
@@ -47,5 +47,6 @@ public class KahaDBDurableTopicTest exte
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector(uri);
answer.setUseShutdownHook(false);
+ answer.setEnableStatistics(false);
}
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Jul 2 17:11:00 2010
@@ -257,7 +257,11 @@ class DataFileAppender {
// Otherwise wait for the queuedCommand to be null
try {
while (nextWriteBatch != null) {
+ final long start = System.currentTimeMillis();
enqueueMutex.wait();
+ if (maxStat > 0) {
+ System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
+ }
}
} catch (InterruptedException e) {
throw new InterruptedIOException();
@@ -295,6 +299,10 @@ class DataFileAppender {
}
+ public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
+ public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
+ int statIdx = 0;
+ int[] stats = new int[maxStat];
/**
* The async processing loop that writes to the data files and does the
* force calls. Since the file sync() call is the slowest of all the
@@ -376,6 +384,17 @@ class DataFileAppender {
// Now do the 1 big write.
file.seek(wb.offset);
+ if (maxStat > 0) {
+ if (statIdx < maxStat) {
+ stats[statIdx++] = sequence.getLength();
+ } else {
+ long all = 0;
+ for (;statIdx > 0;) {
+ all+= stats[--statIdx];
+ }
+ System.err.println("Ave writeSize: " + all/maxStat);
+ }
+ }
file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
ReplicationTarget replicationTarget = journal.getReplicationTarget();