You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/21 11:02:55 UTC
[2/2] camel git commit: CAMEL-9239: camel-sjms - Add
completionInterval to batch consumer. Polished the code and fixed some
mistakes.
CAMEL-9239: camel-sjms - Add completionInterval to batch consumer. Polished the code and fixed some mistakes.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/cf288b6a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/cf288b6a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/cf288b6a
Branch: refs/heads/master
Commit: cf288b6a1ce8828ea500077f55161c67c4d95a33
Parents: e58534a
Author: Claus Ibsen <da...@apache.org>
Authored: Sun Feb 21 10:24:31 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun Feb 21 11:02:47 2016 +0100
----------------------------------------------------------------------
.../component/sjms/batch/SessionCompletion.java | 6 +-
.../component/sjms/batch/SjmsBatchConsumer.java | 149 ++++++++++++++-----
.../component/sjms/batch/SjmsBatchEndpoint.java | 69 +++++++--
3 files changed, 176 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
index cae90cb..f2a7e69 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SessionCompletion.java
@@ -41,8 +41,7 @@ class SessionCompletion implements Synchronization {
LOG.debug("Committing");
session.commit();
} catch (JMSException ex) {
- LOG.error("Exception caught while committing: {}", ex.getMessage());
- exchange.setException(ex);
+ LOG.warn("Exception caught while committing JMS session", ex);
}
}
@@ -52,8 +51,7 @@ class SessionCompletion implements Synchronization {
LOG.debug("Rolling back");
session.rollback();
} catch (JMSException ex) {
- LOG.error("Exception caught while rolling back: {}", ex.getMessage());
- exchange.setException(ex);
+ LOG.warn("Exception caught while rolling back JMS session", ex);
}
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index 505aa07..5316664 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -16,11 +16,10 @@
*/
package org.apache.camel.component.sjms.batch;
-import java.io.PrintWriter;
-import java.io.StringWriter;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -45,6 +44,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SjmsBatchConsumer extends DefaultConsumer {
+
+ public static final String SJMS_BATCH_TIMEOUT_CHECKER = "SJmsBatchTimeoutChecker";
+
private static final boolean TRANSACTED = true;
private static final Logger LOG = LoggerFactory.getLogger(SjmsBatchConsumer.class);
@@ -53,9 +55,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
private static final AtomicLong MESSAGE_RECEIVED = new AtomicLong();
private static final AtomicLong MESSAGE_PROCESSED = new AtomicLong();
+ private ScheduledExecutorService timeoutCheckerExecutorService;
+ private boolean shutdownTimeoutCheckerExecutorService;
+ private final AtomicBoolean completionTimeoutTrigger = new AtomicBoolean();
+
private final SjmsBatchEndpoint sjmsBatchEndpoint;
private final AggregationStrategy aggregationStrategy;
private final int completionSize;
+ private final int completionInterval;
private final int completionTimeout;
private final int consumerCount;
private final int pollDuration;
@@ -76,7 +83,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
destinationName = ObjectHelper.notEmpty(sjmsBatchEndpoint.getDestinationName(), "destinationName");
completionSize = sjmsBatchEndpoint.getCompletionSize();
+ completionInterval = sjmsBatchEndpoint.getCompletionInterval();
completionTimeout = sjmsBatchEndpoint.getCompletionTimeout();
+ if (completionInterval > 0 && completionTimeout != SjmsBatchEndpoint.DEFAULT_COMPLETION_TIMEOUT) {
+ throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
+ }
pollDuration = sjmsBatchEndpoint.getPollDuration();
if (pollDuration < 0) {
throw new IllegalArgumentException("pollDuration must be 0 or greater");
@@ -98,18 +109,21 @@ public class SjmsBatchConsumer extends DefaultConsumer {
return sjmsBatchEndpoint;
}
+ public ScheduledExecutorService getTimeoutCheckerExecutorService() {
+ return timeoutCheckerExecutorService;
+ }
+
+ public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
+ this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
+ }
+
@Override
protected void doStart() throws Exception {
super.doStart();
// start up a shared connection
- try {
- connection = connectionFactory.createConnection();
- connection.start();
- } catch (JMSException ex) {
- LOG.error("Exception caught closing connection: {}", getStackTrace(ex));
- return;
- }
+ connection = connectionFactory.createConnection();
+ connection.start();
if (LOG.isInfoEnabled()) {
LOG.info("Starting " + consumerCount + " consumer(s) for " + destinationName + ":" + completionSize);
@@ -121,12 +135,24 @@ public class SjmsBatchConsumer extends DefaultConsumer {
for (int i = 0; i < consumerCount; i++) {
jmsConsumerExecutors.execute(new BatchConsumptionLoop());
}
+
+ if (completionInterval > 0) {
+ LOG.info("Using CompletionInterval to run every " + completionInterval + " millis.");
+ if (timeoutCheckerExecutorService == null) {
+ setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1));
+ shutdownTimeoutCheckerExecutorService = true;
+ }
+ // trigger completion based on interval
+ timeoutCheckerExecutorService.scheduleAtFixedRate(new CompletionIntervalTask(completionTimeoutTrigger), completionInterval, completionInterval, TimeUnit.MILLISECONDS);
+ }
+
}
@Override
protected void doStop() throws Exception {
super.doStop();
running.set(false);
+
CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
if (consumersShutdownLatch != null) {
LOG.info("Stop signalled, waiting on consumers to shut down");
@@ -142,17 +168,40 @@ public class SjmsBatchConsumer extends DefaultConsumer {
try {
LOG.debug("Shutting down JMS connection");
connection.close();
- } catch (JMSException jex) {
- LOG.error("Exception caught closing connection: {}", getStackTrace(jex));
+ } catch (Exception e) {
+ LOG.warn("Exception caught closing JMS connection", e);
}
getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(jmsConsumerExecutors);
+ jmsConsumerExecutors = null;
+
+ if (shutdownTimeoutCheckerExecutorService) {
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(timeoutCheckerExecutorService);
+ timeoutCheckerExecutorService = null;
+ }
}
- private String getStackTrace(Exception ex) {
- StringWriter writer = new StringWriter();
- ex.printStackTrace(new PrintWriter(writer));
- return writer.toString();
+ /**
+ * Background task that triggers completion based on interval.
+ */
+ private final class CompletionIntervalTask implements Runnable {
+
+ private final AtomicBoolean timeoutInterval;
+
+ public CompletionIntervalTask(AtomicBoolean timeoutInterval) {
+ this.timeoutInterval = timeoutInterval;
+ }
+
+ public void run() {
+ // only run if CamelContext has been fully started
+ if (!getEndpoint().getCamelContext().getStatus().isStarted()) {
+ LOG.trace("Completion interval task cannot start due CamelContext({}) has not been started yet", getEndpoint().getCamelContext().getName());
+ return;
+ }
+
+ // signal
+ timeoutInterval.set(true);
+ }
}
private class BatchConsumptionLoop implements Runnable {
@@ -168,24 +217,32 @@ public class SjmsBatchConsumer extends DefaultConsumer {
MessageConsumer consumer = session.createConsumer(queue);
try {
- consumeBatchesOnLoop(session, consumer);
+ consumeBatchesOnLoop(session, consumer, completionTimeoutTrigger);
} finally {
try {
consumer.close();
} catch (JMSException ex2) {
- log.error("Exception caught closing consumer: {}", ex2.getMessage());
+ // only include stacktrace in debug logging
+ if (log.isDebugEnabled()) {
+ log.debug("Exception caught closing consumer", ex2);
+ }
+ log.warn("Exception caught closing consumer: {}", ex2.getMessage());
}
}
} finally {
try {
session.close();
} catch (JMSException ex1) {
- log.error("Exception caught closing session: {}", ex1.getMessage());
+ // only include stacktrace in debug logging
+ if (log.isDebugEnabled()) {
+ log.debug("Exception caught closing session: {}", ex1);
+ }
+ log.warn("Exception caught closing session: {}", ex1.getMessage());
}
}
} catch (JMSException ex) {
// from loop
- LOG.error("Exception caught consuming from {}: {}", destinationName, getStackTrace(ex));
+ LOG.warn("Exception caught consuming from " + destinationName, ex);
} finally {
// indicate that we have shut down
CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
@@ -193,20 +250,23 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
}
- private void consumeBatchesOnLoop(Session session, MessageConsumer consumer) throws JMSException {
+ private void consumeBatchesOnLoop(final Session session, final MessageConsumer consumer, final AtomicBoolean timeoutInterval) throws JMSException {
final boolean usingTimeout = completionTimeout > 0;
batchConsumption:
while (running.get()) {
+ // reset the state
+ boolean timeout = false;
int messageCount = 0;
-
- // reset the clock counters
long timeElapsed = 0;
long startTime = 0;
Exchange aggregatedExchange = null;
batch:
- while ((completionSize <= 0) || (messageCount < completionSize)) {
+ // loop while no timeout or interval triggered and while we have room still for messages in the batch
+ while (!timeout && !timeoutInterval.compareAndSet(true, false)
+ && (usingTimeout || (completionSize > 0 && messageCount < completionSize))) {
+
// check periodically to see whether we should be shutting down
long waitTime = (usingTimeout && (timeElapsed > 0))
? getReceiveWaitTime(timeElapsed)
@@ -222,10 +282,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
startTime = new Date().getTime(); // start counting down the period for this batch
}
messageCount++;
- LOG.debug("Message received: {}", messageCount);
- if ((message instanceof ObjectMessage)
- || (message instanceof TextMessage)) {
-
+ LOG.debug("#{} messages received", messageCount);
+ // TODO: why only object or text messages?
+ if (message instanceof ObjectMessage || message instanceof TextMessage) {
final Exchange exchange = getEndpoint().createExchange(message, session);
aggregatedExchange = aggregationStrategy.aggregate(aggregatedExchange, exchange);
aggregatedExchange.setProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, messageCount);
@@ -241,7 +300,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (timeElapsed > completionTimeout) {
// batch finished by timeout
- break batch;
+ timeout = true;
}
}
@@ -250,8 +309,14 @@ public class SjmsBatchConsumer extends DefaultConsumer {
session.rollback();
break batchConsumption;
}
- } // batch
- process(aggregatedExchange, session);
+ }
+
+ // batch
+ if (aggregatedExchange == null && getEndpoint().isSendEmptyMessageWhenIdle()) {
+ processEmptyMessage();
+ } else if (aggregatedExchange != null) {
+ processBatch(aggregatedExchange, session);
+ }
}
}
@@ -268,9 +333,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (timeRemaining <= 0) { // ensure that the thread doesn't wait indefinitely
timeRemaining = 1;
}
- final long waitTime = (timeRemaining > pollDuration) ? pollDuration : timeRemaining;
+ final long waitTime = Math.min(timeRemaining, pollDuration);
- LOG.debug("waiting for {}", waitTime);
+ LOG.trace("Waiting for {}", waitTime);
return waitTime;
}
@@ -282,7 +347,23 @@ public class SjmsBatchConsumer extends DefaultConsumer {
return timeRemaining;
}
- private void process(Exchange exchange, Session session) {
+ /**
+ * No messages in batch so send an empty message instead.
+ */
+ private void processEmptyMessage() {
+ Exchange exchange = getEndpoint().createExchange();
+ log.debug("Sending empty message as there were no messages from polling: {}", getEndpoint());
+ try {
+ getProcessor().process(exchange);
+ } catch (Exception e) {
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ }
+ }
+
+ /**
+ * Send a message with the batched messages.
+ */
+ private void processBatch(Exchange exchange, Session session) {
int id = BATCH_COUNT.getAndIncrement();
int batchSize = exchange.getProperty(SjmsBatchEndpoint.PROPERTY_BATCH_SIZE, Integer.class);
if (LOG.isDebugEnabled()) {
@@ -296,7 +377,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
long total = MESSAGE_PROCESSED.addAndGet(batchSize);
LOG.debug("Completed processing[{}]:total={}", id, total);
} catch (Exception e) {
- LOG.error("Error processing exchange: {}", e.getMessage());
+ getExceptionHandler().handleException("Error processing exchange", exchange, e);
}
}
http://git-wip-us.apache.org/repos/asf/camel/blob/cf288b6a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
index 790f1ef..d989e08 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchEndpoint.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.sjms.batch;
+import java.util.concurrent.ScheduledExecutorService;
import javax.jms.Message;
import javax.jms.Session;
@@ -55,6 +56,8 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
@UriPath @Metadata(required = "true")
private String destinationName;
+ @UriParam @Metadata(required = "true")
+ private AggregationStrategy aggregationStrategy;
@UriParam(defaultValue = "1")
private int consumerCount = 1;
@UriParam(defaultValue = "200")
@@ -62,22 +65,25 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
@UriParam(defaultValue = "500")
private int completionTimeout = DEFAULT_COMPLETION_TIMEOUT;
@UriParam(defaultValue = "1000")
- private int pollDuration = 1000;
- @UriParam @Metadata(required = "true")
- private AggregationStrategy aggregationStrategy;
+ private int completionInterval;
@UriParam
- private HeaderFilterStrategy headerFilterStrategy;
+ private boolean sendEmptyMessageWhenIdle;
+ @UriParam(defaultValue = "1000")
+ private int pollDuration = 1000;
@UriParam
private boolean includeAllJMSXProperties;
@UriParam(defaultValue = "true")
private boolean allowNullBody = true;
@UriParam(defaultValue = "true")
private boolean mapJmsMessage = true;
- @UriParam
+ @UriParam(label = "advanced")
+ private HeaderFilterStrategy headerFilterStrategy;
+ @UriParam(label = "advanced")
private MessageCreatedStrategy messageCreatedStrategy;
- @UriParam
+ @UriParam(label = "advanced")
private JmsKeyFormatStrategy jmsKeyFormatStrategy;
-
+ @UriParam(label = "advanced")
+ private ScheduledExecutorService timeoutCheckerExecutorService;
public SjmsBatchEndpoint() {
}
@@ -100,12 +106,15 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
@Override
public Producer createProducer() throws Exception {
- throw new UnsupportedOperationException("Cannot produce though a " + SjmsBatchEndpoint.class.getName());
+ throw new UnsupportedOperationException("Producer not supported");
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
- return new SjmsBatchConsumer(this, processor);
+ SjmsBatchConsumer consumer = new SjmsBatchConsumer(this, processor);
+ consumer.setTimeoutCheckerExecutorService(timeoutCheckerExecutorService);
+ configureConsumer(consumer);
+ return consumer;
}
public Exchange createExchange(Message message, Session session) {
@@ -181,12 +190,41 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
}
/**
- * The timeout from receipt of the first first message when the batch will be completed
+ * The timeout in millis from receipt of the first message when the batch will be completed.
+ * The batch may be empty if the timeout triggered and there were no messages in the batch.
+ * <br/>
+ * Notice you cannot use both completion timeout and completion interval at the same time, only one can be configured.
*/
public void setCompletionTimeout(int completionTimeout) {
this.completionTimeout = completionTimeout;
}
+ public int getCompletionInterval() {
+ return completionInterval;
+ }
+
+ /**
+ * The completion interval in millis, which causes batches to be completed in a scheduled fixed rate every interval.
+ * The batch may be empty if the interval triggered and there were no messages in the batch.
+ * <br/>
+ * Notice you cannot use both completion timeout and completion interval at the same time, only one can be configured.
+ */
+ public void setCompletionInterval(int completionInterval) {
+ this.completionInterval = completionInterval;
+ }
+
+ public boolean isSendEmptyMessageWhenIdle() {
+ return sendEmptyMessageWhenIdle;
+ }
+
+ /**
+ * If using completion timeout or interval, then the batch may be empty if the timeout or interval triggered and there were no messages in the batch.
+ * If this option is <tt>true</tt> and the batch is empty then an empty message is added to the batch so an empty message is routed.
+ */
+ public void setSendEmptyMessageWhenIdle(boolean sendEmptyMessageWhenIdle) {
+ this.sendEmptyMessageWhenIdle = sendEmptyMessageWhenIdle;
+ }
+
public int getPollDuration() {
return pollDuration;
}
@@ -280,4 +318,15 @@ public class SjmsBatchEndpoint extends DefaultEndpoint implements HeaderFilterSt
this.includeAllJMSXProperties = includeAllJMSXProperties;
}
+ public ScheduledExecutorService getTimeoutCheckerExecutorService() {
+ return timeoutCheckerExecutorService;
+ }
+
+ /**
+ * If using the completionInterval option a background thread is created to trigger the completion interval.
+ * Set this option to provide a custom thread pool to be used rather than creating a new thread for every consumer.
+ */
+ public void setTimeoutCheckerExecutorService(ScheduledExecutorService timeoutCheckerExecutorService) {
+ this.timeoutCheckerExecutorService = timeoutCheckerExecutorService;
+ }
}