You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/13 09:32:02 UTC

[4/5] camel git commit: CAMEL-9957: Use a worker pool for processing kafka callbacks so as not to steal its I/O thread to do camel routing. See the kafka javadoc/documentation.

CAMEL-9957: Use a worker pool for processing kafka callbacks so as not to steal its I/O thread to do camel routing. See the kafka javadoc/documentation.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1fd632cf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1fd632cf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1fd632cf

Branch: refs/heads/camel-2.17.x
Commit: 1fd632cfc3741c86cc58cfa69f5935005ddb5cfa
Parents: 81e1f8a
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 11:20:03 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri May 13 11:29:33 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaComponent.java   | 21 ++++++++
 .../component/kafka/KafkaConfiguration.java     | 55 ++++++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 32 +++++++++++-
 .../camel/component/kafka/KafkaProducer.java    | 40 +++++++++++---
 4 files changed, 141 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 2981b3f..76ef55f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.UriEndpointComponent;
 
 public class KafkaComponent extends UriEndpointComponent {
@@ -31,6 +33,8 @@ public class KafkaComponent extends UriEndpointComponent {
         super(context, KafkaEndpoint.class);
     }
 
+    private ExecutorService workerPool;
+
     @Override
     protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
@@ -38,8 +42,25 @@ public class KafkaComponent extends UriEndpointComponent {
         if (brokers != null) {
             endpoint.getConfiguration().setBrokers(brokers);
         }
+
+        // configure component options before endpoint properties which can override from params
+        endpoint.getConfiguration().setWorkerPool(workerPool);
+
         setProperties(endpoint, params);
         return endpoint;
     }
 
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    /**
+     * To use a shared custom worker pool to continue routing the {@link Exchange} after the kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     * If you use this option, you must manage the lifecycle of the thread pool yourself and shut it down when it is no longer needed.
+     */
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 6b67fc5..1a98b28 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -100,6 +102,13 @@ public class KafkaConfiguration {
     @UriParam(label = "producer", defaultValue = "100")
     private Integer retryBackoffMs = 100;
 
+    @UriParam(label = "producer")
+    private ExecutorService workerPool;
+    @UriParam(label = "producer", defaultValue = "10")
+    private Integer workerPoolCoreSize = 10;
+    @UriParam(label = "producer", defaultValue = "20")
+    private Integer workerPoolMaxSize = 20;
+
     //Async producer config
     @UriParam(label = "producer", defaultValue = "10000")
     private Integer queueBufferingMaxMessages = 10000;
@@ -1145,4 +1154,50 @@ public class KafkaConfiguration {
         this.valueDeserializer = valueDeserializer;
     }
 
+    public boolean isSeekToBeginning() {
+        return seekToBeginning;
+    }
+
+    /**
+     * If the option is true, then KafkaConsumer will read from beginning on startup.
+     */
+    public void setSeekToBeginning(boolean seekToBeginning) {
+        this.seekToBeginning = seekToBeginning;
+    }
+
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    /**
+     * To use a custom worker pool to continue routing the {@link Exchange} after the kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     */
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    public Integer getWorkerPoolCoreSize() {
+        return workerPoolCoreSize;
+    }
+
+    /**
+     * Number of core threads in the worker pool that continues routing the {@link Exchange} after the kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     */
+    public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+        this.workerPoolCoreSize = workerPoolCoreSize;
+    }
+
+    public Integer getWorkerPoolMaxSize() {
+        return workerPoolMaxSize;
+    }
+
+    /**
+     * Maximum number of threads in the worker pool that continues routing the {@link Exchange} after the kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     */
+    public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+        this.workerPoolMaxSize = workerPoolMaxSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 369537a..d995475 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -86,7 +86,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     }
 
     public ExecutorService createExecutor() {
-        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+    }
+
+    public ExecutorService createProducerExecutor() {
+        int core = getConfiguration().getWorkerPoolCoreSize();
+        int max = getConfiguration().getWorkerPoolMaxSize();
+        return getCamelContext().getExecutorServiceManager().newThreadPool(this, "KafkaProducer[" + configuration.getTopic() + "]", core, max);
     }
 
     public Exchange createKafkaExchange(ConsumerRecord record) {
@@ -649,4 +655,28 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     public void setBridgeEndpoint(boolean bridgeEndpoint) {
         this.bridgeEndpoint = bridgeEndpoint;
     }
+
+    public void setWorkerPool(ExecutorService workerPool) {
+        configuration.setWorkerPool(workerPool);
+    }
+
+    public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+        configuration.setWorkerPoolMaxSize(workerPoolMaxSize);
+    }
+
+    public Integer getWorkerPoolMaxSize() {
+        return configuration.getWorkerPoolMaxSize();
+    }
+
+    public Integer getWorkerPoolCoreSize() {
+        return configuration.getWorkerPoolCoreSize();
+    }
+
+    public ExecutorService getWorkerPool() {
+        return configuration.getWorkerPool();
+    }
+
+    public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+        configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6c432d6..0f783b5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -33,6 +34,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
 
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
+    private ExecutorService workerPool;
+    private boolean shutdownWorkerPool;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
         super(endpoint);
@@ -58,11 +61,12 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         this.kafkaProducer = kafkaProducer;
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        if (kafkaProducer != null) {
-            kafkaProducer.close();
-        }
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
     }
 
     @Override
@@ -71,6 +75,23 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         if (kafkaProducer == null) {
             kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
         }
+
+        // if we are in asynchronous mode we need a worker pool
+        if (!endpoint.isSynchronous() && workerPool == null) {
+            workerPool = endpoint.createProducerExecutor();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+        }
+
+        if (shutdownWorkerPool && workerPool != null) {
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+            workerPool = null;
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -152,7 +173,14 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
                 // Just set the exception back
                 exchange.setException(e);
             }
-            callback.done(false);
+            // use worker pool to continue routing the exchange
+            // as this thread is from Kafka Callback and should not be used by Camel routing
+            workerPool.submit(new Runnable() {
+                @Override
+                public void run() {
+                    callback.done(false);
+                }
+            });
         }
     }
 }