You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2016/06/16 19:27:21 UTC
[1/2] camel git commit: [CAMEL-10065] Update camel-kafka to support
Iterable and Iterator
Repository: camel
Updated Branches:
refs/heads/camel-2.17.x 86cfd9df9 -> c4236142c
[CAMEL-10065] Update camel-kafka to support Iterable and Iterator
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b7eed97f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b7eed97f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b7eed97f
Branch: refs/heads/camel-2.17.x
Commit: b7eed97f16842e4ef4816bd921793c070061e058
Parents: 86cfd9d
Author: Daniel Kulp <dk...@apache.org>
Authored: Thu Jun 16 13:09:04 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jun 16 15:19:31 2016 -0400
----------------------------------------------------------------------
.../camel/component/kafka/KafkaProducer.java | 96 ++++++++++++++++----
1 file changed, 76 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b7eed97f/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
index 1254e97..e2f25fb 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaProducer.java
@@ -16,8 +16,14 @@
*/
package org.apache.camel.component.kafka;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelException;
@@ -96,7 +102,7 @@ public class KafkaProducer extends DefaultAsyncProducer {
}
@SuppressWarnings("unchecked")
- protected ProducerRecord createRecorder(Exchange exchange) throws CamelException {
+ protected Iterator<ProducerRecord> createRecorder(Exchange exchange) throws CamelException {
String topic = endpoint.getTopic();
if (!endpoint.isBridgeEndpoint()) {
topic = exchange.getIn().getHeader(KafkaConstants.TOPIC, topic, String.class);
@@ -104,14 +110,39 @@ public class KafkaProducer extends DefaultAsyncProducer {
if (topic == null) {
throw new CamelExchangeException("No topic key set", exchange);
}
- Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
- boolean hasPartitionKey = partitionKey != null;
+ final Object partitionKey = exchange.getIn().getHeader(KafkaConstants.PARTITION_KEY);
+ final boolean hasPartitionKey = partitionKey != null;
- Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
- boolean hasMessageKey = messageKey != null;
+ final Object messageKey = exchange.getIn().getHeader(KafkaConstants.KEY);
+ final boolean hasMessageKey = messageKey != null;
Object msg = exchange.getIn().getBody();
+ Iterator<Object> iterator = null;
+ if (msg instanceof Iterable) {
+ iterator = ((Iterable<Object>)msg).iterator();
+ } else if (msg instanceof Iterator) {
+ iterator = (Iterator<Object>)msg;
+ }
+ if (iterator != null) {
+ final Iterator<Object> msgList = iterator;
+ final String msgTopic = topic;
+ return new Iterator<ProducerRecord>() {
+ @Override
+ public boolean hasNext() {
+ return msgList.hasNext();
+ }
+ @Override
+ public ProducerRecord next() {
+ if (hasPartitionKey && hasMessageKey) {
+ return new ProducerRecord(msgTopic, new Integer(partitionKey.toString()), messageKey, msgList.next());
+ } else if (hasMessageKey) {
+ return new ProducerRecord(msgTopic, messageKey, msgList.next());
+ }
+ return new ProducerRecord(msgTopic, msgList.next());
+ }
+ };
+ }
ProducerRecord record;
if (hasPartitionKey && hasMessageKey) {
record = new ProducerRecord(topic, new Integer(partitionKey.toString()), messageKey, msg);
@@ -121,25 +152,35 @@ public class KafkaProducer extends DefaultAsyncProducer {
log.warn("No message key or partition key set");
record = new ProducerRecord(topic, msg);
}
- return record;
+ return Collections.singletonList(record).iterator();
}
@Override
@SuppressWarnings("unchecked")
// Camel calls this method if the endpoint isSynchronous(), as the KafkaEndpoint creates a SynchronousDelegateProducer for it
public void process(Exchange exchange) throws Exception {
- ProducerRecord record = createRecorder(exchange);
- kafkaProducer.send(record).get();
+ Iterator<ProducerRecord> c = createRecorder(exchange);
+ List<Future<ProducerRecord>> futures = new LinkedList<Future<ProducerRecord>>();
+ while (c.hasNext()) {
+ futures.add(kafkaProducer.send(c.next()));
+ }
+ for (Future<ProducerRecord> f : futures) {
+ //wait for them all to be sent
+ f.get();
+ }
}
@Override
@SuppressWarnings("unchecked")
public boolean process(Exchange exchange, AsyncCallback callback) {
try {
- ProducerRecord record = createRecorder(exchange);
- kafkaProducer.send(record, new KafkaProducerCallBack(exchange, callback));
- // return false to process asynchronous
- return false;
+ Iterator<ProducerRecord> c = createRecorder(exchange);
+ KafkaProducerCallBack cb = new KafkaProducerCallBack(exchange, callback);
+ while (c.hasNext()) {
+ cb.increment();
+ kafkaProducer.send(c.next(), cb);
+ }
+ return cb.allSent();
} catch (Exception ex) {
exchange.setException(ex);
}
@@ -151,25 +192,40 @@ public class KafkaProducer extends DefaultAsyncProducer {
private final Exchange exchange;
private final AsyncCallback callback;
+ private final AtomicInteger count = new AtomicInteger(1);
KafkaProducerCallBack(Exchange exchange, AsyncCallback callback) {
this.exchange = exchange;
this.callback = callback;
}
+ void increment() {
+ count.incrementAndGet();
+ }
+ boolean allSent() {
+ if (count.decrementAndGet() == 0) {
+ //was able to get all the work done while queuing the requests
+ callback.done(true);
+ return true;
+ }
+ return false;
+ }
+
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
exchange.setException(e);
}
- // use worker pool to continue routing the exchange
- // as this thread is from Kafka Callback and should not be used by Camel routing
- workerPool.submit(new Runnable() {
- @Override
- public void run() {
- callback.done(false);
- }
- });
+ if (count.decrementAndGet() == 0) {
+ // use worker pool to continue routing the exchange
+ // as this thread is from Kafka Callback and should not be used by Camel routing
+ workerPool.submit(new Runnable() {
+ @Override
+ public void run() {
+ callback.done(false);
+ }
+ });
+ }
}
}
[2/2] camel git commit: [CAMEL-10065] Add a test case
Posted by dk...@apache.org.
[CAMEL-10065] Add a test case
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4236142
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4236142
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4236142
Branch: refs/heads/camel-2.17.x
Commit: c4236142c8c4e8778343a73db9331e34569a69a5
Parents: b7eed97
Author: Daniel Kulp <dk...@apache.org>
Authored: Thu Jun 16 13:23:37 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jun 16 15:19:38 2016 -0400
----------------------------------------------------------------------
.../component/kafka/KafkaProducerFullTest.java | 29 ++++++++++++++++++++
1 file changed, 29 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/c4236142/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 0bb4740..d5b65fa 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,8 +17,10 @@
package org.apache.camel.component.kafka;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
@@ -125,6 +127,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
}
+
+ @Test
+ public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException {
+ int messageInTopic = 10;
+ int messageInOtherTopic = 5;
+
+ CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
+
+ List<String> msgs = new ArrayList<String>();
+ for (int x = 0; x < messageInTopic; x++) {
+ msgs.add("Message " + x);
+ }
+
+ sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
+ msgs = new ArrayList<String>();
+ for (int x = 0; x < messageInOtherTopic; x++) {
+ msgs.add("Other Message " + x);
+ }
+ sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+ createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
+ boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+ assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+ }
+
@Test
public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;