You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/09/02 14:28:37 UTC

[camel] 02/11: CAMEL-13925: camel-seda - SedaConsumer should extend DefaultConsumer

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit eee4e0e5f09bbae7574e3113bd1a612e1892fea3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Sep 2 09:45:25 2019 +0200

    CAMEL-13925: camel-seda - SedaConsumer should extend DefaultConsumer
---
 .../apache/camel/component/seda/SedaConsumer.java  | 85 ++++++++--------------
 1 file changed, 30 insertions(+), 55 deletions(-)

diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index d5517b8..e6c2bde 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -24,21 +24,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.ShutdownRunningTask;
 import org.apache.camel.Suspendable;
-import org.apache.camel.spi.ExceptionHandler;
 import org.apache.camel.spi.ShutdownAware;
 import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.DefaultConsumer;
 import org.apache.camel.support.EmptyAsyncCallback;
 import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.LoggingExceptionHandler;
 import org.apache.camel.support.UnitOfWorkHelper;
-import org.apache.camel.support.service.ServiceSupport;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -47,46 +42,23 @@ import org.apache.camel.util.ObjectHelper;
  * In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which
  * the consumer may pickup a newly arrived messages and process it. That period is up till 1 second.
  */
-public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, Suspendable {
+public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownAware, Suspendable {
 
     private final AtomicInteger taskCount = new AtomicInteger();
     private volatile CountDownLatch latch;
     private volatile boolean shutdownPending;
     private volatile boolean forceShutdown;
-    private SedaEndpoint endpoint;
-    private AsyncProcessor processor;
     private ExecutorService executor;
-    private ExceptionHandler exceptionHandler;
     private final int pollTimeout;
 
     public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
-        this.endpoint = endpoint;
-        this.processor = AsyncProcessorConverterHelper.convert(processor);
+        super(endpoint, processor);
         this.pollTimeout = endpoint.getPollTimeout();
-        this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
     }
 
     @Override
-    public String toString() {
-        return "SedaConsumer[" + endpoint + "]";
-    }
-
-    @Override
-    public Endpoint getEndpoint() {
-        return endpoint;
-    }
-
-    public ExceptionHandler getExceptionHandler() {
-        return exceptionHandler;
-    }
-
-    public void setExceptionHandler(ExceptionHandler exceptionHandler) {
-        this.exceptionHandler = exceptionHandler;
-    }
-
-    @Override
-    public Processor getProcessor() {
-        return processor;
+    public SedaEndpoint getEndpoint() {
+        return (SedaEndpoint) super.getEndpoint();
     }
 
     @Override
@@ -100,10 +72,10 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     public int getPendingExchangesSize() {
         // the route is shutting down, so either we should purge the queue,
         // or return how many exchanges are still on the queue
-        if (endpoint.isPurgeWhenStopping()) {
-            endpoint.purgeQueue();
+        if (getEndpoint().isPurgeWhenStopping()) {
+            getEndpoint().purgeQueue();
         }
-        return endpoint.getQueue().size();
+        return getEndpoint().getQueue().size();
     }
 
     @Override
@@ -159,7 +131,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
     }
 
     protected void doRun() {
-        BlockingQueue<Exchange> queue = endpoint.getQueue();
+        BlockingQueue<Exchange> queue = getEndpoint().getQueue();
         // loop while we are allowed, or if we are stopping loop until the queue is empty
         while (queue != null && isRunAllowed()) {
 
@@ -246,9 +218,9 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
      */
     protected Exchange prepareExchange(Exchange exchange) {
         // send a new copied exchange with new camel context
-        Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext());
+        Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, getEndpoint().getCamelContext());
         // set the from endpoint
-        newExchange.setFromEndpoint(endpoint);
+        newExchange.setFromEndpoint(getEndpoint());
         return newExchange;
     }
 
@@ -265,13 +237,13 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
      */
     protected void sendToConsumers(final Exchange exchange) throws Exception {
         // validate multiple consumers has been enabled
-        int size = endpoint.getConsumers().size();
-        if (size > 1 && !endpoint.isMultipleConsumersSupported()) {
-            throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint);
+        int size = getEndpoint().getConsumers().size();
+        if (size > 1 && !getEndpoint().isMultipleConsumersSupported()) {
+            throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + getEndpoint());
         }
 
         // if there are multiple consumers then multicast to them
-        if (endpoint.isMultipleConsumersSupported()) {
+        if (getEndpoint().isMultipleConsumersSupported()) {
 
             if (log.isTraceEnabled()) {
                 log.trace("Multicasting to {} consumers for Exchange: {}", size, exchange);
@@ -281,7 +253,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
             final List<Synchronization> completions = exchange.handoverCompletions();
 
             // use a multicast processor to process it
-            AsyncProcessor mp = endpoint.getConsumerMulticastProcessor();
+            AsyncProcessor mp = getEndpoint().getConsumerMulticastProcessor();
             ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this);
 
             // and use the asynchronous routing engine to support it
@@ -291,40 +263,43 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
             });
         } else {
             // use the regular processor and use the asynchronous routing engine to support it
-            processor.process(exchange, EmptyAsyncCallback.get());
+            getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
         }
     }
 
     @Override
     protected void doStart() throws Exception {
-        latch = new CountDownLatch(endpoint.getConcurrentConsumers());
+        super.doStart();
+        latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
         shutdownPending = false;
         forceShutdown = false;
 
         setupTasks();
-        endpoint.onStarted(this);
+        getEndpoint().onStarted(this);
     }
 
     @Override
     protected void doSuspend() throws Exception {
-        endpoint.onStopped(this);
+        getEndpoint().onStopped(this);
     }
 
     @Override
     protected void doResume() throws Exception {
-        endpoint.onStarted(this);
+        getEndpoint().onStarted(this);
     }
 
     @Override
     protected void doStop() throws Exception {
         // ensure queue is purged if we stop the consumer
-        if (endpoint.isPurgeWhenStopping()) {
-            endpoint.purgeQueue();
+        if (getEndpoint().isPurgeWhenStopping()) {
+            getEndpoint().purgeQueue();
         }
 
-        endpoint.onStopped(this);
+        getEndpoint().onStopped(this);
         
         shutdownExecutor();
+
+        super.doStop();
     }
 
     @Override
@@ -334,7 +309,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
 
     private void shutdownExecutor() {
         if (executor != null) {
-            endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
             executor = null;
         }
     }
@@ -343,11 +318,11 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
      * Setup the thread pool and ensures tasks gets executed (if needed)
      */
     private void setupTasks() {
-        int poolSize = endpoint.getConcurrentConsumers();
+        int poolSize = getEndpoint().getConcurrentConsumers();
 
         // create thread pool if needed
         if (executor == null) {
-            executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
+            executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
         }
 
         // submit needed number of tasks