You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by dk...@apache.org on 2016/06/16 19:27:22 UTC

[2/2] camel git commit: [CAMEL-10065] Add a test case

[CAMEL-10065] Add a test case


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4236142
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4236142
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4236142

Branch: refs/heads/camel-2.17.x
Commit: c4236142c8c4e8778343a73db9331e34569a69a5
Parents: b7eed97
Author: Daniel Kulp <dk...@apache.org>
Authored: Thu Jun 16 13:23:37 2016 -0400
Committer: Daniel Kulp <dk...@apache.org>
Committed: Thu Jun 16 15:19:38 2016 -0400

----------------------------------------------------------------------
 .../component/kafka/KafkaProducerFullTest.java  | 29 ++++++++++++++++++++
 1 file changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c4236142/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 0bb4740..d5b65fa 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.kafka;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
@@ -125,6 +127,33 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
     }
 
+    
+    @Test
+    public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException {
+        int messageInTopic = 10;
+        int messageInOtherTopic = 5;
+
+        CountDownLatch messagesLatch = new CountDownLatch(messageInTopic + messageInOtherTopic);
+
+        List<String> msgs = new ArrayList<String>();
+        for (int x = 0; x < messageInTopic; x++) {
+            msgs.add("Message " + x);
+        }
+        
+        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
+        msgs = new ArrayList<String>();
+        for (int x = 0; x < messageInOtherTopic; x++) {
+            msgs.add("Other Message " + x);
+        }
+        sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1", KafkaConstants.TOPIC, TOPIC_STRINGS_IN_HEADER);
+
+        createKafkaMessageConsumer(stringsConsumerConn, TOPIC_STRINGS, TOPIC_STRINGS_IN_HEADER, messagesLatch);
+
+        boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
+
+        assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+    }
+    
     @Test
     public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
         int messageInTopic = 10;