You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/07/01 06:56:32 UTC

[2/6] camel git commit: Added Unit Tests.

Added Unit Tests.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a48c022c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a48c022c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a48c022c

Branch: refs/heads/master
Commit: a48c022c3d8424d65a6d23b3edf8854f0f38eaca
Parents: 8919d65
Author: Leo Prince <le...@target.com>
Authored: Tue Jun 28 13:04:23 2016 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 1 08:45:24 2016 +0200

----------------------------------------------------------------------
 .../component/kafka/KafkaProducerFullTest.java  | 54 +++++++++++++++++---
 .../component/kafka/KafkaProducerTest.java      | 48 ++++++++++++++++-
 2 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a48c022c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 30f2b13..f1189d5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -28,13 +28,16 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
 import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -58,7 +61,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
             + "&requestRequiredAcks=-1")
     private Endpoint toStrings;
-
+    @EndpointInject(uri = "mock:kafkaAck")
+    private MockEndpoint mockEndpoint;
     @EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1"
             + "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
             + "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
@@ -103,9 +107,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                from("direct:startStrings").to(toStrings);
+                from("direct:startStrings").to(toStrings).to(mockEndpoint);
 
-                from("direct:startBytes").to(toBytes);
+                from("direct:startBytes").to(toBytes).to(mockEndpoint);
             }
         };
     }
@@ -125,9 +129,20 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+
+        List<Exchange> exchangeList= mockEndpoint.getExchanges();
+        assertEquals("Fifteen Exchanges are expected",exchangeList.size(),15);
+        for (Exchange exchange : exchangeList) {
+            List<RecordMetadata> recordMetaData1 =  (List<RecordMetadata>)(exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals("One RecordMetadata is expected.",recordMetaData1.size(),1);
+            assertTrue("Offset is positive",recordMetaData1.get(0).offset() >= 0);
+            assertTrue("Topic Name start with 'test'",recordMetaData1.get(0).topic().startsWith("test"));
+
+        }
+
     }
 
-    
+
     @Test
     public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException {
         int messageInTopic = 10;
@@ -139,7 +154,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         for (int x = 0; x < messageInTopic; x++) {
             msgs.add("Message " + x);
         }
-        
+
         sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
         msgs = new ArrayList<String>();
         for (int x = 0; x < messageInOtherTopic; x++) {
@@ -152,8 +167,24 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+        List<Exchange> exchangeList= mockEndpoint.getExchanges();
+        assertEquals("Two Exchanges are expected",exchangeList.size(),2);
+        Exchange e1 = exchangeList.get(0);
+        List<RecordMetadata> recordMetaData1 =  (List<RecordMetadata>)(e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+        assertEquals("Ten RecordMetadata is expected.",recordMetaData1.size(),10);
+        for (RecordMetadata recordMeta : recordMetaData1) {
+            assertTrue("Offset is positive",recordMeta.offset() >= 0);
+            assertTrue("Topic Name start with 'test'",recordMeta.topic().startsWith("test"));
+        }
+        Exchange e2 = exchangeList.get(1);
+        List<RecordMetadata> recordMetaData2 =  (List<RecordMetadata>)(e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+        assertEquals("Five RecordMetadata is expected.",recordMetaData2.size(),5);
+        for (RecordMetadata recordMeta : recordMetaData2) {
+            assertTrue("Offset is positive",recordMeta.offset() >= 0);
+            assertTrue("Topic Name start with 'test'",recordMeta.topic().startsWith("test"));
+        }
     }
-    
+
     @Test
     public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
         int messageInTopic = 10;
@@ -175,6 +206,17 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
         boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
 
         assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+
+        List<Exchange> exchangeList= mockEndpoint.getExchanges();
+        assertEquals("Fifteen Exchanges are expected",exchangeList.size(),15);
+        for (Exchange exchange : exchangeList) {
+            List<RecordMetadata> recordMetaData1 =  (List<RecordMetadata>)(exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+            assertEquals("One RecordMetadata is expected.",recordMetaData1.size(),1);
+            assertTrue("Offset is positive",recordMetaData1.get(0).offset() >= 0);
+            assertTrue("Topic Name start with 'test'",recordMetaData1.get(0).topic().startsWith("test"));
+
+        }
+
     }
 
     private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn,

http://git-wip-us.apache.org/repos/asf/camel/blob/a48c022c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 1a29c4d..2dcd8f8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.component.kafka;
 
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.camel.AsyncCallback;
@@ -35,6 +37,8 @@ import org.mockito.Matchers;
 import org.mockito.Mockito;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 
 public class KafkaProducerTest {
 
@@ -60,6 +64,7 @@ public class KafkaProducerTest {
         Mockito.when(kp.send(Mockito.any(ProducerRecord.class))).thenReturn(future);
 
         producer.setKafkaProducer(kp);
+        producer.setWorkerPool(Executors.newFixedThreadPool(1));
     }
 
     @Test
@@ -79,6 +84,9 @@ public class KafkaProducerTest {
 
         producer.process(exchange);
         Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
+        assertRecordMetadataExists();
+
+
     }
 
     @Test(expected = Exception.class)
@@ -92,8 +100,11 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
 
         producer.process(exchange);
+
+        assertRecordMetadataExists();
     }
 
+
     @Test
     public void processAsyncSendsMessage() throws Exception {
         endpoint.setTopic("sometopic");
@@ -104,7 +115,12 @@ public class KafkaProducerTest {
 
         producer.process(exchange, callback);
 
-        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+        ArgumentCaptor<Callback> callBackCaptor = ArgumentCaptor.forClass(Callback.class);
+        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), callBackCaptor.capture());
+        Callback kafkaCallback = callBackCaptor.getValue();
+        kafkaCallback.onCompletion(new RecordMetadata(null, 1, 1),null);
+        assertRecordMetadataExists();
+
     }
 
     @Test
@@ -122,9 +138,13 @@ public class KafkaProducerTest {
 
         producer.process(exchange, callback);
 
-        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+        ArgumentCaptor<Callback> callBackCaptor = ArgumentCaptor.forClass(Callback.class);
+        Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), callBackCaptor.capture());
         Mockito.verify(exchange).setException(Matchers.isA(ApiException.class));
         Mockito.verify(callback).done(Matchers.eq(true));
+        Callback kafkaCallback = callBackCaptor.getValue();
+        kafkaCallback.onCompletion(new RecordMetadata(null, 1, 1),null);
+        assertRecordMetadataExists();
     }
 
     @Test
@@ -137,6 +157,7 @@ public class KafkaProducerTest {
         producer.process(exchange);
 
         verifySendMessage("anotherTopic");
+        assertRecordMetadataExists();
     }
 
     @Test
@@ -152,6 +173,7 @@ public class KafkaProducerTest {
         producer.process(exchange);
 
         verifySendMessage("4", "anotherTopic", "someKey");
+        assertRecordMetadataExists();
     }
 
     @Test(expected = CamelException.class)
@@ -159,7 +181,10 @@ public class KafkaProducerTest {
         endpoint.setTopic(null);
         Mockito.when(exchange.getIn()).thenReturn(in);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
         producer.process(exchange);
+
+        assertRecordMetadataExists();
     }
 
     @Test
@@ -169,6 +194,8 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getOut()).thenReturn(out);
 
         producer.process(exchange);
+
+        assertRecordMetadataExists();
     }
 
     @Test
@@ -178,8 +205,12 @@ public class KafkaProducerTest {
         Mockito.when(exchange.getOut()).thenReturn(out);
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
         in.setHeader(KafkaConstants.KEY, "someKey");
+
         producer.process(exchange);
+
         verifySendMessage("4", "someTopic", "someKey");
+        assertRecordMetadataExists();
+
     }
 
     @Test
@@ -192,6 +223,8 @@ public class KafkaProducerTest {
         producer.process(exchange);
 
         verifySendMessage("someTopic", "someKey");
+        assertRecordMetadataExists();
+
     }
 
     @Test
@@ -203,9 +236,12 @@ public class KafkaProducerTest {
         in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
         in.setHeader(KafkaConstants.KEY, "someKey");
         in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
         producer.process(exchange);
 
         verifySendMessage("4", "someTopic", "someKey");
+        assertRecordMetadataExists();
+
     }
 
     @Test // Message and Topic Name alone
@@ -217,6 +253,8 @@ public class KafkaProducerTest {
         producer.process(exchange);
 
         verifySendMessage("someTopic");
+        assertRecordMetadataExists();
+
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -243,4 +281,10 @@ public class KafkaProducerTest {
         assertEquals(topic, captor.getValue().topic());
     }
 
+    private void assertRecordMetadataExists() {
+        List<RecordMetadata> recordMetaData1 =  (List<RecordMetadata>)(out.getHeader(KafkaConstants.KAFKA_RECORDMETA));
+        assertTrue(recordMetaData1 != null);
+        assertEquals("Expected one recordMetaData",recordMetaData1.size(),1);
+        assertTrue(recordMetaData1.get(0) !=null);
+    }
 }