You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by wt...@apache.org on 2009/04/16 18:56:08 UTC
svn commit: r765686 -
/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Author: wtam
Date: Thu Apr 16 16:56:08 2009
New Revision: 765686
URL: http://svn.apache.org/viewvc?rev=765686&view=rev
Log:
[CAMEL-1510] BatchProcessor interrupt has side effects (submitted on behalf of Christopher Hunt)
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=765686&r1=765685&r2=765686&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Thu Apr 16 16:56:08 2009
@@ -18,7 +18,12 @@
import java.util.Collection;
import java.util.Iterator;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -30,13 +35,12 @@
import org.apache.camel.util.ServiceHelper;
/**
- * A base class for any kind of {@link Processor} which implements some kind of
- * batch processing.
+ * A base class for any kind of {@link Processor} which implements some kind of batch processing.
*
* @version $Revision$
*/
public class BatchProcessor extends ServiceSupport implements Processor {
-
+
public static final long DEFAULT_BATCH_TIMEOUT = 1000L;
public static final int DEFAULT_BATCH_SIZE = 100;
@@ -82,9 +86,9 @@
}
/**
- * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
- * will process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
- *
+ * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor will
+ * process before its completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
+ *
* @param batchSize the size
*/
public void setBatchSize(int batchSize) {
@@ -96,10 +100,10 @@
}
/**
- * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
- * the completion is triggered. Can for instance be used to ensure that this batch is completed when
- * a certain number of exchanges has been collected. By default this feature is <b>not</b> enabled.
- *
+ * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then the
+ * completion is triggered. Can for instance be used to ensure that this batch is completed when a certain
+ * number of exchanges has been collected. By default this feature is <b>not</b> enabled.
+ *
* @param outBatchSize the size
*/
public void setOutBatchSize(int outBatchSize) {
@@ -127,16 +131,16 @@
}
/**
- * A strategy method to decide if the "in" batch is completed. That is, whether the resulting
- * exchanges in the in queue should be drained to the "out" collection.
+ * A strategy method to decide if the "in" batch is completed. That is, whether the resulting exchanges in
+ * the in queue should be drained to the "out" collection.
*/
protected boolean isInBatchCompleted(int num) {
return num >= batchSize;
}
-
+
/**
- * A strategy method to decide if the "out" batch is completed. That is, whether the resulting
- * exchange in the out collection should be sent.
+ * A strategy method to decide if the "out" batch is completed. That is, whether the resulting exchange in
+ * the out collection should be sent.
*/
protected boolean isOutBatchCompleted() {
if (outBatchSize == 0) {
@@ -147,9 +151,8 @@
}
/**
- * Strategy Method to process an exchange in the batch. This method allows
- * derived classes to perform custom processing before or after an
- * individual exchange is processed
+ * Strategy Method to process an exchange in the batch. This method allows derived classes to perform
+ * custom processing before or after an individual exchange is processed
*/
protected void processExchange(Exchange exchange) throws Exception {
processor.process(exchange);
@@ -181,53 +184,119 @@
* Sender thread for queued-up exchanges.
*/
private class BatchSender extends Thread {
-
- private volatile boolean cancelRequested;
- private LinkedBlockingQueue<Exchange> queue;
-
+ private Queue<Exchange> queue;
+ private Lock queueLock = new ReentrantLock();
+ private boolean exchangeEnqueued;
+ private Condition exchangeEnqueuedCondition = queueLock.newCondition();
+
public BatchSender() {
super("Batch Sender");
- this.queue = new LinkedBlockingQueue<Exchange>();
+ this.queue = new LinkedList<Exchange>();
}
@Override
public void run() {
- while (true) {
- try {
- Thread.sleep(batchTimeout);
- queue.drainTo(collection, batchSize);
- } catch (InterruptedException e) {
- if (cancelRequested) {
- return;
- }
-
- while (isInBatchCompleted(queue.size())) {
- queue.drainTo(collection, batchSize);
- }
-
- if (!isOutBatchCompleted()) {
- continue;
+ // Wait until one of either:
+ // * an exchange being queued;
+ // * the batch timeout expiring; or
+ // * the thread being cancelled.
+ //
+ // If an exchange is queued then we need to determine whether the
+ // batch is complete. If it is complete then we send out the batched
+ // exchanges. Otherwise we move back into our wait state.
+ //
+ // If the batch times out then we send out the batched exchanges
+ // collected so far.
+ //
+ // If we receive an interrupt then all blocking operations are
+ // interrupted and our thread terminates.
+ //
+ // The goal of the following algorithm in terms of synchronisation
+ // is to provide fine grained locking i.e. retaining the lock only
+ // when required. Special consideration is given to releasing the
+ // lock when calling an overloaded method such as isInBatchComplete,
+ // isOutBatchComplete and around sendExchanges. The latter is
+ // especially important as the process of sending out the exchanges
+ // would otherwise block new exchanges from being queued.
+
+ queueLock.lock();
+ try {
+ do {
+ try {
+ if (!exchangeEnqueued) {
+ exchangeEnqueuedCondition.await(batchTimeout, TimeUnit.MILLISECONDS);
+ }
+
+ if (!exchangeEnqueued) {
+ drainQueueTo(collection, batchSize);
+ } else {
+ exchangeEnqueued = false;
+ while (isInBatchCompleted(queue.size())) {
+ drainQueueTo(collection, batchSize);
+ }
+
+ queueLock.unlock();
+ try {
+ if (!isOutBatchCompleted()) {
+ continue;
+ }
+ } finally {
+ queueLock.lock();
+ }
+ }
+
+ queueLock.unlock();
+ try {
+ try {
+ sendExchanges();
+ } catch (Exception e) {
+ getExceptionHandler().handleException(e);
+ }
+ } finally {
+ queueLock.lock();
+ }
+
+ } catch (InterruptedException e) {
+ break;
}
- }
- try {
- sendExchanges();
- } catch (Exception e) {
- getExceptionHandler().handleException(e);
+
+ } while (true);
+
+ } finally {
+ queueLock.unlock();
+ }
+ }
+
+ /**
+ * This method should be called with queueLock held
+ */
+ private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
+ for (int i = 0; i < batchSize; ++i) {
+ Exchange e = queue.poll();
+ if (e != null) {
+ collection.add(e);
+ } else {
+ break;
}
}
}
-
+
public void cancel() {
- cancelRequested = true;
interrupt();
}
-
+
public void enqueueExchange(Exchange exchange) {
- queue.add(exchange);
- interrupt();
+ queueLock.lock();
+ try {
+ queue.add(exchange);
+ exchangeEnqueued = true;
+ exchangeEnqueuedCondition.signal();
+ } finally {
+ queueLock.unlock();
+ }
}
-
+
private void sendExchanges() throws Exception {
GroupedExchange grouped = null;
@@ -253,5 +322,5 @@
}
}
}
-
+
}