Posted to dev@camel.apache.org by "William Tam (JIRA)" <ji...@apache.org> on 2009/04/15 07:03:31 UTC

[jira] Issue Comment Edited: (CAMEL-1510) BatchProcessor interrupt has side effects

    [ https://issues.apache.org/activemq/browse/CAMEL-1510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=51203#action_51203 ] 

William Tam edited comment on CAMEL-1510 at 4/14/09 10:02 PM:
--------------------------------------------------------------

@Martin
Let me try to answer your question regarding isInBatchCompleted(). The original patch from CAMEL-1037 has an issue (or at least I thought it did). Suppose the batch size is very small (say 2) and someone sends a large number of messages (say 1000) to the BatchProcessor in a short period of time. The queue can then grow much larger than the batch size. The reason is that enqueueExchange only interrupts the Sender thread if it is sleeping. If the Sender thread is not sleeping, it drains only 2 messages from the queue, so the queue can back up easily. When that happens, hundreds of messages are stuck on the queue until batchTimeout expires, and even then only 2 messages (batchSize) are drained per batchTimeout. Most of the messages in the queue really should have been sent already; in this example, at most 1 message should be left in the queue.

The "while (isInBatchCompleted(queue.size()))" loop is there to address this issue. We actually introduced two new parameters, InBatchSize and OutBatchSize. InBatchSize is how large the queue can grow before its messages are drained to the collection. OutBatchSize is how large the collection can grow before the messages are sent.

BTW, setting the OutBatchSize would allow you to get the behavior of the original patch.
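
To make the backlog scenario above concrete, here is a minimal single-threaded sketch. It is an illustrative model only, not Camel's BatchProcessor: the class TwoThresholdSketch, the helper leftover() and the exact semantics of the two predicates (queue size >= InBatchSize, collection size >= OutBatchSize) are assumptions made for the example. It compares one wake-up of the sender that drains a single batch with one that keeps draining while the "in" batch is complete.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Illustrative model only; names mirror the discussion, but this is not Camel code.
class TwoThresholdSketch {
    private final Queue<String> queue = new ArrayDeque<>();
    private final List<String> collection = new ArrayList<>();
    private final int inBatchSize;
    private final int outBatchSize;
    private int sentBatches;

    TwoThresholdSketch(int inBatchSize, int outBatchSize) {
        if (inBatchSize < 1) {
            throw new IllegalArgumentException("inBatchSize must be >= 1");
        }
        this.inBatchSize = inBatchSize;
        this.outBatchSize = outBatchSize;
    }

    void enqueue(String message) {
        queue.add(message);
    }

    // Assumed semantics: the "in" batch is complete once the queue holds inBatchSize messages.
    private boolean isInBatchCompleted(int queued) {
        return queued >= inBatchSize;
    }

    // Assumed semantics: the "out" batch is complete once the collection holds outBatchSize messages.
    private boolean isOutBatchCompleted() {
        return collection.size() >= outBatchSize;
    }

    private void drainQueueTo(List<String> target, int max) {
        for (int i = 0; i < max; i++) {
            String message = queue.poll();
            if (message == null) {
                break;
            }
            target.add(message);
        }
    }

    // One wake-up of the sender: drain a single batch, or keep draining while full batches remain.
    void wakeUp(boolean loopDrain) {
        if (loopDrain) {
            while (isInBatchCompleted(queue.size())) {
                drainQueueTo(collection, inBatchSize);
            }
        } else {
            drainQueueTo(collection, inBatchSize);
        }
        if (isOutBatchCompleted()) {
            collection.clear(); // stands in for sending the collected exchanges
            sentBatches++;
        }
    }

    int queued() {
        return queue.size();
    }

    int sentBatches() {
        return sentBatches;
    }

    // Hypothetical helper: messages still queued after a single wake-up.
    static int leftover(int messages, int inBatchSize, boolean loopDrain) {
        TwoThresholdSketch sketch = new TwoThresholdSketch(inBatchSize, 1);
        for (int i = 0; i < messages; i++) {
            sketch.enqueue("m" + i);
        }
        sketch.wakeUp(loopDrain);
        return sketch.queued();
    }

    public static void main(String[] args) {
        System.out.println(leftover(1000, 2, false)); // 998 left after a single drain
        System.out.println(leftover(1000, 2, true));  // 0 left with the while loop
        System.out.println(leftover(1001, 2, true));  // 1 left with the while loop
    }
}
```

With a batch size of 2, the looping variant leaves at most 1 message on the queue after a wake-up, which matches the expectation stated in the comment.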



      was (Author: wtam):
    @Martin
Let me try to answer your question regarding isInBatchCompleted(). The original patch from CAMEL-1037 has an issue. Suppose the batch size is very small (say 2) and someone sends a large number of messages (say 1000) to the BatchProcessor in a short period of time. The queue can then grow much larger than the batch size. The reason is that enqueueExchange only interrupts the Sender thread if it is sleeping. If the Sender thread is not sleeping, it drains only 2 messages from the queue, so the queue can back up easily. When that happens, messages are stuck on the queue until batchTimeout expires, and even then only 2 messages (batchSize) are drained per batchTimeout.

The "while (isInBatchCompleted(queue.size()))" loop is a solution for that issue. We actually introduced two new parameters, InBatchSize and OutBatchSize. InBatchSize is how large the queue can grow before its messages are drained to the collection. OutBatchSize is how large the collection can grow before the messages are sent.


  
> BatchProcessor interrupt has side effects
> -----------------------------------------
>
>                 Key: CAMEL-1510
>                 URL: https://issues.apache.org/activemq/browse/CAMEL-1510
>             Project: Apache Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 1.6.0, 2.0-M1
>         Environment: Mac OS X
>            Reporter: Christopher Hunt
>            Priority: Critical
>
> I have noticed that the BatchProcessor class uses the Thread class interrupt method to wake the run loop from sleeping within the enqueueExchange method.
> The unfortunate side effect of this is that if the run loop is in the middle of processing exchanges, and the processing involves something slow like establishing a JMS connection over SSL or queuing to an asynchronous processor, then the processing can become interrupted. The consequence of this side effect is that the batch sender thread rarely gets the opportunity to complete properly and exceptions regarding the interrupt are thrown.
> This all became apparent during performance testing that continuously added exchanges to the aggregator, reached the threshold, and then tried to enqueue the aggregated result to a JMS queue.
> If my analysis of the BatchProcessor is correct, then I would recommend using finer-grained concurrency controls instead of relying on interrupting a thread. Perhaps something like the following (untested) rewrite of the sender:
> {code}
>     private class BatchSender extends Thread {
>         private Queue<Exchange> queue;
>         private boolean exchangeQueued = false;
>         private Lock queueMutex = new ReentrantLock();
>         private Condition queueCondition = queueMutex.newCondition();
>         public BatchSender() {
>             super("Batch Sender");
>             this.queue = new LinkedList<Exchange>();
>         }
>         public void cancel() {
>             interrupt();
>         }
>         private void drainQueueTo(Collection<Exchange> collection, int batchSize) {
>             for (int i = 0; i < batchSize; ++i) {
>                 Exchange e = queue.poll();
>                 if (e != null) {
>                     collection.add(e);
>                 } else {
>                     break;
>                 }
>             }
>         }
>         public void enqueueExchange(Exchange exchange) {
>             queueMutex.lock();
>             try {
>                 queue.add(exchange);
>                 exchangeQueued = true;
>                 queueCondition.signal();
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         @Override
>         public void run() {
>             queueMutex.lock();
>             try {
>                 do {
>                     try {
>                         if (!exchangeQueued) {
>                             queueCondition.await(batchTimeout,
>                                     TimeUnit.MILLISECONDS);
>                             if (!exchangeQueued) {
>                                 drainQueueTo(collection, batchSize);
>                             }
>                         }
>                         if (exchangeQueued) {
>                             exchangeQueued = false;
>                             queueMutex.unlock();
>                             try {
>                                 while (isInBatchCompleted(queue.size())) {
>                                     queueMutex.lock();
>                                     try {
>                                         drainQueueTo(collection, batchSize);
>                                     } finally {
>                                         queueMutex.unlock();
>                                     }
>                                 }
>                                 if (!isOutBatchCompleted()) {
>                                     continue;
>                                 }
>                             } finally {
>                                 queueMutex.lock();
>                             }
>                         }
>                         queueMutex.unlock();
>                         try {
>                             try {
>                                 sendExchanges();
>                             } catch (Exception e) {
>                                 getExceptionHandler().handleException(e);
>                             }
>                         } finally {
>                             queueMutex.lock();
>                         }
>                     } catch (InterruptedException e) {
>                         break;
>                     }
>                 } while (true);
>             } finally {
>                 queueMutex.unlock();
>             }
>         }
>         private void sendExchanges() throws Exception {
>             Iterator<Exchange> iter = collection.iterator();
>             while (iter.hasNext()) {
>                 Exchange exchange = iter.next();
>                 iter.remove();
>                 processExchange(exchange);
>             }
>         }
>     }
> {code}
> I have replaced the concurrent queue with a regular linked list and guarded access to it with a mutex. In addition, any queuing of exchanges is noted in a flag (exchangeQueued). This should result in less locking.
> The main change though is that queuing an exchange does not interrupt the batch sender's current activity.
> I hope that this sample is useful.
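
The core idea of the proposal above, waking the sender with a Condition signal rather than Thread.interrupt(), can be sketched in isolation. The following is a minimal, self-contained illustration under stated assumptions, not the reporter's patch and not Camel code: the class SignalNotInterrupt and its methods are hypothetical names, and Thread.sleep stands in for slow work such as opening a JMS connection. A message is enqueued while the consumer is in the middle of that slow work, and the work still completes because signalling never interrupts the thread.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of Camel.
class SignalNotInterrupt {
    private final Lock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();

    // Producer side: add a message and wake a waiting consumer without interrupting it.
    void enqueue(String message) {
        lock.lock();
        try {
            queue.add(message);
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
    }

    // Consumer side: wait up to timeoutMillis for a message; returns null on timeout.
    String take(long timeoutMillis) throws InterruptedException {
        lock.lock();
        try {
            long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (queue.isEmpty()) {
                if (nanos <= 0) {
                    return null;
                }
                nanos = notEmpty.awaitNanos(nanos);
            }
            return queue.poll();
        } finally {
            lock.unlock();
        }
    }

    // Returns the number of messages fully processed, or -1 if any thread was interrupted.
    static int processTwoMessages() {
        SignalNotInterrupt sender = new SignalNotInterrupt();
        CountDownLatch working = new CountDownLatch(1);
        boolean[] interrupted = {false};
        int[] processed = {0};
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < 2; i++) {
                    String message = sender.take(5000);
                    if (message == null) {
                        return;
                    }
                    working.countDown();  // tell the producer that slow work has started
                    Thread.sleep(100);    // stands in for slow processing
                    processed[0]++;
                }
            } catch (InterruptedException e) {
                interrupted[0] = true;
            }
        });
        consumer.start();
        try {
            sender.enqueue("a");
            working.await();      // the consumer is now inside its slow work
            sender.enqueue("b");  // enqueue during the slow work; only signals
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return interrupted[0] ? -1 : processed[0];
    }

    public static void main(String[] args) {
        System.out.println(processTwoMessages()); // prints 2
    }
}
```

Had enqueue() called interrupt() on the consumer thread instead of signalling, the second enqueue would have aborted the sleep with an InterruptedException, which is the side effect described in this issue.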
