You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/12/06 17:24:08 UTC
[camel] branch master updated: CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new e76bf0f CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch.
e76bf0f is described below
commit e76bf0ffd1738386d580782f3c0c71deda4b2fa3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Dec 6 18:21:29 2020 +0100
CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch.
---
.../component/sjms/batch/SjmsBatchComponent.java | 22 -----------
.../component/sjms/batch/SjmsBatchConsumer.java | 44 +++++++++++++---------
2 files changed, 27 insertions(+), 39 deletions(-)
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 8bdb635..ddb0dce 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -17,7 +17,6 @@
package org.apache.camel.component.sjms.batch;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import javax.jms.ConnectionFactory;
@@ -30,8 +29,6 @@ import org.apache.camel.util.ObjectHelper;
@Component("sjms-batch")
public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
- private ExecutorService asyncStartStopExecutorService;
-
@Metadata(label = "advanced")
private ConnectionFactory connectionFactory;
@Metadata(label = "advanced")
@@ -97,23 +94,4 @@ public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
this.recoveryInterval = recoveryInterval;
}
- @Override
- protected void doShutdown() throws Exception {
- if (asyncStartStopExecutorService != null) {
- getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
- asyncStartStopExecutorService = null;
- }
- super.doShutdown();
- }
-
- protected synchronized ExecutorService getAsyncStartStopExecutorService() {
- if (asyncStartStopExecutorService == null) {
- // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
- // for each task, and the thread pool will shrink when no more tasks running
- asyncStartStopExecutorService
- = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
- }
- return asyncStartStopExecutorService;
- }
-
}
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index a625877..b8c7e95 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -42,6 +42,7 @@ import org.apache.camel.ExtendedExchange;
import org.apache.camel.Predicate;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.StringHelper;
@@ -138,8 +139,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
= new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay());
if (recovery) {
- // use a background thread to keep starting the consumer until
- getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(task);
+ // use a background thread to keep starting the consumer until it can connect successfully
+ String threadNameSuffix = "AsyncStartStopListener[" + destinationName + "]";
+ ExecutorServiceManager executorServiceManager = getEndpoint().getCamelContext().getExecutorServiceManager();
+ Thread thread = executorServiceManager.newThread(threadNameSuffix, task);
+ thread.start();
} else {
task.run();
}
@@ -168,7 +172,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
if (completionInterval > 0) {
- LOG.info("Using CompletionInterval to run every {} millis.", completionInterval);
+ LOG.info("Using CompletionInterval to run every {} millis for {}.", completionInterval, destinationName);
if (timeoutCheckerExecutorService == null) {
setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager()
.newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1));
@@ -208,7 +212,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
if (attempt > 1) {
- LOG.info("Successfully refreshed connection after {} attempts.", attempt);
+ LOG.info("Successfully refreshed connection to {} after {} attempts.", destinationName, attempt);
}
LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
@@ -234,7 +238,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
// sleeping before next attempt
try {
- LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover", attempt, recoveryInterval);
+ LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover {}", attempt, recoveryInterval,
+ destinationName);
Thread.sleep(recoveryInterval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -251,18 +256,19 @@ public class SjmsBatchConsumer extends DefaultConsumer {
CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
if (consumersShutdownLatch != null) {
- LOG.info("Stop signalled, waiting on consumers to shut down");
+ LOG.info("Stop signalled, waiting on consumers for {} to shut down", destinationName);
if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) {
- LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
+ LOG.warn("Timeout waiting on consumer threads for {} to signal completion - shutting down", destinationName);
} else {
- LOG.info("All consumers have been shutdown");
+ LOG.info("All consumers for {} have been shutdown", destinationName);
}
} else {
- LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
+ LOG.info("Stop signalled while there are no consumers for {} yet, so no need to wait for consumers",
+ destinationName);
}
try {
- LOG.debug("Shutting down JMS connection");
+ LOG.debug("Shutting down JMS connection for {}", destinationName);
connection.close();
} catch (Exception e) {
// ignore
@@ -374,7 +380,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (LOG.isDebugEnabled()) {
LOG.debug("Exception caught closing consumer", ex2);
}
- LOG.warn("Exception caught closing consumer: {}. This exception is ignored.", ex2.getMessage());
+ LOG.warn("Exception caught closing consumer for {}: {}. This exception is ignored.", destinationName,
+ ex2.getMessage());
}
}
@@ -388,7 +395,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
if (LOG.isDebugEnabled()) {
LOG.debug("Exception caught closing session", ex2);
}
- LOG.warn("Exception caught closing session: {}. This exception is ignored.", ex2.getMessage());
+ LOG.warn("Exception caught closing session for {}: {}. This exception is ignored.", destinationName,
+ ex2.getMessage());
}
}
@@ -473,8 +481,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
reset();
}
} catch (Exception e) {
- LOG.warn("Error during evaluation of completion predicate {}. This exception is ignored.",
- e.getMessage(), e);
+ LOG.warn("Error during evaluation of completion predicate " + e.getMessage()
+ + ". This exception is ignored.",
+ e);
}
}
}
@@ -494,7 +503,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
}
} else {
- LOG.info("Shutdown signal received - rolling back batch");
+ LOG.info("Shutdown signal received - rolling back {} pending in batch from destination {}",
+ messageCount, destinationName);
session.rollback();
}
}
@@ -569,7 +579,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
if (LOG.isDebugEnabled()) {
long total = MESSAGE_RECEIVED.get() + batchSize;
- LOG.debug("Processing batch[{}]:size={}:total={}", id, batchSize, total);
+ LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total);
}
if ("timeout".equals(completedBy)) {
@@ -587,7 +597,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
long total = MESSAGE_PROCESSED.addAndGet(batchSize);
LOG.debug("Completed processing[{}]:total={}", id, total);
} catch (Exception e) {
- getExceptionHandler().handleException("Error processing exchange", exchange, e);
+ getExceptionHandler().handleException("Error processing exchange from " + destinationName, exchange, e);
}
}