You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/16 09:51:10 UTC
[19/50] [abbrv] camel git commit: CAMEL-9957: Use a worker pool for
processing kafka callbacks to not steal its io thread to do camel routing.
See the kafka javadoc/documentation.
CAMEL-9957: Use a worker pool for processing kafka callbacks to not steal its io thread to do camel routing. See the kafka javadoc/documentation.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/af2a9677
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/af2a9677
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/af2a9677
Branch: refs/heads/kube-lb
Commit: af2a9677c59e6199381d0807c9ed2981d6dc3771
Parents: 1428ccf
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 11:20:03 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Mon May 16 09:59:33 2016 +0200
----------------------------------------------------------------------
components/camel-kafka/src/main/docs/kafka.adoc | 27 +++++++++++-
.../camel/component/kafka/KafkaComponent.java | 21 ++++++++++
.../component/kafka/KafkaConfiguration.java | 43 ++++++++++++++++++++
.../camel/component/kafka/KafkaEndpoint.java | 31 +++++++++++++-
.../camel/component/kafka/KafkaProducer.java | 40 +++++++++++++++---
5 files changed, 153 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/docs/kafka.adoc
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/docs/kafka.adoc b/components/camel-kafka/src/main/docs/kafka.adoc
index 7fdec49..2811a62 100644
--- a/components/camel-kafka/src/main/docs/kafka.adoc
+++ b/components/camel-kafka/src/main/docs/kafka.adoc
@@ -59,16 +59,33 @@ Options (Camel 2.16 or older)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+
// component options: START
-The Kafka component has no options.
+The Kafka component supports 1 options which are listed below.
+
+
+
+{% raw %}
+[width="100%",cols="2s,1m,8",options="header"]
+|=======================================================================
+| Name | Java Type | Description
+| workerPool | ExecutorService | To use a shared custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing. If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
+|=======================================================================
+{% endraw %}
// component options: END
+
+
+
+
+
// endpoint options: START
-The Kafka component supports 71 endpoint options which are listed below:
+The Kafka component supports 74 endpoint options which are listed below:
{% raw %}
[width="100%",cols="2s,1,1m,1m,5",options="header"]
@@ -143,6 +160,9 @@ The Kafka component supports 71 endpoint options which are listed below:
| sslTruststoreLocation | producer | | String | The location of the trust store file.
| sslTruststorePassword | producer | | String | The password for the trust store file.
| sslTruststoreType | producer | JKS | String | The file format of the trust store file. Default value is JKS.
+| workerPool | producer | | ExecutorService | To use a custom worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
+| workerPoolCoreSize | producer | 10 | Integer | Number of core threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
+| workerPoolMaxSize | producer | 20 | Integer | Maximum number of threads for the worker pool for continue routing Exchange after kafka server has acknowledge the message that was sent to it from KafkaProducer using asynchronous non-blocking processing.
| exchangePattern | advanced | InOnly | ExchangePattern | Sets the default exchange pattern when creating an exchange
| synchronous | advanced | false | boolean | Sets whether synchronous processing should be strictly used or Camel is allowed to use asynchronous processing (if supported).
|=======================================================================
@@ -152,6 +172,9 @@ The Kafka component supports 71 endpoint options which are listed below:
+
+
+
For more information about Producer/Consumer configuration:
http://kafka.apache.org/documentation.html#newconsumerconfigs[http://kafka.apache.org/documentation.html#newconsumerconfigs]
http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 2981b3f..76ef55f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,8 +17,10 @@
package org.apache.camel.component.kafka;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.UriEndpointComponent;
public class KafkaComponent extends UriEndpointComponent {
@@ -31,6 +33,8 @@ public class KafkaComponent extends UriEndpointComponent {
super(context, KafkaEndpoint.class);
}
+ private ExecutorService workerPool;
+
@Override
protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
@@ -38,8 +42,25 @@ public class KafkaComponent extends UriEndpointComponent {
if (brokers != null) {
endpoint.getConfiguration().setBrokers(brokers);
}
+
+ // configure component options before endpoint properties which can override from params
+ endpoint.getConfiguration().setWorkerPool(workerPool);
+
setProperties(endpoint, params);
return endpoint;
}
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ /**
+ * To use a shared custom worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ * If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
+ */
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index e0580e9..1a068c3 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -17,7 +17,9 @@
package org.apache.camel.component.kafka;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import org.apache.camel.Exchange;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
@@ -102,6 +104,13 @@ public class KafkaConfiguration {
@UriParam(label = "producer", defaultValue = "100")
private Integer retryBackoffMs = 100;
+ @UriParam(label = "producer")
+ private ExecutorService workerPool;
+ @UriParam(label = "producer", defaultValue = "10")
+ private Integer workerPoolCoreSize = 10;
+ @UriParam(label = "producer", defaultValue = "20")
+ private Integer workerPoolMaxSize = 20;
+
//Async producer config
@UriParam(label = "producer", defaultValue = "10000")
private Integer queueBufferingMaxMessages = 10000;
@@ -1158,5 +1167,39 @@ public class KafkaConfiguration {
this.seekToBeginning = seekToBeginning;
}
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ /**
+ * To use a custom worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ */
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
+ }
+
+ public Integer getWorkerPoolCoreSize() {
+ return workerPoolCoreSize;
+ }
+
+ /**
+ * Number of core threads for the worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ */
+ public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+ this.workerPoolCoreSize = workerPoolCoreSize;
+ }
+ public Integer getWorkerPoolMaxSize() {
+ return workerPoolMaxSize;
+ }
+
+ /**
+ * Maximum number of threads for the worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ */
+ public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+ this.workerPoolMaxSize = workerPoolMaxSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 1c239c8..aabd020 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -86,7 +86,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
}
public ExecutorService createExecutor() {
- return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+ return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+ }
+
+ public ExecutorService createProducerExecutor() {
+ int core = getConfiguration().getWorkerPoolCoreSize();
+ int max = getConfiguration().getWorkerPoolMaxSize();
+ return getCamelContext().getExecutorServiceManager().newThreadPool(this, "KafkaProducer[" + configuration.getTopic() + "]", core, max);
}
public Exchange createKafkaExchange(ConsumerRecord record) {
@@ -658,4 +664,27 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
this.bridgeEndpoint = bridgeEndpoint;
}
+ public void setWorkerPool(ExecutorService workerPool) {
+ configuration.setWorkerPool(workerPool);
+ }
+
+ public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+ configuration.setWorkerPoolMaxSize(workerPoolMaxSize);
+ }
+
+ public Integer getWorkerPoolMaxSize() {
+ return configuration.getWorkerPoolMaxSize();
+ }
+
+ public Integer getWorkerPoolCoreSize() {
+ return configuration.getWorkerPoolCoreSize();
+ }
+
+ public ExecutorService getWorkerPool() {
+ return configuration.getWorkerPool();
+ }
+
+ public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+ configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/af2a9677/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6c432d6..0f783b5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -33,6 +34,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
+ private ExecutorService workerPool;
+ private boolean shutdownWorkerPool;
public KafkaProducer(KafkaEndpoint endpoint) {
super(endpoint);
@@ -58,11 +61,12 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
this.kafkaProducer = kafkaProducer;
}
- @Override
- protected void doStop() throws Exception {
- if (kafkaProducer != null) {
- kafkaProducer.close();
- }
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
}
@Override
@@ -71,6 +75,23 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
if (kafkaProducer == null) {
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
+
+ // if we are in asynchronous mode we need a worker pool
+ if (!endpoint.isSynchronous() && workerPool == null) {
+ workerPool = endpoint.createProducerExecutor();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (kafkaProducer != null) {
+ kafkaProducer.close();
+ }
+
+ if (shutdownWorkerPool && workerPool != null) {
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+ workerPool = null;
+ }
}
@SuppressWarnings("unchecked")
@@ -152,7 +173,14 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
// Just set the exception back
exchange.setException(e);
}
- callback.done(false);
+ // use worker pool to continue routing the exchange
+ // as this thread is from Kafka Callback and should not be used by Camel routing
+ workerPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ callback.done(false);
+ }
+ });
}
}
}