You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/21 11:02:55 UTC

[2/2] camel git commit: CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.

CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cf288b6a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cf288b6a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cf288b6a

Branch: refs/heads/master
Commit: cf288b6a1ce8828ea500077f55161c67c4d95a33
Parents: e58534a
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Feb 21 10:24:31 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Feb 21 11:02:47 2016 +0100

----------------------------------------------------------------------
 .../component/sjms/batch/SessionCompletion.java |   6 +-
 .../component/sjms/batch/SjmsBatchConsumer.java | 149 ++++++++++++++-----
 .../component/sjms/batch/SjmsBatchEndpoint.java |  69 +++++++--
 3 files changed, 176 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
index cae90cb..f2a7e69 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -41,8 +41,7 @@ class SessionCompletion implements Synchronization {
             LOG.debug("Committing");
             session.commit();
         } catch (JMSException ex) {
-            LOG.error("Exception caught while committing: {}", ex.getMessage());
-            exchange.setException(ex);
+            LOG.warn("Exception caught while committing JMS session", ex);
         }
     }
 
@@ -52,8 +51,7 @@ class SessionCompletion implements Synchronization {
             LOG.debug("Rolling back");
             session.rollback();
         } catch (JMSException ex) {
-            LOG.error("Exception caught while rolling back: {}", ex.getMessage());
-            exchange.setException(ex);
+            LOG.warn("Exception caught while rolling back JMS session", ex);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 505aa07..5316664 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,11 +16,10 @@
  */
 package org.apache.camel.component.sjms.batch;
 
-import java.io.PrintWriter;
-import java.io.StringWriter;
 import java.util.Date;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -45,6 +44,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class SjmsBatchConsumer extends DefaultConsumer {
+
+    public static final String SJMS_BATCH_TIMEOUT_CHECKER = "SJmsBatchTimeoutChecker";
+
     private static final boolean TRANSACTED = true;
     private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
 
@@ -53,9 +55,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
     private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong();
     private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong();
 
+    private ScheduledExecutorService timeoutCheckerExecutorService;
+    private boolean shutdownTimeoutCheckerExecutorService;
+    private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
+
     private final SjmsBatchEndpoint sjmsBatchEndpoint;
     private final AggregationStrategy aggregationStrategy;
     private final int completionSize;
+    private final int completionInterval;
     private final int completionTimeout;
     private final int consumerCount;
     private final int pollDuration;
@@ -76,7 +83,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
 
         completionSize = sjmsBatchEndpoint.getCompletionSize();
+        completionInterval = sjmsBatchEndpoint.getCompletionInterval();
         completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
+        if (completionInterval > 0 && completionTimeout != SjmsBatchEndpoint.DEFAULT_COMPLETION_TIMEOUT) {
+            throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
+        }
         pollDuration = sjmsBatchEndpoint.getPollDuration();
         if (pollDuration < 0) {
             throw new IllegalArgumentException("pollDuration must be 0 or greater");
@@ -98,18 +109,21 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         return sjmsBatchEndpoint;
     }
 
+    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
+        return timeoutCheckerExecutorService;
+    }
+
+    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
+        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
+    }
+
     @Override
     protected void doStart() throws Exception {
         super.doStart();
 
         // start up a shared connection
-        try {
-            connection = connectionFactory.createConnection();
-            connection.start();
-        } catch (JMSException ex) {
-            LOG.error("Exception caught closing connection: {}", getStackTrace(ex));
-            return;
-        }
+        connection = connectionFactory.createConnection();
+        connection.start();
 
         if (LOG.isInfoEnabled()) {
             LOG.info("Starting " + consumerCount + " consumer(s) for " + destinationName + ":" + completionSize);
@@ -121,12 +135,24 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         for (int i = 0; i < consumerCount; i++) {
             jmsConsumerExecutors.execute(new BatchConsumptionLoop());
         }
+
+        if (completionInterval > 0) {
+            LOG.info("Using CompletionInterval to run every " + completionInterval + " millis.");
+            if (timeoutCheckerExecutorService == null) {
+                setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1));
+                shutdownTimeoutCheckerExecutorService = true;
+            }
+            // trigger completion based on interval
+            timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(completionTimeoutTrigger), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
+        }
+
     }
 
     @Override
     protected void doStop() throws Exception {
         super.doStop();
         running.set(false);
+
         CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
         if (consumersShutdownLatch != null) {
             LOG.info("Stop signalled, waiting on consumers to shut down");
@@ -142,17 +168,40 @@ public class SjmsBatchConsumer extends DefaultConsumer {
         try {
             LOG.debug("Shutting down JMS connection");
             connection.close();
-        } catch (JMSException jex) {
-            LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
+        } catch (Exception e) {
+            LOG.warn("Exception caught closing JMS connection", e);
         }
 
         getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(jmsConsumerExecutors);
+        jmsConsumerExecutors = null;
+
+        if (shutdownTimeoutCheckerExecutorService) {
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+            timeoutCheckerExecutorService = null;
+        }
     }
 
-    private String getStackTrace(Exception ex) {
-        StringWriter writer = new StringWriter();
-        ex.printStackTrace(new PrintWriter(writer));
-        return writer.toString();
+    /**
+     * Background task that triggers completion based on interval.
+     */
+    private final class CompletionIntervalTask implements Runnable {
+
+        private final AtomicBoolean timeoutInterval;
+
+        public CompletionIntervalTask(AtomicBoolean timeoutInterval) {
+            this.timeoutInterval = timeoutInterval;
+        }
+
+        public void run() {
+            // only run if CamelContext has been fully started
+            if (!getEndpoint().getCamelContext().getStatus().isStarted()) {
+                LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", getEndpoint().getCamelContext().getName());
+                return;
+            }
+
+            // signal
+            timeoutInterval.set(true);
+        }
     }
 
     private class BatchConsumptionLoop implements Runnable {
@@ -168,24 +217,32 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     MessageConsumer consumer = session.createConsumer(queue);
 
                     try {
-                        consumeBatchesOnLoop(session, consumer);
+                        consumeBatchesOnLoop(session, consumer, completionTimeoutTrigger);
                     } finally {
                         try {
                             consumer.close();
                         } catch (JMSException ex2) {
-                            log.error("Exception caught closing consumer: {}", ex2.getMessage());
+                            // only include stacktrace in debug logging
+                            if (log.isDebugEnabled()) {
+                                log.debug("Exception caught closing consumer", ex2);
+                            }
+                            log.warn("Exception caught closing consumer: {}", ex2.getMessage());
                         }
                     }
                 } finally {
                     try {
                         session.close();
                     } catch (JMSException ex1) {
-                        log.error("Exception caught closing session: {}", ex1.getMessage());
+                        // only include stacktrace in debug logging
+                        if (log.isDebugEnabled()) {
+                            log.debug("Exception caught closing session: {}", ex1);
+                        }
+                        log.warn("Exception caught closing session: {}", ex1.getMessage());
                     }
                 }
             } catch (JMSException ex) {
                 // from loop
-                LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex));
+                LOG.warn("Exception caught consuming from " + destinationName, ex);
             } finally {
                 // indicate that we have shut down
                 CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
@@ -193,20 +250,23 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             }
         }
 
-        private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
+        private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer, final AtomicBoolean timeoutInterval) throws JMSException {
             final boolean usingTimeout = completionTimeout > 0;
 
         batchConsumption:
             while (running.get()) {
+                // reset the state
+                boolean timeout = false;
                 int messageCount = 0;
-
-                // reset the clock counters
                 long timeElapsed = 0;
                 long startTime = 0;
                 Exchange aggregatedExchange = null;
 
             batch:
-                while ((completionSize <= 0) || (messageCount < completionSize)) {
+                // loop while no timeout or interval triggered and while we have room still for messages in the batch
+                while (!timeout && !timeoutInterval.compareAndSet(true, false)
+                        && (usingTimeout || (completionSize > 0 && messageCount < completionSize))) {
+
                     // check periodically to see whether we should be shutting down
                     long waitTime = (usingTimeout && (timeElapsed > 0))
                             ? getReceiveWaitTime(timeElapsed)
@@ -222,10 +282,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                 startTime = new Date().getTime(); // start counting down the period for this batch
                             }
                             messageCount++;
-                            LOG.debug("Message received: {}", messageCount);
-                            if ((message instanceof ObjectMessage)
-                                    || (message instanceof TextMessage)) {
-
+                            LOG.debug("#{} messages received", messageCount);
+                            // TODO: why only object or text messages?
+                            if (message instanceof ObjectMessage || message instanceof TextMessage) {
                                 final Exchange exchange = getEndpoint().createExchange(message, session);
                                 aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
                                 aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
@@ -241,7 +300,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
                             if (timeElapsed > completionTimeout) {
                                 // batch finished by timeout
-                                break batch;
+                                timeout = true;
                             }
                         }
 
@@ -250,8 +309,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         session.rollback();
                         break batchConsumption;
                     }
-                } // batch
-                process(aggregatedExchange, session);
+                }
+
+                // batch
+                if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) {
+                    processEmptyMessage();
+                } else if (aggregatedExchange != null) {
+                    processBatch(aggregatedExchange, session);
+                }
             }
         }
 
@@ -268,9 +333,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely
                 timeRemaining = 1;
             }
-            final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining;
+            final long waitTime = Math.min(timeRemaining, pollDuration);
 
-            LOG.debug("waiting for {}", waitTime);
+            LOG.trace("Waiting for {}", waitTime);
             return waitTime;
         }
 
@@ -282,7 +347,23 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             return timeRemaining;
         }
 
-        private void process(Exchange exchange, Session session) {
+        /**
+         * No messages in batch so send an empty message instead.
+         */
+        private void processEmptyMessage() {
+            Exchange exchange = getEndpoint().createExchange();
+            log.debug("Sending empty message as there were no messages from polling: {}", getEndpoint());
+            try {
+                getProcessor().process(exchange);
+            } catch (Exception e) {
+                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+            }
+        }
+
+        /**
+         * Send an message with the batches messages.
+         */
+        private void processBatch(Exchange exchange, Session session) {
             int id = BATCH_COUNT.getAndIncrement();
             int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
@@ -296,7 +377,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 long total = MESSAGE_PROCESSED.addAndGet(batchSize);
                 LOG.debug("Completed processing[{}]:total={}", id, total);
             } catch (Exception e) {
-                LOG.error("Error processing exchange: {}", e.getMessage());
+                getExceptionHandler().handleException("Error processing exchange", exchange, e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 790f1ef..d989e08 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.sjms.batch;
 
+import java.util.concurrent.ScheduledExecutorService;
 import javax.jms.Message;
 import javax.jms.Session;
 
@@ -55,6 +56,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
 
     @UriPath @Metadata(required = "true")
     private String destinationName;
+    @UriParam @Metadata(required = "true")
+    private AggregationStrategy aggregationStrategy;
     @UriParam(defaultValue = "1")
     private int consumerCount = 1;
     @UriParam(defaultValue = "200")
@@ -62,22 +65,25 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     @UriParam(defaultValue = "500")
     private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
     @UriParam(defaultValue = "1000")
-    private int pollDuration = 1000;
-    @UriParam @Metadata(required = "true")
-    private AggregationStrategy aggregationStrategy;
+    private int completionInterval;
     @UriParam
-    private HeaderFilterStrategy headerFilterStrategy;
+    private boolean sendEmptyMessageWhenIdle;
+    @UriParam(defaultValue = "1000")
+    private int pollDuration = 1000;
     @UriParam
     private boolean includeAllJMSXProperties;
     @UriParam(defaultValue = "true")
     private boolean allowNullBody = true;
     @UriParam(defaultValue = "true")
     private boolean mapJmsMessage = true;
-    @UriParam
+    @UriParam(label = "advanced")
+    private HeaderFilterStrategy headerFilterStrategy;
+    @UriParam(label = "advanced")
     private MessageCreatedStrategy messageCreatedStrategy;
-    @UriParam
+    @UriParam(label = "advanced")
     private JmsKeyFormatStrategy jmsKeyFormatStrategy;
-
+    @UriParam(label = "advanced")
+    private ScheduledExecutorService timeoutCheckerExecutorService;
 
     public SjmsBatchEndpoint() {
     }
@@ -100,12 +106,15 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
 
     @Override
     public Producer createProducer() throws Exception {
-        throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName());
+        throw new UnsupportedOperationException("Producer not supported");
     }
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        return new SjmsBatchConsumer(this, processor);
+        SjmsBatchConsumer consumer = new SjmsBatchConsumer(this, processor);
+        consumer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService);
+        configureConsumer(consumer);
+        return consumer;
     }
 
     public Exchange createExchange(Message message, Session session) {
@@ -181,12 +190,41 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
     }
 
     /**
-     * The timeout from receipt of the first first message when the batch will be completed
+     * The timeout in millis from receipt of the first first message when the batch will be completed.
+     * The batch may be empty if the timeout triggered and there was no messages in the batch.
+     * <br/>
+     * Notice you cannot use both completion timeout and completion interval at the same time, only one can be configured.
      */
     public void setCompletionTimeout(int completionTimeout) {
         this.completionTimeout = completionTimeout;
     }
 
+    public int getCompletionInterval() {
+        return completionInterval;
+    }
+
+    /**
+     * The completion interval in millis, which causes batches to be completed in a scheduled fixed rate every interval.
+     * The batch may be empty if the timeout triggered and there was no messages in the batch.
+     * <br/>
+     * Notice you cannot use both completion timeout and completion interval at the same time, only one can be configured.
+     */
+    public void setCompletionInterval(int completionInterval) {
+        this.completionInterval = completionInterval;
+    }
+
+    public boolean isSendEmptyMessageWhenIdle() {
+        return sendEmptyMessageWhenIdle;
+    }
+
+    /**
+     * If using completion timeout or interval, then the batch may be empty if the timeout triggered and there was no messages in the batch.
+     * If this option is <tt>true</tt> and the batch is empty then an empty message is added to the batch so an empty message is routed.
+     */
+    public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
+        this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
+    }
+
     public int getPollDuration() {
         return pollDuration;
     }
@@ -280,4 +318,15 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
         this.includeAllJMSXProperties = includeAllJMSXProperties;
     }
 
+    public ScheduledExecutorService getTimeoutCheckerExecutorService() {
+        return timeoutCheckerExecutorService;
+    }
+
+    /**
+     * If using the completionInterval option a background thread is created to trigger the completion interval.
+     * Set this option to provide a custom thread pool to be used rather than creating a new thread for every consumer.
+     */
+    public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
+        this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
+    }
 }