You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/07/02 19:11:00 UTC

svn commit: r960060 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-core/src/test/java/org/apache/activemq/bugs/ activemq-core/src/test/java/org/apache/activemq/perf/ kahadb/src/main/java/org/apache/kahadb/j...

Author: gtully
Date: Fri Jul  2 17:11:00 2010
New Revision: 960060

URL: http://svn.apache.org/viewvc?rev=960060&view=rev
Log:
Improve async task concurrency: reduce contention over indexMutex for transactions (they now synchronize on inflightTransactions), and make the async taskMap per destination instead of one shared map. Added some metrics, enabled by system properties, to help further diagnosis of journal usage.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jul  2 17:11:00 2010
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -84,10 +85,16 @@ import org.apache.kahadb.page.Transactio
 public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
     static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
+    
+    public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
+    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
+    public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
+    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
+
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
-    protected final Map<AsyncJobKey, StoreQueueTask> asyncQueueMap = new HashMap<AsyncJobKey, StoreQueueTask>();
-    protected final Map<AsyncJobKey, StoreTopicTask> asyncTopicMap = new HashMap<AsyncJobKey, StoreTopicTask>();
+    protected final List<Map<AsyncJobKey, StoreTask>> asyncQueueMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
+    protected final List<Map<AsyncJobKey, StoreTask>> asyncTopicMaps = new LinkedList<Map<AsyncJobKey, StoreTask>>();
     private final WireFormat wireFormat = new OpenWireFormat();
     private SystemUsage usageManager;
     private LinkedBlockingQueue<Runnable> asyncQueueJobQueue;
@@ -183,7 +190,7 @@ public class KahaDBStore extends Message
         this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
         this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
         this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
-        this.queueExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
+        this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
                 new ThreadFactory() {
                     public Thread newThread(Runnable runnable) {
                         Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
@@ -191,7 +198,7 @@ public class KahaDBStore extends Message
                         return thread;
                     }
                 });
-        this.topicExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
+        this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
                 new ThreadFactory() {
                     public Thread newThread(Runnable runnable) {
                         Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
@@ -208,21 +215,29 @@ public class KahaDBStore extends Message
         if (this.globalQueueSemaphore != null) {
             this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
         }
-        synchronized (this.asyncQueueMap) {
-            for (StoreQueueTask task : this.asyncQueueMap.values()) {
-                task.cancel();
+        synchronized (this.asyncQueueMaps) {
+            for (Map<AsyncJobKey, StoreTask> m : asyncQueueMaps) {
+                synchronized (m) {
+                    for (StoreTask task : m.values()) {
+                        task.cancel();
+                    }
+                }
             }
-            this.asyncQueueMap.clear();
+            this.asyncQueueMaps.clear();
         }
         LOG.info("Stopping async topic tasks");
         if (this.globalTopicSemaphore != null) {
             this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
         }
-        synchronized (this.asyncTopicMap) {
-            for (StoreTopicTask task : this.asyncTopicMap.values()) {
-                task.cancel();
+        synchronized (this.asyncTopicMaps) {
+            for (Map<AsyncJobKey, StoreTask> m : asyncTopicMaps) {
+                synchronized (m) {
+                    for (StoreTask task : m.values()) {
+                        task.cancel();
+                    }
+                }
             }
-            this.asyncTopicMap.clear();
+            this.asyncTopicMaps.clear();
         }
         if (this.globalQueueSemaphore != null) {
             this.globalQueueSemaphore.drainPermits();
@@ -242,30 +257,30 @@ public class KahaDBStore extends Message
 
     protected StoreQueueTask removeQueueTask(KahaDBMessageStore store, MessageId id) {
         StoreQueueTask task = null;
-        synchronized (this.asyncQueueMap) {
-            task = this.asyncQueueMap.remove(new AsyncJobKey(id, store.getDestination()));
+        synchronized (store.asyncTaskMap) {
+            task = (StoreQueueTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
         }
         return task;
     }
 
     protected void addQueueTask(KahaDBMessageStore store, StoreQueueTask task) throws IOException {
-        synchronized (this.asyncQueueMap) {
-            this.asyncQueueMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
+        synchronized (store.asyncTaskMap) {
+            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
         }
         this.queueExecutor.execute(task);
     }
 
-    protected StoreTopicTask removeTopicTask(KahaDBMessageStore store, MessageId id) {
+    protected StoreTopicTask removeTopicTask(KahaDBTopicMessageStore store, MessageId id) {
         StoreTopicTask task = null;
-        synchronized (this.asyncTopicMap) {
-            task = this.asyncTopicMap.remove(new AsyncJobKey(id, store.getDestination()));
+        synchronized (store.asyncTaskMap) {
+            task = (StoreTopicTask) store.asyncTaskMap.remove(new AsyncJobKey(id, store.getDestination()));
         }
         return task;
     }
 
-    protected void addTopicTask(KahaDBMessageStore store, StoreTopicTask task) throws IOException {
-        synchronized (this.asyncTopicMap) {
-            this.asyncTopicMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
+    protected void addTopicTask(KahaDBTopicMessageStore store, StoreTopicTask task) throws IOException {
+        synchronized (store.asyncTaskMap) {
+            store.asyncTaskMap.put(new AsyncJobKey(task.getMessage().getMessageId(), store.getDestination()), task);
         }
         this.topicExecutor.execute(task);
     }
@@ -275,9 +290,12 @@ public class KahaDBStore extends Message
     }
 
     public class KahaDBMessageStore extends AbstractMessageStore {
+        protected final Map<AsyncJobKey, StoreTask> asyncTaskMap = new HashMap<AsyncJobKey, StoreTask>();
         protected KahaDestination dest;
         private final int maxAsyncJobs;
         private final Semaphore localDestinationSemaphore;
+        
+        double doneTasks, canceledTasks = 0;
 
         public KahaDBMessageStore(ActiveMQDestination destination) {
             super(destination);
@@ -309,8 +327,8 @@ public class KahaDBStore extends Message
             if (isConcurrentStoreAndDispatchQueues()) {
                 AsyncJobKey key = new AsyncJobKey(ack.getLastMessageId(), getDestination());
                 StoreQueueTask task = null;
-                synchronized (asyncQueueMap) {
-                    task = asyncQueueMap.get(key);
+                synchronized (asyncTaskMap) {
+                    task = (StoreQueueTask) asyncTaskMap.get(key);
                 }
                 if (task != null) {
                     if (!task.cancel()) {
@@ -324,8 +342,8 @@ public class KahaDBStore extends Message
                         }
                         removeMessage(context, ack);
                     } else {
-                        synchronized (asyncQueueMap) {
-                            asyncQueueMap.remove(key);
+                        synchronized (asyncTaskMap) {
+                            asyncTaskMap.remove(key);
                         }
                     }
                 } else {
@@ -545,6 +563,7 @@ public class KahaDBStore extends Message
         public KahaDBTopicMessageStore(ActiveMQTopic destination) throws IOException {
             super(destination);
             this.subscriptionCount.set(getAllSubscriptions().length);
+            asyncTopicMaps.add(asyncTaskMap);
         }
 
         @Override
@@ -566,15 +585,15 @@ public class KahaDBStore extends Message
             if (isConcurrentStoreAndDispatchTopics()) {
                 AsyncJobKey key = new AsyncJobKey(messageId, getDestination());
                 StoreTopicTask task = null;
-                synchronized (asyncTopicMap) {
-                    task = asyncTopicMap.get(key);
+                synchronized (asyncTaskMap) {
+                    task = (StoreTopicTask) asyncTaskMap.get(key);
                 }
                 if (task != null) {
                     if (task.addSubscriptionKey(subscriptionKey)) {
                         removeTopicTask(this, messageId);
                         if (task.cancel()) {
-                            synchronized (asyncTopicMap) {
-                                asyncTopicMap.remove(key);
+                            synchronized (asyncTaskMap) {
+                                asyncTaskMap.remove(key);
                             }
                         }
                     }
@@ -994,8 +1013,12 @@ public class KahaDBStore extends Message
             return destination.getPhysicalName() + "-" + id;
         }
     }
-
-    class StoreQueueTask implements Runnable {
+    
+    interface StoreTask {
+        public boolean cancel();
+    }
+    
+    class StoreQueueTask implements Runnable, StoreTask {
         protected final Message message;
         protected final ConnectionContext context;
         protected final KahaDBMessageStore store;
@@ -1044,11 +1067,15 @@ public class KahaDBStore extends Message
         }
 
         public void run() {
+            this.store.doneTasks++;
             try {
                 if (this.done.compareAndSet(false, true)) {
                     this.store.addMessage(context, message);
                     removeQueueTask(this.store, this.message.getMessageId());
                     this.future.complete();
+                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
+                    System.err.println(this.store.dest.getName() + " cancelled: " +  (this.store.canceledTasks/this.store.doneTasks) * 100);
+                    this.store.canceledTasks = this.store.doneTasks = 0;
                 }
             } catch (Exception e) {
                 this.future.setException(e);
@@ -1128,6 +1155,7 @@ public class KahaDBStore extends Message
 
         @Override
         public void run() {
+            this.store.doneTasks++;
             try {
                 if (this.done.compareAndSet(false, true)) {
                     this.topicStore.addMessage(context, message);
@@ -1140,6 +1168,9 @@ public class KahaDBStore extends Message
                     }
                     removeTopicTask(this.topicStore, this.message.getMessageId());
                     this.future.complete();
+                } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
+                    System.err.println(this.store.dest.getName() + " cancelled: " +  (this.store.canceledTasks/this.store.doneTasks) * 100);
+                    this.store.canceledTasks = this.store.doneTasks = 0;
                 }
             } catch (Exception e) {
                 this.future.setException(e);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java Fri Jul  2 17:11:00 2010
@@ -367,6 +367,7 @@ public class MessageDatabase extends Ser
      */
     private Location getFirstInProgressTxLocation() {
         Location l = null;
+        synchronized (inflightTransactions) {
         if (!inflightTransactions.isEmpty()) {
             l = inflightTransactions.values().iterator().next().get(0).getLocation();
         }
@@ -376,6 +377,7 @@ public class MessageDatabase extends Ser
                 l = t;
             }
         }
+        }
         return l;
     }
 
@@ -746,7 +748,7 @@ public class MessageDatabase extends Ser
 
     protected void process(final KahaAddMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            synchronized (indexMutex) {
+            synchronized (inflightTransactions) {
                 ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
                 inflightTx.add(new AddOpperation(command, location));
             }
@@ -763,7 +765,7 @@ public class MessageDatabase extends Ser
 
     protected void process(final KahaRemoveMessageCommand command, final Location location) throws IOException {
         if (command.hasTransactionInfo()) {
-            synchronized (indexMutex) {
+            synchronized (inflightTransactions) {
                 ArrayList<Operation> inflightTx = getInflightTx(command.getTransactionInfo(), location);
                 inflightTx.add(new RemoveOpperation(command, location));
             }
@@ -801,16 +803,19 @@ public class MessageDatabase extends Ser
 
     protected void process(KahaCommitCommand command, Location location) throws IOException {
         TransactionId key = key(command.getTransactionInfo());
-        synchronized (indexMutex) {
-            ArrayList<Operation> inflightTx = inflightTransactions.remove(key);
+        ArrayList<Operation> inflightTx = null;
+        synchronized (inflightTransactions) {
+            inflightTx = inflightTransactions.remove(key);
             if (inflightTx == null) {
                 inflightTx = preparedTransactions.remove(key);
             }
-            if (inflightTx == null) {
-                return;
-            }
+        }
+        if (inflightTx == null) {
+            return;
+        }
 
-            final ArrayList<Operation> messagingTx = inflightTx;
+        final ArrayList<Operation> messagingTx = inflightTx;
+        synchronized (indexMutex) {
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
                     for (Operation op : messagingTx) {
@@ -822,7 +827,7 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaPrepareCommand command, Location location) {
-        synchronized (indexMutex) {
+        synchronized (inflightTransactions) {
             TransactionId key = key(command.getTransactionInfo());
             ArrayList<Operation> tx = inflightTransactions.remove(key);
             if (tx != null) {
@@ -832,7 +837,7 @@ public class MessageDatabase extends Ser
     }
 
     protected void process(KahaRollbackCommand command, Location location) {
-        synchronized (indexMutex) {
+        synchronized (inflightTransactions) {
             TransactionId key = key(command.getTransactionInfo());
             ArrayList<Operation> tx = inflightTransactions.remove(key);
             if (tx == null) {

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java Fri Jul  2 17:11:00 2010
@@ -86,8 +86,7 @@ public class AMQ2149Test extends TestCas
     
     public void createBroker(Configurer configurer) throws Exception {
         broker = new BrokerService();
-        AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
-        persistenceFactory.setDataDirectory(dataDirFile);
+        configurePersistenceAdapter(broker);
         
         SystemUsage usage = new SystemUsage();
         MemoryUsage memoryUsage = new MemoryUsage();
@@ -95,7 +94,7 @@ public class AMQ2149Test extends TestCas
         usage.setMemoryUsage(memoryUsage);
         broker.setSystemUsage(usage);
         
-        broker.setPersistenceFactory(persistenceFactory);
+        
 
         broker.addConnector(BROKER_CONNECTOR);        
         broker.setBrokerName(getName());
@@ -106,6 +105,12 @@ public class AMQ2149Test extends TestCas
         broker.start();
     }
     
+    protected void configurePersistenceAdapter(BrokerService brokerService) {
+        AMQPersistenceAdapterFactory persistenceFactory = new AMQPersistenceAdapterFactory();
+        persistenceFactory.setDataDirectory(dataDirFile);
+        brokerService.setPersistenceFactory(persistenceFactory);
+    }
+
     public void setUp() throws Exception {
         dataDirFile = new File("target/"+ getName());
         numtoSend = DEFAULT_NUM_TO_SEND;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java Fri Jul  2 17:11:00 2010
@@ -47,5 +47,6 @@ public class KahaDBDurableTopicTest exte
         answer.setDeleteAllMessagesOnStartup(true);
         answer.addConnector(uri);
         answer.setUseShutdownHook(false);
+        answer.setEnableStatistics(false);
     }
 }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java?rev=960060&r1=960059&r2=960060&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/journal/DataFileAppender.java Fri Jul  2 17:11:00 2010
@@ -257,7 +257,11 @@ class DataFileAppender {
 	                    // Otherwise wait for the queuedCommand to be null
 	                    try {
 	                        while (nextWriteBatch != null) {
+	                            final long start = System.currentTimeMillis();
 	                            enqueueMutex.wait();
+	                            if (maxStat > 0) { 
+	                                System.err.println("Watiting for write to finish with full batch... millis: " + (System.currentTimeMillis() - start));
+	                            }
 	                        }
 	                    } catch (InterruptedException e) {
 	                        throw new InterruptedIOException();
@@ -295,6 +299,10 @@ class DataFileAppender {
 
     }
 
+    public static final String PROPERTY_LOG_WRITE_STAT_WINDOW = "org.apache.kahadb.journal.appender.WRITE_STAT_WINDOW";
+    public static final int maxStat = Integer.parseInt(System.getProperty(PROPERTY_LOG_WRITE_STAT_WINDOW, "0"));
+    int statIdx = 0;
+    int[] stats = new int[maxStat];
     /**
      * The async processing loop that writes to the data files and does the
      * force calls. Since the file sync() call is the slowest of all the
@@ -376,6 +384,17 @@ class DataFileAppender {
 
                 // Now do the 1 big write.
                 file.seek(wb.offset);
+                if (maxStat > 0) {
+                    if (statIdx < maxStat) {
+                        stats[statIdx++] = sequence.getLength();
+                    } else {
+                        long all = 0;
+                        for (;statIdx > 0;) {
+                            all+= stats[--statIdx];
+                        }
+                        System.err.println("Ave writeSize: " + all/maxStat);
+                    }
+                }
                 file.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
                 
                 ReplicationTarget replicationTarget = journal.getReplicationTarget();