You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/05/15 08:22:01 UTC

[2/3] camel git commit: CAMEL-9957: Use a worker pool for processing kafka callbacks to not streal its io thread to do camel routing. See the kafka javadoc/documentation.

CAMEL-9957: Use a worker pool for processing kafka callbacks to not streal its io thread to do camel routing. See the kafka javadoc/documentation.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d9f7fdab
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d9f7fdab
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d9f7fdab

Branch: refs/heads/master
Commit: d9f7fdabfe4491b71862687c0684dfdfbf936da1
Parents: cd2f4af
Author: Claus Ibsen <da...@apache.org>
Authored: Sun May 15 10:21:38 2016 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Sun May 15 10:21:38 2016 +0200

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaComponent.java   |  4 +-
 .../camel/component/kafka/KafkaProducer.java    | 39 +++++++++-----------
 .../component/kafka/KafkaProducerTest.java      |  1 -
 3 files changed, 19 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
index 76ef55f..2e11b22 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaComponent.java
@@ -25,6 +25,8 @@ import org.apache.camel.impl.UriEndpointComponent;
 
 public class KafkaComponent extends UriEndpointComponent {
 
+    private ExecutorService workerPool;
+
     public KafkaComponent() {
         super(KafkaEndpoint.class);
     }
@@ -33,8 +35,6 @@ public class KafkaComponent extends UriEndpointComponent {
         super(context, KafkaEndpoint.class);
     }
 
-    private ExecutorService workerPool;
-
     @Override
     protected KafkaEndpoint createEndpoint(String uri, String remaining, Map<String, Object> params) throws Exception {
         KafkaEndpoint endpoint = new KafkaEndpoint(uri, this);

http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 0f783b5..2138df1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -20,17 +20,16 @@ import java.util.Properties;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
-import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelException;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
-import org.apache.camel.impl.DefaultProducer;
+import org.apache.camel.impl.DefaultAsyncProducer;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 
-public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
+public class KafkaProducer extends DefaultAsyncProducer {
 
     private org.apache.kafka.clients.producer.KafkaProducer kafkaProducer;
     private final KafkaEndpoint endpoint;
@@ -79,6 +78,8 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         // if we are in asynchronous mode we need a worker pool
         if (!endpoint.isSynchronous() && workerPool == null) {
             workerPool = endpoint.createProducerExecutor();
+            // we create a thread pool so we should also shut it down
+            shutdownWorkerPool = true;
         }
     }
 
@@ -127,34 +128,28 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
     @SuppressWarnings("unchecked")
     public void process(Exchange exchange) throws Exception {
         ProducerRecord record = createRecorder(exchange);
-        // Just send out the record in the sync way
         kafkaProducer.send(record).get();
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        // force processing synchronously using different api
-        if (endpoint.isSynchronous()) {
-            try {
+        try {
+            if (endpoint.isSynchronous()) {
+                // force process using synchronous call on kafka
                 process(exchange);
-            } catch (Throwable e) {
-                exchange.setException(e);
+            } else {
+                ProducerRecord record = createRecorder(exchange);
+                kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
+                // return false to process asynchronous
+                return false;
             }
-            callback.done(true);
-            return true;
-        }
-
-        try {
-            ProducerRecord record = createRecorder(exchange);
-            kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
-            // Finishing the processing in an async way
-            return false;
         } catch (Exception ex) {
-            // Just set the exception back to the client
             exchange.setException(ex);
-            callback.done(true);
-            return true;
         }
+
+        callback.done(true);
+        return true;
     }
 
     private final class KafkaProducerCallBack implements Callback {
@@ -170,7 +165,6 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
         @Override
         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             if (e != null) {
-                // Just set the exception back
                 exchange.setException(e);
             }
             // use worker pool to continue routing the exchange
@@ -183,4 +177,5 @@ public class KafkaProducer extends DefaultProducer implements AsyncProcessor {
             });
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/d9f7fdab/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 40f2113..dcd3365 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -52,7 +52,6 @@ public class KafkaProducerTest {
         endpoint.setBrokers("broker1:1234,broker2:4567");
         producer = new KafkaProducer(endpoint);
 
-
         RecordMetadata rm = new RecordMetadata(null, 1, 1);
         Future future = Mockito.mock(Future.class);
         Mockito.when(future.get()).thenReturn(rm);