You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/15 08:22:01 UTC
[2/3] camel git commit: CAMEL-9957: Use a worker pool for processing
kafka callbacks to not streal its io thread to do camel routing. See the
kafka javadoc/documentation.
CAMEL-9957: Use a worker pool for processing kafka callbacks to not streal its io thread to do camel routing. See the kafka javadoc/documentation.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d9f7fdab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d9f7fdab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d9f7fdab
Branch: refs/heads/master
Commit: d9f7fdabfe4491b71862687c0684dfdfbf936da1
Parents: cd2f4af
Author: Claus Ibsen <da...@apache.org>
Authored: Sun May 15 10:21:38 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun May 15 10:21:38 2016 +0200
----------------------------------------------------------------------
.../camel/component/kafka/KafkaComponent.java | 4 +-
.../camel/component/kafka/KafkaProducer.java | 39 +++++++++-----------
.../component/kafka/KafkaProducerTest.java | 1 -
3 files changed, 19 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 76ef55f..2e11b22 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -25,6 +25,8 @@ import org.apache.camel.impl.UriEndpointComponent;
public class KafkaComponent extends UriEndpointComponent {
+ private ExecutorService workerPool;
+
public KafkaComponent() {
super(KafkaEndpoint.class);
}
@@ -33,8 +35,6 @@ public class KafkaComponent extends UriEndpointComponent {
super(context, KafkaEndpoint.class);
}
- private ExecutorService workerPool;
-
@Override
protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);
http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0f783b5..2138df1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -20,17 +20,16 @@ import java.util.Properties;
import java.util.concurrent.ExecutorService;
import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelException;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
+public class KafkaProducer extends DefaultAsyncProducer {
private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
private final KafkaEndpoint endpoint;
@@ -79,6 +78,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
// if we are in asynchronous mode we need a worker pool
if (!endpoint.isSynchronous() && workerPool == null) {
workerPool = endpoint.createProducerExecutor();
+ // we create a thread pool so we should also shut it down
+ shutdownWorkerPool = true;
}
}
@@ -127,34 +128,28 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
@SuppressWarnings("unchecked")
public void process(Exchange exchange) throws Exception {
ProducerRecord record = createRecorder(exchange);
- // Just send out the record in the sync way
kafkaProducer.send(record).get();
}
@Override
+ @SuppressWarnings("unchecked")
public boolean process(Exchange exchange, AsyncCallback callback) {
- // force processing synchronously using different api
- if (endpoint.isSynchronous()) {
- try {
+ try {
+ if (endpoint.isSynchronous()) {
+ // force process using synchronous call on kafka
process(exchange);
- } catch (Throwable e) {
- exchange.setException(e);
+ } else {
+ ProducerRecord record = createRecorder(exchange);
+ kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
+ // return false to process asynchronous
+ return false;
}
- callback.done(true);
- return true;
- }
-
- try {
- ProducerRecord record = createRecorder(exchange);
- kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
- // Finishing the processing in an async way
- return false;
} catch (Exception ex) {
- // Just set the exception back to the client
exchange.setException(ex);
- callback.done(true);
- return true;
}
+
+ callback.done(true);
+ return true;
}
private final class KafkaProducerCallBack implements Callback {
@@ -170,7 +165,6 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
- // Just set the exception back
exchange.setException(e);
}
// use worker pool to continue routing the exchange
@@ -183,4 +177,5 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
});
}
}
+
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 40f2113..dcd3365 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -52,7 +52,6 @@ public class KafkaProducerTest {
endpoint.setBrokers("broker1:1234,broker2:4567");
producer = new KafkaProducer(endpoint);
-
RecordMetadata rm = new RecordMetadata(null, 1, 1);
Future future = Mockito.mock(Future.class);
Mockito.when(future.get()).thenReturn(rm);