You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 09:51:10 UTC

[19/50] [abbrv] camel git commit: CAMEL-9957: Use a worker pool for processing kafka callbacks to not steal its io thread to do camel routing. See the kafka javadoc/documentation.

CAMEL-9957: Use a worker pool for processing kafka callbacks to not steal its io thread to do camel routing. See the kafka javadoc/documentation.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af2a9677
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af2a9677
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af2a9677

Branch: refs/heads/kube-lb
Commit: af2a9677c59e6199381d0807c9ed2981d6dc3771
Parents: 1428ccf
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 11:20:03 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200

----------------------------------------------------------------------
 components/camel-kafka/src/main/docs/kafka.adoc | 27 +++++++++++-
 .../camel/component/kafka/KafkaComponent.java   | 21 ++++++++++
 .../component/kafka/KafkaConfiguration.java     | 43 ++++++++++++++++++++
 .../camel/component/kafka/KafkaEndpoint.java    | 31 +++++++++++++-
 .../camel/component/kafka/KafkaProducer.java    | 40 +++++++++++++++---
 5 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 7fdec49..2811a62 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -59,16 +59,33 @@ Options (Camel 2.16 or older)
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 
+
+
 // component options: START
-The Kafka component has no options.
+The Kafka component supports 1 option which is listed below.
+
+
+
+{% raw %}
+[width="100%",cols="2s,1m,8",options="header"]
+|=======================================================================
+| Name | Java Type | Description
+| workerPool | ExecutorService | To use a shared custom worker pool to continue routing the Exchange after the Kafka server has acknowledged the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool and shut the pool down when it is no longer needed.
+|=======================================================================
+{% endraw %}
 // component options: END
 
 
 
 
 
+
+
+
+
+
 // endpoint options: START
-The Kafka component supports 71 endpoint options which are listed below:
+The Kafka component supports 74 endpoint options which are listed below:
 
 {% raw %}
 [width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -143,6 +160,9 @@ The Kafka component supports 71 endpoint options which are listed below:
 | sslTruststoreLocation | producer |  | String | The location of the trust store file.
 | sslTruststorePassword | producer |  | String | The password for the trust store file.
 | sslTruststoreType | producer | JKS | String | The file format of the trust store file. Default value is JKS.
+| workerPool | producer |  | ExecutorService | To use a custom worker pool to continue routing the Exchange after the Kafka server has acknowledged the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
+| workerPoolCoreSize | producer | 10 | Integer | Number of core threads for the worker pool used to continue routing the Exchange after the Kafka server has acknowledged the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
+| workerPoolMaxSize | producer | 20 | Integer | Maximum number of threads for the worker pool used to continue routing the Exchange after the Kafka server has acknowledged the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
 | exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
 | synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
 |=======================================================================
@@ -152,6 +172,9 @@ The Kafka component supports 71 endpoint options which are listed below:
 
 
 
+
+
+
 For more information about Producer/Consumer configuration:
 
 http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]

http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 2981b3f..76ef55f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
 import org.apache.camel.impl.UriEndpointComponent;
 
 public class KafkaComponent extends UriEndpointComponent {
@@ -31,6 +33,8 @@ public class KafkaComponent extends UriEndpointComponent {
         super(context, KafkaEndpoint.class);
     }
 
+    private ExecutorService workerPool;
+
     @Override
     protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
@@ -38,8 +42,25 @@ public class KafkaComponent extends UriEndpointComponent {
         if (brokers != null) {
             endpoint.getConfiguration().setBrokers(brokers);
         }
+
+        // configure component options before endpoint properties which can override from params
+        endpoint.getConfiguration().setWorkerPool(workerPool);
+
         setProperties(endpoint, params);
         return endpoint;
     }
 
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    /**
+     * To use a shared custom worker pool to continue routing the {@link Exchange} after the Kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     * If using this option then you must handle the lifecycle of the thread pool and shut the pool down when it is no longer needed.
+     */
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index e0580e9..1a068c3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -17,7 +17,9 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
 
+import org.apache.camel.Exchange;
 import org.apache.camel.spi.Metadata;
 import org.apache.camel.spi.UriParam;
 import org.apache.camel.spi.UriParams;
@@ -102,6 +104,13 @@ public class KafkaConfiguration {
     @UriParam(label = "producer", defaultValue = "100")
     private Integer retryBackoffMs = 100;
 
+    @UriParam(label = "producer")
+    private ExecutorService workerPool;
+    @UriParam(label = "producer", defaultValue = "10")
+    private Integer workerPoolCoreSize = 10;
+    @UriParam(label = "producer", defaultValue = "20")
+    private Integer workerPoolMaxSize = 20;
+
     //Async producer config
     @UriParam(label = "producer", defaultValue = "10000")
     private Integer queueBufferingMaxMessages = 10000;
@@ -1158,5 +1167,39 @@ public class KafkaConfiguration {
         this.seekToBeginning = seekToBeginning;
     }
 
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    /**
+     * To use a custom worker pool to continue routing the {@link Exchange} after the Kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     */
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
+    }
+
+    public Integer getWorkerPoolCoreSize() {
+        return workerPoolCoreSize;
+    }
+
+    /**
+     * Number of core threads for the worker pool used to continue routing the {@link Exchange} after the Kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     */
+    public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+        this.workerPoolCoreSize = workerPoolCoreSize;
+    }
 
+    public Integer getWorkerPoolMaxSize() {
+        return workerPoolMaxSize;
+    }
+
+    /**
+     * Maximum number of threads for the worker pool used to continue routing the {@link Exchange} after the Kafka server has acknowledged
+     * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+     */
+    public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+        this.workerPoolMaxSize = workerPoolMaxSize;
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1c239c8..aabd020 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -86,7 +86,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
     }
 
     public ExecutorService createExecutor() {
-        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+        return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+    }
+
+    public ExecutorService createProducerExecutor() {
+        int core = getConfiguration().getWorkerPoolCoreSize();
+        int max = getConfiguration().getWorkerPoolMaxSize();
+        return getCamelContext().getExecutorServiceManager().newThreadPool(this, "KafkaProducer[" + configuration.getTopic() + "]", core, max);
     }
 
     public Exchange createKafkaExchange(ConsumerRecord record) {
@@ -658,4 +664,27 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
         this.bridgeEndpoint = bridgeEndpoint;
     }
 
+    public void setWorkerPool(ExecutorService workerPool) {
+        configuration.setWorkerPool(workerPool);
+    }
+
+    public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+        configuration.setWorkerPoolMaxSize(workerPoolMaxSize);
+    }
+
+    public Integer getWorkerPoolMaxSize() {
+        return configuration.getWorkerPoolMaxSize();
+    }
+
+    public Integer getWorkerPoolCoreSize() {
+        return configuration.getWorkerPoolCoreSize();
+    }
+
+    public ExecutorService getWorkerPool() {
+        return configuration.getWorkerPool();
+    }
+
+    public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+        configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6c432d6..0f783b5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -17,6 +17,7 @@
 package org.apache.camel.component.kafka;
 
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
@@ -33,6 +34,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
 
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
+    private ExecutorService workerPool;
+    private boolean shutdownWorkerPool;
 
     public KafkaProducer(KafkaEndpoint endpoint) {
         super(endpoint);
@@ -58,11 +61,12 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         this.kafkaProducer = kafkaProducer;
     }
 
-    @Override
-    protected void doStop() throws Exception {
-        if (kafkaProducer != null) {
-            kafkaProducer.close();
-        }
+    public ExecutorService getWorkerPool() {
+        return workerPool;
+    }
+
+    public void setWorkerPool(ExecutorService workerPool) {
+        this.workerPool = workerPool;
     }
 
     @Override
@@ -71,6 +75,23 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         if (kafkaProducer == null) {
             kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
         }
+
+        // if we are in asynchronous mode we need a worker pool
+        if (!endpoint.isSynchronous() && workerPool == null) {
+            workerPool = endpoint.createProducerExecutor();
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        if (kafkaProducer != null) {
+            kafkaProducer.close();
+        }
+
+        if (shutdownWorkerPool && workerPool != null) {
+            endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+            workerPool = null;
+        }
     }
 
     @SuppressWarnings("unchecked")
@@ -152,7 +173,14 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
                 // Just set the exception back
                 exchange.setException(e);
             }
-            callback.done(false);
+            // use worker pool to continue routing the exchange
+            // as this thread is from Kafka Callback and should not be used by Camel routing
+            workerPool.submit(new Runnable() {
+                @Override
+                public void run() {
+                    callback.done(false);
+                }
+            });
         }
     }
 }