You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/07/05 19:18:10 UTC

svn commit: r960645 - /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Author: rajdavies
Date: Mon Jul  5 17:18:10 2010
New Revision: 960645

URL: http://svn.apache.org/viewvc?rev=960645&view=rev
Log:
synchronization around creation of preparedTransactions

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=960645&r1=960644&r2=960645&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Mon Jul  5 17:18:10 2010
@@ -82,14 +82,16 @@ import org.apache.commons.logging.LogFac
 import org.apache.kahadb.journal.Location;
 import org.apache.kahadb.page.Transaction;
 
-public class KahaDBStore extends MessageDatabase implements PersistenceAdapter{
+public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
     static final Log LOG = LogFactory.getLog(KahaDBStore.class);
     private static final int MAX_ASYNC_JOBS = 10000;
-    
+
     public static final String PROPERTY_CANCELED_TASK_MOD_METRIC = "org.apache.activemq.store.kahadb.CANCELED_TASK_MOD_METRIC";
-    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
+    public static final int cancelledTaskModMetric = Integer.parseInt(System.getProperty(
+            PROPERTY_CANCELED_TASK_MOD_METRIC, "0"), 10);
     public static final String PROPERTY_ASYNC_EXECUTOR_MAX_THREADS = "org.apache.activemq.store.kahadb.ASYNC_EXECUTOR_MAX_THREADS";
-    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
+    private static final int asyncExecutorMaxThreads = Integer.parseInt(System.getProperty(
+            PROPERTY_ASYNC_EXECUTOR_MAX_THREADS, "1"), 10);;
 
     protected ExecutorService queueExecutor;
     protected ExecutorService topicExecutor;
@@ -151,7 +153,7 @@ public class KahaDBStore extends Message
     public void setConcurrentStoreAndDispatchTopics(boolean concurrentStoreAndDispatch) {
         this.concurrentStoreAndDispatchTopics = concurrentStoreAndDispatch;
     }
-    
+
     /**
      * @return the concurrentStoreAndDispatchTransactions
      */
@@ -160,7 +162,8 @@ public class KahaDBStore extends Message
     }
 
     /**
-     * @param concurrentStoreAndDispatchTransactions the concurrentStoreAndDispatchTransactions to set
+     * @param concurrentStoreAndDispatchTransactions
+     *            the concurrentStoreAndDispatchTransactions to set
      */
     public void setConcurrentStoreAndDispatchTransactions(boolean concurrentStoreAndDispatchTransactions) {
         this.concurrentStoreAndDispatchTransactions = concurrentStoreAndDispatchTransactions;
@@ -179,9 +182,6 @@ public class KahaDBStore extends Message
     public void setMaxAsyncJobs(int maxAsyncJobs) {
         this.maxAsyncJobs = maxAsyncJobs;
     }
-    
-    
-
 
     @Override
     public void doStart() throws Exception {
@@ -190,16 +190,16 @@ public class KahaDBStore extends Message
         this.globalTopicSemaphore = new Semaphore(getMaxAsyncJobs());
         this.asyncQueueJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
         this.asyncTopicJobQueue = new LinkedBlockingQueue<Runnable>(getMaxAsyncJobs());
-        this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncQueueJobQueue,
-                new ThreadFactory() {
+        this.queueExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
+                asyncQueueJobQueue, new ThreadFactory() {
                     public Thread newThread(Runnable runnable) {
                         Thread thread = new Thread(runnable, "ConcurrentQueueStoreAndDispatch");
                         thread.setDaemon(true);
                         return thread;
                     }
                 });
-        this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS, asyncTopicJobQueue,
-                new ThreadFactory() {
+        this.topicExecutor = new ThreadPoolExecutor(1, asyncExecutorMaxThreads, 0L, TimeUnit.MILLISECONDS,
+                asyncTopicJobQueue, new ThreadFactory() {
                     public Thread newThread(Runnable runnable) {
                         Thread thread = new Thread(runnable, "ConcurrentTopicStoreAndDispatch");
                         thread.setDaemon(true);
@@ -210,7 +210,7 @@ public class KahaDBStore extends Message
 
     @Override
     public void doStop(ServiceStopper stopper) throws Exception {
-        //drain down async jobs
+        // drain down async jobs
         LOG.info("Stopping async queue tasks");
         if (this.globalQueueSemaphore != null) {
             this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
@@ -294,7 +294,7 @@ public class KahaDBStore extends Message
         protected KahaDestination dest;
         private final int maxAsyncJobs;
         private final Semaphore localDestinationSemaphore;
-        
+
         double doneTasks, canceledTasks = 0;
 
         public KahaDBMessageStore(ActiveMQDestination destination) {
@@ -697,14 +697,15 @@ public class KahaDBStore extends Message
                             if (selector != null) {
                                 selectorExpression = SelectorParser.parse(selector);
                             }
-                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator.hasNext();) {
+                            for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, cursorPos); iterator
+                                    .hasNext();) {
                                 Entry<Long, MessageKeys> entry = iterator.next();
-                                if (selectorExpression != null) { 
-                                        MessageEvaluationContext ctx = new MessageEvaluationContext();
-                                        ctx.setMessageReference(loadMessage(entry.getValue().location));
-                                        if (selectorExpression.matches(ctx)) {
-                                            counter++;
-                                        }
+                                if (selectorExpression != null) {
+                                    MessageEvaluationContext ctx = new MessageEvaluationContext();
+                                    ctx.setMessageReference(loadMessage(entry.getValue().location));
+                                    if (selectorExpression.matches(ctx)) {
+                                        counter++;
+                                    }
                                 } else {
                                     counter++;
                                 }
@@ -913,9 +914,11 @@ public class KahaDBStore extends Message
         KahaTransactionInfo rc = new KahaTransactionInfo();
 
         // Link it up to the previous record that was part of the transaction.
-        ArrayList<Operation> tx = inflightTransactions.get(txid);
-        if (tx != null) {
-            rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
+        synchronized (inflightTransactions) {
+            ArrayList<Operation> tx = inflightTransactions.get(txid);
+            if (tx != null) {
+                rc.setPreviousEntry(convert(tx.get(tx.size() - 1).location));
+            }
         }
 
         if (txid.isLocalTransaction()) {
@@ -1013,11 +1016,11 @@ public class KahaDBStore extends Message
             return destination.getPhysicalName() + "-" + id;
         }
     }
-    
+
     interface StoreTask {
         public boolean cancel();
     }
-    
+
     class StoreQueueTask implements Runnable, StoreTask {
         protected final Message message;
         protected final ConnectionContext context;
@@ -1074,7 +1077,8 @@ public class KahaDBStore extends Message
                     removeQueueTask(this.store, this.message.getMessageId());
                     this.future.complete();
                 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
-                    System.err.println(this.store.dest.getName() + " cancelled: " +  (this.store.canceledTasks/this.store.doneTasks) * 100);
+                    System.err.println(this.store.dest.getName() + " cancelled: "
+                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
                     this.store.canceledTasks = this.store.doneTasks = 0;
                 }
             } catch (Exception e) {
@@ -1163,13 +1167,14 @@ public class KahaDBStore extends Message
                     synchronized (this.subscriptionKeys) {
                         for (String key : this.subscriptionKeys) {
                             this.topicStore.doAcknowledge(context, key, this.message.getMessageId());
-                            
+
                         }
                     }
                     removeTopicTask(this.topicStore, this.message.getMessageId());
                     this.future.complete();
                 } else if (cancelledTaskModMetric > 0 && this.store.canceledTasks++ % cancelledTaskModMetric == 0) {
-                    System.err.println(this.store.dest.getName() + " cancelled: " +  (this.store.canceledTasks/this.store.doneTasks) * 100);
+                    System.err.println(this.store.dest.getName() + " cancelled: "
+                            + (this.store.canceledTasks / this.store.doneTasks) * 100);
                     this.store.canceledTasks = this.store.doneTasks = 0;
                 }
             } catch (Exception e) {