You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by wt...@apache.org on 2009/04/16 18:56:08 UTC

svn commit: r765686 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java

Author: wtam
Date: Thu Apr 16 16:56:08 2009
New Revision: 765686

URL: http://svn.apache.org/viewvc?rev=765686&view=rev
Log:
[CAMEL-1510] BatchProcessor interrupt has side effects (submitted on behalf of Christopher Hunt)

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=765686&r1=765685&r2=765686&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Thu Apr 16 16:56:08 2009
@@ -18,7 +18,12 @@
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
@@ -30,13 +35,12 @@
 import org.apache.camel.util.ServiceHelper;
 
 /**
- * A base class for any kind of {@link Processor} which implements some kind of
- * batch processing.
+ * A base class for any kind of {@link Processor} which implements some kind of batch processing.
  * 
  * @version $Revision$
  */
 public class BatchProcessor extends ServiceSupport implements Processor {
-    
+
     public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
     public static final int DEFAULT_BATCH_SIZE = 100;
 
@@ -82,9 +86,9 @@
     }
 
     /**
-     * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
-     * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
-     *
+     * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
+     * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
+     * 
      * @param batchSize the size
      */
     public void setBatchSize(int batchSize) {
@@ -96,10 +100,10 @@
     }
 
     /**
-     * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
-     * the completion is triggered. Can for instance be used to ensure that this batch is completed when
-     * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled.
-     *
+     * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
+     * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
+     * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
+     * 
      * @param outBatchSize the size
      */
     public void setOutBatchSize(int outBatchSize) {
@@ -127,16 +131,16 @@
     }
 
     /**
-     * A strategy method to decide if the "in" batch is completed.  That is, whether the resulting 
-     * exchanges in the in queue should be drained to the "out" collection.
+     * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
+     * the in queue should be drained to the "out" collection.
      */
     protected boolean isInBatchCompleted(int num) {
         return num >= batchSize;
     }
-    
+
     /**
-     * A strategy method to decide if the "out" batch is completed. That is, whether the resulting 
-     * exchange in the out collection should be sent.
+     * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
+     * the out collection should be sent.
      */
     protected boolean isOutBatchCompleted() {
         if (outBatchSize == 0) {
@@ -147,9 +151,8 @@
     }
 
     /**
-     * Strategy Method to process an exchange in the batch. This method allows
-     * derived classes to perform custom processing before or after an
-     * individual exchange is processed
+     * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
+     * custom processing before or after an individual exchange is processed
      */
     protected void processExchange(Exchange exchange) throws Exception {
         processor.process(exchange);
@@ -181,53 +184,119 @@
      * Sender thread for queued-up exchanges.
      */
     private class BatchSender extends Thread {
-        
-        private volatile boolean cancelRequested;
 
-        private LinkedBlockingQueue<Exchange> queue;
-        
+        private Queue<Exchange> queue;
+        private Lock queueLock = new ReentrantLock();
+        private boolean exchangeEnqueued;
+        private Condition exchangeEnqueuedCondition = queueLock.newCondition();
+
         public BatchSender() {
             super("Batch Sender");
-            this.queue = new LinkedBlockingQueue<Exchange>();
+            this.queue = new LinkedList<Exchange>();
         }
 
         @Override
         public void run() {
-            while (true) {
-                try {
-                    Thread.sleep(batchTimeout);
-                    queue.drainTo(collection, batchSize);  
-                } catch (InterruptedException e) {
-                    if (cancelRequested) {
-                        return;
-                    }
-                    
-                    while (isInBatchCompleted(queue.size())) {
-                        queue.drainTo(collection, batchSize);  
-                    }
-                    
-                    if (!isOutBatchCompleted()) {
-                        continue;
+            // Wait until one of either:
+            // * an exchange being queued;
+            // * the batch timeout expiring; or
+            // * the thread being cancelled.
+            //
+            // If an exchange is queued then we need to determine whether the
+            // batch is complete. If it is complete then we send out the batched
+            // exchanges. Otherwise we move back into our wait state.
+            //
+            // If the batch times out then we send out the batched exchanges
+            // collected so far.
+            //
+            // If we receive an interrupt then all blocking operations are
+            // interrupted and our thread terminates.
+            //
+            // The goal of the following algorithm in terms of synchronisation
+            // is to provide fine grained locking i.e. retaining the lock only
+            // when required. Special consideration is given to releasing the
+            // lock when calling an overloaded method such as isInBatchComplete,
+            // isOutBatchComplete and around sendExchanges. The latter is
+            // especially important as the process of sending out the exchanges
+            // would otherwise block new exchanges from being queued.
+
+            queueLock.lock();
+            try {
+                do {
+                    try {
+                        if (!exchangeEnqueued) {
+                            exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
+                        }
+
+                        if (!exchangeEnqueued) {
+                            drainQueueTo(collection, batchSize);
+                        } else {             
+                            exchangeEnqueued = false;
+                            while (isInBatchCompleted(queue.size())) {   
+                                drainQueueTo(collection, batchSize);
+                            }
+                            
+                            queueLock.unlock();
+                            try {
+                                if (!isOutBatchCompleted()) {
+                                    continue;
+                                }
+                            } finally {
+                                queueLock.lock();
+                            }
+                        }
+
+                        queueLock.unlock();
+                        try {
+                            try {
+                                sendExchanges();
+                            } catch (Exception e) {
+                                getExceptionHandler().handleException(e);
+                            }
+                        } finally {
+                            queueLock.lock();
+                        }
+
+                    } catch (InterruptedException e) {
+                        break;
                     }
-                }
-                try {
-                    sendExchanges();
-                } catch (Exception e) {
-                    getExceptionHandler().handleException(e);
+
+                } while (true);
+
+            } finally {
+                queueLock.unlock();
+            }
+        }
+
+        /**
+         * This method should be called with queueLock held
+         */
+        private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
+            for (int i = 0; i < batchSize; ++i) {
+                Exchange e = queue.poll();
+                if (e != null) {
+                    collection.add(e);
+                } else {
+                    break;
                 }
             }
         }
-        
+
         public void cancel() {
-            cancelRequested = true;
             interrupt();
         }
-     
+
         public void enqueueExchange(Exchange exchange) {
-            queue.add(exchange);
-            interrupt();
+            queueLock.lock();
+            try {
+                queue.add(exchange);
+                exchangeEnqueued = true;
+                exchangeEnqueuedCondition.signal();
+            } finally {
+                queueLock.unlock();
+            }
         }
-        
+
         private void sendExchanges() throws Exception {
             GroupedExchange grouped = null;
 
@@ -253,5 +322,5 @@
             }
         }
     }
-    
+
 }