You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/07/01 06:56:32 UTC
[2/6] camel git commit: Added Unit Tests.
Added Unit Tests.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a48c022c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a48c022c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a48c022c
Branch: refs/heads/master
Commit: a48c022c3d8424d65a6d23b3edf8854f0f38eaca
Parents: 8919d65
Author: Leo Prince <le...@target.com>
Authored: Tue Jun 28 13:04:23 2016 -0500
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Jul 1 08:45:24 2016 +0200
----------------------------------------------------------------------
.../component/kafka/KafkaProducerFullTest.java | 54 +++++++++++++++++---
.../component/kafka/KafkaProducerTest.java | 48 ++++++++++++++++-
2 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/a48c022c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
index 30f2b13..f1189d5 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerFullTest.java
@@ -28,13 +28,16 @@ import java.util.concurrent.TimeUnit;
import org.apache.camel.Endpoint;
import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -58,7 +61,8 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
@EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_STRINGS
+ "&requestRequiredAcks=-1")
private Endpoint toStrings;
-
+ @EndpointInject(uri = "mock:kafkaAck")
+ private MockEndpoint mockEndpoint;
@EndpointInject(uri = "kafka:localhost:{{karfkaPort}}?topic=" + TOPIC_BYTES + "&requestRequiredAcks=-1"
+ "&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer&"
+ "keySerializerClass=org.apache.kafka.common.serialization.ByteArraySerializer")
@@ -103,9 +107,9 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:startStrings").to(toStrings);
+ from("direct:startStrings").to(toStrings).to(mockEndpoint);
- from("direct:startBytes").to(toBytes);
+ from("direct:startBytes").to(toBytes).to(mockEndpoint);
}
};
}
@@ -125,9 +129,20 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+
+ List<Exchange> exchangeList= mockEndpoint.getExchanges();
+ assertEquals("Fifteen Exchanges are expected",exchangeList.size(),15);
+ for (Exchange exchange : exchangeList) {
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>)(exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ assertEquals("One RecordMetadata is expected.",recordMetaData1.size(),1);
+ assertTrue("Offset is positive",recordMetaData1.get(0).offset() >= 0);
+ assertTrue("Topic Name start with 'test'",recordMetaData1.get(0).topic().startsWith("test"));
+
+ }
+
}
-
+
@Test
public void producedStringCollectionMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
@@ -139,7 +154,7 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
for (int x = 0; x < messageInTopic; x++) {
msgs.add("Message " + x);
}
-
+
sendMessagesInRoute(1, stringsTemplate, msgs, KafkaConstants.PARTITION_KEY, "1");
msgs = new ArrayList<String>();
for (int x = 0; x < messageInOtherTopic; x++) {
@@ -152,8 +167,24 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+ List<Exchange> exchangeList= mockEndpoint.getExchanges();
+ assertEquals("Two Exchanges are expected",exchangeList.size(),2);
+ Exchange e1 = exchangeList.get(0);
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>)(e1.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ assertEquals("Ten RecordMetadata is expected.",recordMetaData1.size(),10);
+ for (RecordMetadata recordMeta : recordMetaData1) {
+ assertTrue("Offset is positive",recordMeta.offset() >= 0);
+ assertTrue("Topic Name start with 'test'",recordMeta.topic().startsWith("test"));
+ }
+ Exchange e2 = exchangeList.get(1);
+ List<RecordMetadata> recordMetaData2 = (List<RecordMetadata>)(e2.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ assertEquals("Five RecordMetadata is expected.",recordMetaData2.size(),5);
+ for (RecordMetadata recordMeta : recordMetaData2) {
+ assertTrue("Offset is positive",recordMeta.offset() >= 0);
+ assertTrue("Topic Name start with 'test'",recordMeta.topic().startsWith("test"));
+ }
}
-
+
@Test
public void producedBytesMessageIsReceivedByKafka() throws InterruptedException, IOException {
int messageInTopic = 10;
@@ -175,6 +206,17 @@ public class KafkaProducerFullTest extends BaseEmbeddedKafkaTest {
boolean allMessagesReceived = messagesLatch.await(200, TimeUnit.MILLISECONDS);
assertTrue("Not all messages were published to the kafka topics. Not received: " + messagesLatch.getCount(), allMessagesReceived);
+
+ List<Exchange> exchangeList= mockEndpoint.getExchanges();
+ assertEquals("Fifteen Exchanges are expected",exchangeList.size(),15);
+ for (Exchange exchange : exchangeList) {
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>)(exchange.getIn().getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ assertEquals("One RecordMetadata is expected.",recordMetaData1.size(),1);
+ assertTrue("Offset is positive",recordMetaData1.get(0).offset() >= 0);
+ assertTrue("Topic Name start with 'test'",recordMetaData1.get(0).topic().startsWith("test"));
+
+ }
+
}
private void createKafkaMessageConsumer(KafkaConsumer<String, String> consumerConn,
http://git-wip-us.apache.org/repos/asf/camel/blob/a48c022c/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
index 1a29c4d..2dcd8f8 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/KafkaProducerTest.java
@@ -16,7 +16,9 @@
*/
package org.apache.camel.component.kafka;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.camel.AsyncCallback;
@@ -35,6 +37,8 @@ import org.mockito.Matchers;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
public class KafkaProducerTest {
@@ -60,6 +64,7 @@ public class KafkaProducerTest {
Mockito.when(kp.send(Mockito.any(ProducerRecord.class))).thenReturn(future);
producer.setKafkaProducer(kp);
+ producer.setWorkerPool(Executors.newFixedThreadPool(1));
}
@Test
@@ -79,6 +84,9 @@ public class KafkaProducerTest {
producer.process(exchange);
Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class));
+ assertRecordMetadataExists();
+
+
}
@Test(expected = Exception.class)
@@ -92,8 +100,11 @@ public class KafkaProducerTest {
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
producer.process(exchange);
+
+ assertRecordMetadataExists();
}
+
@Test
public void processAsyncSendsMessage() throws Exception {
endpoint.setTopic("sometopic");
@@ -104,7 +115,12 @@ public class KafkaProducerTest {
producer.process(exchange, callback);
- Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+ ArgumentCaptor<Callback> callBackCaptor = ArgumentCaptor.forClass(Callback.class);
+ Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), callBackCaptor.capture());
+ Callback kafkaCallback = callBackCaptor.getValue();
+ kafkaCallback.onCompletion(new RecordMetadata(null, 1, 1),null);
+ assertRecordMetadataExists();
+
}
@Test
@@ -122,9 +138,13 @@ public class KafkaProducerTest {
producer.process(exchange, callback);
- Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), Matchers.any(Callback.class));
+ ArgumentCaptor<Callback> callBackCaptor = ArgumentCaptor.forClass(Callback.class);
+ Mockito.verify(producer.getKafkaProducer()).send(Matchers.any(ProducerRecord.class), callBackCaptor.capture());
Mockito.verify(exchange).setException(Matchers.isA(ApiException.class));
Mockito.verify(callback).done(Matchers.eq(true));
+ Callback kafkaCallback = callBackCaptor.getValue();
+ kafkaCallback.onCompletion(new RecordMetadata(null, 1, 1),null);
+ assertRecordMetadataExists();
}
@Test
@@ -137,6 +157,7 @@ public class KafkaProducerTest {
producer.process(exchange);
verifySendMessage("anotherTopic");
+ assertRecordMetadataExists();
}
@Test
@@ -152,6 +173,7 @@ public class KafkaProducerTest {
producer.process(exchange);
verifySendMessage("4", "anotherTopic", "someKey");
+ assertRecordMetadataExists();
}
@Test(expected = CamelException.class)
@@ -159,7 +181,10 @@ public class KafkaProducerTest {
endpoint.setTopic(null);
Mockito.when(exchange.getIn()).thenReturn(in);
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
producer.process(exchange);
+
+ assertRecordMetadataExists();
}
@Test
@@ -169,6 +194,8 @@ public class KafkaProducerTest {
Mockito.when(exchange.getOut()).thenReturn(out);
producer.process(exchange);
+
+ assertRecordMetadataExists();
}
@Test
@@ -178,8 +205,12 @@ public class KafkaProducerTest {
Mockito.when(exchange.getOut()).thenReturn(out);
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
in.setHeader(KafkaConstants.KEY, "someKey");
+
producer.process(exchange);
+
verifySendMessage("4", "someTopic", "someKey");
+ assertRecordMetadataExists();
+
}
@Test
@@ -192,6 +223,8 @@ public class KafkaProducerTest {
producer.process(exchange);
verifySendMessage("someTopic", "someKey");
+ assertRecordMetadataExists();
+
}
@Test
@@ -203,9 +236,12 @@ public class KafkaProducerTest {
in.setHeader(KafkaConstants.TOPIC, "anotherTopic");
in.setHeader(KafkaConstants.KEY, "someKey");
in.setHeader(KafkaConstants.PARTITION_KEY, "4");
+
producer.process(exchange);
verifySendMessage("4", "someTopic", "someKey");
+ assertRecordMetadataExists();
+
}
@Test // Message and Topic Name alone
@@ -217,6 +253,8 @@ public class KafkaProducerTest {
producer.process(exchange);
verifySendMessage("someTopic");
+ assertRecordMetadataExists();
+
}
@SuppressWarnings({"unchecked", "rawtypes"})
@@ -243,4 +281,10 @@ public class KafkaProducerTest {
assertEquals(topic, captor.getValue().topic());
}
+ /**
+ * Asserts that the {@code out} message carries a non-null list holding exactly one
+ * non-null {@link RecordMetadata} under the {@link KafkaConstants#KAFKA_RECORDMETA} header.
+ */
+ private void assertRecordMetadataExists() {
+ List<RecordMetadata> recordMetaData1 = (List<RecordMetadata>)(out.getHeader(KafkaConstants.KAFKA_RECORDMETA));
+ assertTrue("RecordMetadata header is expected on the out message", recordMetaData1 != null);
+ // JUnit's argument order is (message, expected, actual); the expected value goes first.
+ assertEquals("Expected one recordMetaData", 1, recordMetaData1.size());
+ assertTrue("RecordMetadata entry must not be null", recordMetaData1.get(0) != null);
+ }
}