You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2019/09/02 14:28:37 UTC
[camel] 02/11: CAMEL-13925: camel-seda - SedaConsumer should extend DefaultConsumer
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
commit eee4e0e5f09bbae7574e3113bd1a612e1892fea3
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Sep 2 09:45:25 2019 +0200
CAMEL-13925: camel-seda - SedaConsumer should extend DefaultConsumer
---
.../apache/camel/component/seda/SedaConsumer.java | 85 ++++++++--------------
1 file changed, 30 insertions(+), 55 deletions(-)
diff --git a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
index d5517b8..e6c2bde 100644
--- a/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
+++ b/components/camel-seda/src/main/java/org/apache/camel/component/seda/SedaConsumer.java
@@ -24,21 +24,16 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncProcessor;
-import org.apache.camel.Consumer;
-import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ShutdownRunningTask;
import org.apache.camel.Suspendable;
-import org.apache.camel.spi.ExceptionHandler;
import org.apache.camel.spi.ShutdownAware;
import org.apache.camel.spi.Synchronization;
-import org.apache.camel.support.AsyncProcessorConverterHelper;
+import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.EmptyAsyncCallback;
import org.apache.camel.support.ExchangeHelper;
-import org.apache.camel.support.LoggingExceptionHandler;
import org.apache.camel.support.UnitOfWorkHelper;
-import org.apache.camel.support.service.ServiceSupport;
import org.apache.camel.util.ObjectHelper;
/**
@@ -47,46 +42,23 @@ import org.apache.camel.util.ObjectHelper;
* In this implementation there is a little <i>slack period</i> when you suspend/stop the consumer, by which
* the consumer may pickup a newly arrived messages and process it. That period is up till 1 second.
*/
-public class SedaConsumer extends ServiceSupport implements Consumer, Runnable, ShutdownAware, Suspendable {
+public class SedaConsumer extends DefaultConsumer implements Runnable, ShutdownAware, Suspendable {
private final AtomicInteger taskCount = new AtomicInteger();
private volatile CountDownLatch latch;
private volatile boolean shutdownPending;
private volatile boolean forceShutdown;
- private SedaEndpoint endpoint;
- private AsyncProcessor processor;
private ExecutorService executor;
- private ExceptionHandler exceptionHandler;
private final int pollTimeout;
public SedaConsumer(SedaEndpoint endpoint, Processor processor) {
- this.endpoint = endpoint;
- this.processor = AsyncProcessorConverterHelper.convert(processor);
+ super(endpoint, processor);
this.pollTimeout = endpoint.getPollTimeout();
- this.exceptionHandler = new LoggingExceptionHandler(endpoint.getCamelContext(), getClass());
}
@Override
- public String toString() {
- return "SedaConsumer[" + endpoint + "]";
- }
-
- @Override
- public Endpoint getEndpoint() {
- return endpoint;
- }
-
- public ExceptionHandler getExceptionHandler() {
- return exceptionHandler;
- }
-
- public void setExceptionHandler(ExceptionHandler exceptionHandler) {
- this.exceptionHandler = exceptionHandler;
- }
-
- @Override
- public Processor getProcessor() {
- return processor;
+ public SedaEndpoint getEndpoint() {
+ return (SedaEndpoint) super.getEndpoint();
}
@Override
@@ -100,10 +72,10 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
public int getPendingExchangesSize() {
// the route is shutting down, so either we should purge the queue,
// or return how many exchanges are still on the queue
- if (endpoint.isPurgeWhenStopping()) {
- endpoint.purgeQueue();
+ if (getEndpoint().isPurgeWhenStopping()) {
+ getEndpoint().purgeQueue();
}
- return endpoint.getQueue().size();
+ return getEndpoint().getQueue().size();
}
@Override
@@ -159,7 +131,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
}
protected void doRun() {
- BlockingQueue<Exchange> queue = endpoint.getQueue();
+ BlockingQueue<Exchange> queue = getEndpoint().getQueue();
// loop while we are allowed, or if we are stopping loop until the queue is empty
while (queue != null && isRunAllowed()) {
@@ -246,9 +218,9 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
*/
protected Exchange prepareExchange(Exchange exchange) {
// send a new copied exchange with new camel context
- Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, endpoint.getCamelContext());
+ Exchange newExchange = ExchangeHelper.copyExchangeAndSetCamelContext(exchange, getEndpoint().getCamelContext());
// set the from endpoint
- newExchange.setFromEndpoint(endpoint);
+ newExchange.setFromEndpoint(getEndpoint());
return newExchange;
}
@@ -265,13 +237,13 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
*/
protected void sendToConsumers(final Exchange exchange) throws Exception {
// validate multiple consumers has been enabled
- int size = endpoint.getConsumers().size();
- if (size > 1 && !endpoint.isMultipleConsumersSupported()) {
- throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + endpoint);
+ int size = getEndpoint().getConsumers().size();
+ if (size > 1 && !getEndpoint().isMultipleConsumersSupported()) {
+ throw new IllegalStateException("Multiple consumers for the same endpoint is not allowed: " + getEndpoint());
}
// if there are multiple consumers then multicast to them
- if (endpoint.isMultipleConsumersSupported()) {
+ if (getEndpoint().isMultipleConsumersSupported()) {
if (log.isTraceEnabled()) {
log.trace("Multicasting to {} consumers for Exchange: {}", size, exchange);
@@ -281,7 +253,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
final List<Synchronization> completions = exchange.handoverCompletions();
// use a multicast processor to process it
- AsyncProcessor mp = endpoint.getConsumerMulticastProcessor();
+ AsyncProcessor mp = getEndpoint().getConsumerMulticastProcessor();
ObjectHelper.notNull(mp, "ConsumerMulticastProcessor", this);
// and use the asynchronous routing engine to support it
@@ -291,40 +263,43 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
});
} else {
// use the regular processor and use the asynchronous routing engine to support it
- processor.process(exchange, EmptyAsyncCallback.get());
+ getAsyncProcessor().process(exchange, EmptyAsyncCallback.get());
}
}
@Override
protected void doStart() throws Exception {
- latch = new CountDownLatch(endpoint.getConcurrentConsumers());
+ super.doStart();
+ latch = new CountDownLatch(getEndpoint().getConcurrentConsumers());
shutdownPending = false;
forceShutdown = false;
setupTasks();
- endpoint.onStarted(this);
+ getEndpoint().onStarted(this);
}
@Override
protected void doSuspend() throws Exception {
- endpoint.onStopped(this);
+ getEndpoint().onStopped(this);
}
@Override
protected void doResume() throws Exception {
- endpoint.onStarted(this);
+ getEndpoint().onStarted(this);
}
@Override
protected void doStop() throws Exception {
// ensure queue is purged if we stop the consumer
- if (endpoint.isPurgeWhenStopping()) {
- endpoint.purgeQueue();
+ if (getEndpoint().isPurgeWhenStopping()) {
+ getEndpoint().purgeQueue();
}
- endpoint.onStopped(this);
+ getEndpoint().onStopped(this);
shutdownExecutor();
+
+ super.doStop();
}
@Override
@@ -334,7 +309,7 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
private void shutdownExecutor() {
if (executor != null) {
- endpoint.getCamelContext().getExecutorServiceManager().shutdownNow(executor);
+ getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
executor = null;
}
}
@@ -343,11 +318,11 @@ public class SedaConsumer extends ServiceSupport implements Consumer, Runnable,
* Setup the thread pool and ensures tasks gets executed (if needed)
*/
private void setupTasks() {
- int poolSize = endpoint.getConcurrentConsumers();
+ int poolSize = getEndpoint().getConcurrentConsumers();
// create thread pool if needed
if (executor == null) {
- executor = endpoint.getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, endpoint.getEndpointUri(), poolSize);
+ executor = getEndpoint().getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, getEndpoint().getEndpointUri(), poolSize);
}
// submit needed number of tasks