You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2016/06/16 19:27:21 UTC

[1/2] camel git commit: [CAMEL-10065] Update camel-kafka to support Iterable and Iterator

Repository: camel
Updated Branches:
  refs/heads/camel-2.17.x 86cfd9df9 -> c4236142c


[CAMEL-10065] Update camel-kafka to support Iterable and Iterator


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b7eed97f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b7eed97f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b7eed97f

Branch: refs/heads/camel-2.17.x
Commit: b7eed97f16842e4ef4816bd921793c070061e058
Parents: 86cfd9d
Author: Daniel Kulp <dk...@apache.org>
Authored: Thu Jun 16 13:09:04 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jun 16 15:19:31 2016 -0400

----------------------------------------------------------------------
 .../camel/component/kafka/KafkaProducer.java    | 96 ++++++++++++++++----
 1 file changed, 76 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/b7eed97f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 1254e97..e2f25fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,8 +16,14 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelException;
@@ -96,7 +102,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
     }
 
     @SuppressWarnings("unchecked")
-    protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
+    protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
         String topic = endpoint.getTopic();
         if (!endpoint.isBridgeEndpoint()) {
             topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
@@ -104,14 +110,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
         if (topic == null) {
             throw new CamelExchangeException("No topic key set", exchange);
         }
-        Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
-        boolean hasPartitionKey = partitionKey != null;
+        final Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+        final boolean hasPartitionKey = partitionKey != null;
 
-        Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
-        boolean hasMessageKey = messageKey != null;
+        final Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
+        final boolean hasMessageKey = messageKey != null;
 
         Object msg = exchange.getIn().getBody();
+        Iterator<Object> iterator = null;
+        if (msg instanceof Iterable) {
+            iterator = ((Iterable<Object>)msg).iterator();
+        } else if (msg instanceof Iterator) {
+            iterator = (Iterator<Object>)msg;
+        }
+        if (iterator != null) {
+            final Iterator<Object> msgList = iterator;
+            final String msgTopic = topic;
+            return new Iterator<ProducerRecord>() {
+                @Override
+                public boolean hasNext() {
+                    return msgList.hasNext();
+                }
 
+                @Override
+                public ProducerRecord next() {
+                    if (hasPartitionKey && hasMessageKey) {
+                        return new ProducerRecord(msgTopic, new Integer(partitionKey.toString()), messageKey, msgList.next());
+                    } else if (hasMessageKey) {
+                        return new ProducerRecord(msgTopic, messageKey, msgList.next());
+                    }
+                    return new ProducerRecord(msgTopic, msgList.next());
+                }
+            };
+        }
         ProducerRecord record;
         if (hasPartitionKey && hasMessageKey) {
             record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg);
@@ -121,25 +152,35 @@ public class KafkaProducer extends DefaultAsyncProducer {
             log.warn("No message key or partition key set");
             record = new ProducerRecord(topic, msg);
         }
-        return record;
+        return Collections.singletonList(record).iterator();
     }
 
     @Override
     @SuppressWarnings("unchecked")
     // Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
     public void process(Exchange exchange) throws Exception {
-        ProducerRecord record = createRecorder(exchange);
-        kafkaProducer.send(record).get();
+        Iterator<ProducerRecord> c = createRecorder(exchange);
+        List<Future<ProducerRecord>> futures = new LinkedList<Future<ProducerRecord>>();
+        while (c.hasNext()) {
+            futures.add(kafkaProducer.send(c.next()));
+        }
+        for (Future<ProducerRecord> f : futures) {
+            //wait for them all to be sent
+            f.get();
+        }
     }
 
     @Override
     @SuppressWarnings("unchecked")
     public boolean process(Exchange exchange, AsyncCallback callback) {
         try {
-            ProducerRecord record = createRecorder(exchange);
-            kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
-            // return false to process asynchronous
-            return false;
+            Iterator<ProducerRecord> c = createRecorder(exchange);
+            KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
+            while (c.hasNext()) {
+                cb.increment();
+                kafkaProducer.send(c.next(), cb);
+            }
+            return cb.allSent();
         } catch (Exception ex) {
             exchange.setException(ex);
         }
@@ -151,25 +192,40 @@ public class KafkaProducer extends DefaultAsyncProducer {
 
         private final Exchange exchange;
         private final AsyncCallback callback;
+        private final AtomicInteger count = new AtomicInteger(1);
 
         KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
             this.exchange = exchange;
             this.callback = callback;
         }
 
+        void increment() {
+            count.incrementAndGet();
+        }
+        boolean allSent() {
+            if (count.decrementAndGet() == 0) {
+                //was able to get all the work done while queuing the requests
+                callback.done(true);
+                return true;
+            }
+            return false;
+        }
+        
         @Override
         public void onCompletion(RecordMetadata recordMetadata, Exception e) {
             if (e != null) {
                 exchange.setException(e);
             }
-            // use worker pool to continue routing the exchange
-            // as this thread is from Kafka Callback and should not be used by Camel routing
-            workerPool.submit(new Runnable() {
-                @Override
-                public void run() {
-                    callback.done(false);
-                }
-            });
+            if (count.decrementAndGet() == 0) {
+                // use worker pool to continue routing the exchange
+                // as this thread is from Kafka Callback and should not be used by Camel routing
+                workerPool.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        callback.done(false);
+                    }
+                });
+            }
         }
     }
 


[2/2] camel git commit: [CAMEL-10065] Add a test case

Posted by dk...@apache.org.
[CAMEL-10065] Add a test case


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4236142
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4236142
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4236142

Branch: refs/heads/camel-2.17.x
Commit: c4236142c8c4e8778343a73db9331e34569a69a5
Parents: b7eed97
Author: Daniel Kulp <dk...@apache.org>
Authored: Thu Jun 16 13:23:37 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jun 16 15:19:38 2016 -0400

----------------------------------------------------------------------
 .../component/kafka/KafkaProducerFullTest.java  | 29 ++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c4236142/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 0bb4740..d5b65fa 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -125,6 +127,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
     }
 
+    
+    @Test
+    public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException {
+        int messageInTopic = 10;
+        int messageInOtherTopic = 5;
+
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
+
+        List<String> msgs = new ArrayList<String>();
+        for (int x = 0; x < messageInTopic; x++) {
+            msgs.add("Message " + x);
+        }
+        
+        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
+        msgs = new ArrayList<String>();
+        for (int x = 0; x < messageInOtherTopic; x++) {
+            msgs.add("Other Message " + x);
+        }
+        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+    }
+    
     @Test
     public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
         int messageInTopic = 10;