You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/12/06 17:24:08 UTC

[camel] branch master updated: CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new e76bf0f  CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch.
e76bf0f is described below

commit e76bf0ffd1738386d580782f3c0c71deda4b2fa3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sun Dec 6 18:21:29 2020 +0100

    CAMEL-15653: camel-sjms - Improve logging for sjms batch recovery task. Thanks to Brad Harvey for the patch.
---
 .../component/sjms/batch/SjmsBatchComponent.java   | 22 -----------
 .../component/sjms/batch/SjmsBatchConsumer.java    | 44 +++++++++++++---------
 2 files changed, 27 insertions(+), 39 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
index 8bdb635..ddb0dce 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchComponent.java
@@ -17,7 +17,6 @@
 package org.apache.camel.component.sjms.batch;
 
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
 
 import javax.jms.ConnectionFactory;
 
@@ -30,8 +29,6 @@ import org.apache.camel.util.ObjectHelper;
 @Component("sjms-batch")
 public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
 
-    private ExecutorService asyncStartStopExecutorService;
-
     @Metadata(label = "advanced")
     private ConnectionFactory connectionFactory;
     @Metadata(label = "advanced")
@@ -97,23 +94,4 @@ public class SjmsBatchComponent extends HeaderFilterStrategyComponent {
         this.recoveryInterval = recoveryInterval;
     }
 
-    @Override
-    protected void doShutdown() throws Exception {
-        if (asyncStartStopExecutorService != null) {
-            getCamelContext().getExecutorServiceManager().shutdownNow(asyncStartStopExecutorService);
-            asyncStartStopExecutorService = null;
-        }
-        super.doShutdown();
-    }
-
-    protected synchronized ExecutorService getAsyncStartStopExecutorService() {
-        if (asyncStartStopExecutorService == null) {
-            // use a cached thread pool for async start tasks as they can run for a while, and we need a dedicated thread
-            // for each task, and the thread pool will shrink when no more tasks running
-            asyncStartStopExecutorService
-                    = getCamelContext().getExecutorServiceManager().newCachedThreadPool(this, "AsyncStartStopListener");
-        }
-        return asyncStartStopExecutorService;
-    }
-
 }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
index a625877..b8c7e95 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/batch/SjmsBatchConsumer.java
@@ -42,6 +42,7 @@ import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.StringHelper;
@@ -138,8 +139,11 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 = new StartConsumerTask(recovery, getEndpoint().getRecoveryInterval(), getEndpoint().getKeepAliveDelay());
 
         if (recovery) {
-            // use a background thread to keep starting the consumer until
-            getEndpoint().getComponent().getAsyncStartStopExecutorService().submit(task);
+            // use a background thread to keep starting the consumer until it can connect successfully
+            String threadNameSuffix = "AsyncStartStopListener[" + destinationName + "]";
+            ExecutorServiceManager executorServiceManager = getEndpoint().getCamelContext().getExecutorServiceManager();
+            Thread thread = executorServiceManager.newThread(threadNameSuffix, task);
+            thread.start();
         } else {
             task.run();
         }
@@ -168,7 +172,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             consumersShutdownLatchRef.set(new CountDownLatch(consumerCount));
 
             if (completionInterval > 0) {
-                LOG.info("Using CompletionInterval to run every {} millis.", completionInterval);
+                LOG.info("Using CompletionInterval to run every {} millis for {}.", completionInterval, destinationName);
                 if (timeoutCheckerExecutorService == null) {
                     setTimeoutCheckerExecutorService(getEndpoint().getCamelContext().getExecutorServiceManager()
                             .newScheduledThreadPool(this, SJMS_BATCH_TIMEOUT_CHECKER, 1));
@@ -208,7 +212,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                     }
 
                     if (attempt > 1) {
-                        LOG.info("Successfully refreshed connection after {} attempts.", attempt);
+                        LOG.info("Successfully refreshed connection to {} after {} attempts.", destinationName, attempt);
                     }
 
                     LOG.info("Started {} consumer(s) for {}:{}", consumerCount, destinationName, completionSize);
@@ -234,7 +238,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
                 // sleeping before next attempt
                 try {
-                    LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover", attempt, recoveryInterval);
+                    LOG.debug("Attempt #{}. Sleeping {} before next attempt to recover {}", attempt, recoveryInterval,
+                            destinationName);
                     Thread.sleep(recoveryInterval);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
@@ -251,18 +256,19 @@ public class SjmsBatchConsumer extends DefaultConsumer {
 
         CountDownLatch consumersShutdownLatch = consumersShutdownLatchRef.get();
         if (consumersShutdownLatch != null) {
-            LOG.info("Stop signalled, waiting on consumers to shut down");
+            LOG.info("Stop signalled, waiting on consumers for {} to shut down", destinationName);
             if (consumersShutdownLatch.await(60, TimeUnit.SECONDS)) {
-                LOG.warn("Timeout waiting on consumer threads to signal completion - shutting down");
+                LOG.warn("Timeout waiting on consumer threads for {} to signal completion - shutting down", destinationName);
             } else {
-                LOG.info("All consumers have been shutdown");
+                LOG.info("All consumers for {} have been shutdown", destinationName);
             }
         } else {
-            LOG.info("Stop signalled while there are no consumers yet, so no need to wait for consumers");
+            LOG.info("Stop signalled while there are no consumers for {} yet, so no need to wait for consumers",
+                    destinationName);
         }
 
         try {
-            LOG.debug("Shutting down JMS connection");
+            LOG.debug("Shutting down JMS connection for {}", destinationName);
             connection.close();
         } catch (Exception e) {
             // ignore
@@ -374,7 +380,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Exception caught closing consumer", ex2);
                 }
-                LOG.warn("Exception caught closing consumer: {}. This exception is ignored.", ex2.getMessage());
+                LOG.warn("Exception caught closing consumer for : {}. This exception is ignored.", destinationName,
+                        ex2.getMessage());
             }
         }
 
@@ -388,7 +395,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Exception caught closing session", ex2);
                 }
-                LOG.warn("Exception caught closing session: {}. This exception is ignored.", ex2.getMessage());
+                LOG.warn("Exception caught closing session for {}: {}. This exception is ignored.", destinationName,
+                        ex2.getMessage());
             }
         }
 
@@ -473,8 +481,9 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                                         reset();
                                     }
                                 } catch (Exception e) {
-                                    LOG.warn("Error during evaluation of completion predicate {}. This exception is ignored.",
-                                            e.getMessage(), e);
+                                    LOG.warn("Error during evaluation of completion predicate " + e.getMessage()
+                                             + ". This exception is ignored.",
+                                            e);
                                 }
                             }
                         }
@@ -494,7 +503,8 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                         }
 
                     } else {
-                        LOG.info("Shutdown signal received - rolling back batch");
+                        LOG.info("Shutdown signal received - rolling back {} pending in batch from destination {}",
+                                messageCount, destinationName);
                         session.rollback();
                     }
                 }
@@ -569,7 +579,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
             int batchSize = exchange.getProperty(Exchange.BATCH_SIZE, Integer.class);
             if (LOG.isDebugEnabled()) {
                 long total = MESSAGE_RECEIVED.get() + batchSize;
-                LOG.debug("Processing batch[{}]:size={}:total={}", id, batchSize, total);
+                LOG.debug("Processing batch[" + id + "]:size=" + batchSize + ":total=" + total);
             }
 
             if ("timeout".equals(completedBy)) {
@@ -587,7 +597,7 @@ public class SjmsBatchConsumer extends DefaultConsumer {
                 long total = MESSAGE_PROCESSED.addAndGet(batchSize);
                 LOG.debug("Completed processing[{}]:total={}", id, total);
             } catch (Exception e) {
-                getExceptionHandler().handleException("Error processing exchange", exchange, e);
+                getExceptionHandler().handleException("Error processing exchange from " + destinationName, exchange, e);
             }
         }