You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/13 09:32:02 UTC
[4/5] camel git commit: CAMEL-9957: Use a worker pool for processing
kafka callbacks to not streal its io thread to do camel routing. See the
kafka javadoc/documentation.
CAMEL-9957: Use a worker pool for processing kafka callbacks to not streal its io thread to do camel routing. See the kafka javadoc/documentation.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/1fd632cf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/1fd632cf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/1fd632cf
Branch: refs/heads/camel-2.17.x
Commit: 1fd632cfc3741c86cc58cfa69f5935005ddb5cfa
Parents: 81e1f8a
Author: Claus Ibsen <da...@apache.org>
Authored: Fri May 13 11:20:03 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri May 13 11:29:33 2016 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaComponent.java | 21 ++++++++
.../component/kafka/KafkaConfiguration.java | 55 ++++++++++++++++++++
.../camel/component/kafka/KafkaEndpoint.java | 32 +++++++++++-
.../camel/component/kafka/KafkaProducer.java | 40 +++++++++++---
4 files changed, 141 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 2981b3f..76ef55f 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -17,8 +17,10 @@
package org.apache.camel.component.kafka;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
import org.apache.camel.impl.UriEndpointComponent;
public class KafkaComponent extends UriEndpointComponent {
@@ -31,6 +33,8 @@ public class KafkaComponent extends UriEndpointComponent {
super(context, KafkaEndpoint.class);
}
+ private ExecutorService workerPool;
+
@Override
protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
@@ -38,8 +42,25 @@ public class KafkaComponent extends UriEndpointComponent {
if (brokers != null) {
endpoint.getConfiguration().setBrokers(brokers);
}
+
+ // configure component options before endpoint properties which can override from params
+ endpoint.getConfiguration().setWorkerPool(workerPool);
+
setProperties(endpoint, params);
return endpoint;
}
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ /**
+ * To use a shared custom worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ * If using this option then you must handle the lifecycle of the thread pool to shut the pool down when no longer needed.
+ */
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
index 6b67fc5..1a98b28 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaConfiguration.java
@@ -17,7 +17,9 @@
package org.apache.camel.component.kafka;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import org.apache.camel.Exchange;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriParam;
import org.apache.camel.spi.UriParams;
@@ -100,6 +102,13 @@ public class KafkaConfiguration {
@UriParam(label = "producer", defaultValue = "100")
private Integer retryBackoffMs = 100;
+ @UriParam(label = "producer")
+ private ExecutorService workerPool;
+ @UriParam(label = "producer", defaultValue = "10")
+ private Integer workerPoolCoreSize = 10;
+ @UriParam(label = "producer", defaultValue = "20")
+ private Integer workerPoolMaxSize = 20;
+
//Async producer config
@UriParam(label = "producer", defaultValue = "10000")
private Integer queueBufferingMaxMessages = 10000;
@@ -1145,4 +1154,50 @@ public class KafkaConfiguration {
this.valueDeserializer = valueDeserializer;
}
+ public boolean isSeekToBeginning() {
+ return seekToBeginning;
+ }
+
+ /**
+ * If the option is true, then KafkaConsumer will read from beginning on startup.
+ */
+ public void setSeekToBeginning(boolean seekToBeginning) {
+ this.seekToBeginning = seekToBeginning;
+ }
+
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ /**
+ * To use a custom worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ */
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
+ }
+
+ public Integer getWorkerPoolCoreSize() {
+ return workerPoolCoreSize;
+ }
+
+ /**
+ * Number of core threads for the worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ */
+ public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+ this.workerPoolCoreSize = workerPoolCoreSize;
+ }
+
+ public Integer getWorkerPoolMaxSize() {
+ return workerPoolMaxSize;
+ }
+
+ /**
+ * Maximum number of threads for the worker pool for continue routing {@link Exchange} after kafka server has acknowledge
+ * the message that was sent to it from {@link KafkaProducer} using asynchronous non-blocking processing.
+ */
+ public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+ this.workerPoolMaxSize = workerPoolMaxSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
index 369537a..d995475 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaEndpoint.java
@@ -86,7 +86,13 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
}
public ExecutorService createExecutor() {
- return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaTopic[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+ return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this, "KafkaConsumer[" + configuration.getTopic() + "]", configuration.getConsumerStreams());
+ }
+
+ public ExecutorService createProducerExecutor() {
+ int core = getConfiguration().getWorkerPoolCoreSize();
+ int max = getConfiguration().getWorkerPoolMaxSize();
+ return getCamelContext().getExecutorServiceManager().newThreadPool(this, "KafkaProducer[" + configuration.getTopic() + "]", core, max);
}
public Exchange createKafkaExchange(ConsumerRecord record) {
@@ -649,4 +655,28 @@ public class KafkaEndpoint extends DefaultEndpoint implements MultipleConsumersS
public void setBridgeEndpoint(boolean bridgeEndpoint) {
this.bridgeEndpoint = bridgeEndpoint;
}
+
+ public void setWorkerPool(ExecutorService workerPool) {
+ configuration.setWorkerPool(workerPool);
+ }
+
+ public void setWorkerPoolMaxSize(Integer workerPoolMaxSize) {
+ configuration.setWorkerPoolMaxSize(workerPoolMaxSize);
+ }
+
+ public Integer getWorkerPoolMaxSize() {
+ return configuration.getWorkerPoolMaxSize();
+ }
+
+ public Integer getWorkerPoolCoreSize() {
+ return configuration.getWorkerPoolCoreSize();
+ }
+
+ public ExecutorService getWorkerPool() {
+ return configuration.getWorkerPool();
+ }
+
+ public void setWorkerPoolCoreSize(Integer workerPoolCoreSize) {
+ configuration.setWorkerPoolCoreSize(workerPoolCoreSize);
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/1fd632cf/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 6c432d6..0f783b5 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.kafka;
import java.util.Properties;
+import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
@@ -33,6 +34,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
+ private ExecutorService workerPool;
+ private boolean shutdownWorkerPool;
public KafkaProducer(KafkaEndpoint endpoint) {
super(endpoint);
@@ -58,11 +61,12 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
this.kafkaProducer = kafkaProducer;
}
- @Override
- protected void doStop() throws Exception {
- if (kafkaProducer != null) {
- kafkaProducer.close();
- }
+ public ExecutorService getWorkerPool() {
+ return workerPool;
+ }
+
+ public void setWorkerPool(ExecutorService workerPool) {
+ this.workerPool = workerPool;
}
@Override
@@ -71,6 +75,23 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
if (kafkaProducer == null) {
kafkaProducer = new org.apache.kafka.clients.producer.KafkaProducer(props);
}
+
+ // if we are in asynchronous mode we need a worker pool
+ if (!endpoint.isSynchronous() && workerPool == null) {
+ workerPool = endpoint.createProducerExecutor();
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ if (kafkaProducer != null) {
+ kafkaProducer.close();
+ }
+
+ if (shutdownWorkerPool && workerPool != null) {
+ endpoint.getCamelContext().getExecutorServiceManager().shutdown(workerPool);
+ workerPool = null;
+ }
}
@SuppressWarnings("unchecked")
@@ -152,7 +173,14 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
// Just set the exception back
exchange.setException(e);
}
- callback.done(false);
+ // use worker pool to continue routing the exchange
+ // as this thread is from Kafka Callback and should not be used by Camel routing
+ workerPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ callback.done(false);
+ }
+ });
}
}
}