You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/03/03 19:36:29 UTC
[06/14] camel git commit: CAMEL-10927 Added complete
KafkaIdempotentRepository with supporting tests.
CAMEL-10927 Added complete KafkaIdempotentRepository with supporting tests.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/726890f6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/726890f6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/726890f6
Branch: refs/heads/master
Commit: 726890f60f5b7a74d66061aa673dd37faa60930f
Parents: 4b7f0c7
Author: Jakub Korab <ja...@gmail.com>
Authored: Thu Mar 2 15:14:07 2017 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Mar 3 20:03:08 2017 +0100
----------------------------------------------------------------------
.../kafka/KafkaIdempotentRepository.java | 278 +++++++++++++++++--
.../kafka/embedded/EmbeddedKafkaBroker.java | 3 +-
.../KafkaIdempotentRepositoryEagerTest.java | 105 +++++++
.../KafkaIdempotentRepositoryLazyTest.java | 95 +++++++
.../kafka/KafkaIdempotentRepositoryTest.java | 45 ---
.../src/test/resources/log4j2.properties | 9 +-
6 files changed, 456 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 960202c..8a1b8e1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -1,94 +1,312 @@
package org.apache.camel.processor.idempotent.kafka;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.api.management.ManagedOperation;
import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.processor.idempotent.FileIdempotentRepository;
+import org.apache.camel.spi.ExecutorServiceManager;
import org.apache.camel.spi.IdempotentRepository;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
+import java.util.Collections;
import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
/**
- * A Kafka topic based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
+ * A Kafka topic-based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
*
+ * Uses a local cache of previously seen Message IDs. All mutations of the cache are via a Kafka topic, on which
+ * additions and removals are broadcast. The topic used must be unique per repository instance. This class makes no
+ * assumptions about number of partitions (it is designed to consume from all at the same time), or replication factor.
+ * Each repository instance that uses the topic (e.g. on different machines running in parallel) controls its own
+ * consumer group.
+ *
+ * On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the
+ * latest state.
+ *
+ * To use, this repository must be placed in the Camel registry, either manually or by registration as a bean in
+ * Spring/Blueprint, as it is CamelContext aware.
*
* @author jkorab
*/
@ManagedResource(description = "Kafka IdempotentRepository")
-public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
+public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository<String>, CamelContextAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaIdempotentRepository.class);
+ private final Map<String, Object> cache = Collections.synchronizedMap(new LRUCache<>(1000));
+
+ private final AtomicLong duplicateCount = new AtomicLong(0);
+ private final String topic;
+ private final Properties producerConfig;
+ private final Properties consumerConfig;
- private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
- private Map<String, Object> cache;
+ private Consumer<String, String> consumer;
+ private Producer<String, String> producer;
+ private TopicPoller topicPoller;
- public KafkaIdempotentRepository() {
- // default use a 1st level cache
- this.cache = new LRUCache<String, Object>(1000);
+ private CamelContext camelContext;
+ private ExecutorService executorService;
+ private CountDownLatch cacheReadyLatch;
+
+ /**
+ * Kinds of cache mutation that are broadcast on the topic. The constant name itself is what is
+ * written as the record value and parsed back with valueOf, so renaming a constant changes the
+ * on-topic format.
+ */
+ enum CacheAction {
+ // put the record key (message id) into the local cache
+ add,
+ // drop the record key (message id) from the local cache
+ remove,
+ // empty the local cache; the record key is not used
+ clear
+ }
+
+    /**
+     * Creates a repository whose consumer and producer are configured with nothing but the given
+     * bootstrap servers; the remaining client settings are filled in by {@code doStart()}.
+     *
+     * @param topic            name of the Kafka topic used to broadcast cache changes; must not be empty
+     * @param bootstrapServers Kafka bootstrap server list; must not be empty
+     */
+    public KafkaIdempotentRepository(String topic, String bootstrapServers) {
+        StringHelper.notEmpty(topic, "topic");
+        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+
+        Properties producerProps = new Properties();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        Properties consumerProps = new Properties();
+        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        this.topic = topic;
+        this.producerConfig = producerProps;
+        this.consumerConfig = consumerProps;
+    }
+
+    /**
+     * Creates a repository from caller-supplied client configuration.
+     *
+     * The {@link Properties} objects are stored by reference (no copy is taken) and are modified
+     * when the repository starts.
+     *
+     * @param topic          name of the Kafka topic used to broadcast cache changes; must not be empty
+     * @param consumerConfig Kafka consumer settings; must not be null
+     * @param producerConfig Kafka producer settings; must not be null
+     */
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) {
+        // validate everything up front, in the same order as before, then assign
+        StringHelper.notEmpty(topic, "topic");
+        ObjectHelper.notNull(consumerConfig, "consumerConfig");
+        ObjectHelper.notNull(producerConfig, "producerConfig");
+
+        this.topic = topic;
+        this.consumerConfig = consumerConfig;
+        this.producerConfig = producerConfig;
}
+    /**
+     * Creates the Kafka consumer and producer and prepares (but does not start) the topic poller.
+     *
+     * The consumer is given a freshly generated group id so that this instance tracks its own
+     * offsets. Because such a group never has committed offsets, the position it starts from is
+     * governed by {@code auto.offset.reset}; unless the caller configured it, it is defaulted to
+     * {@code earliest} here so that the whole topic is replayed and the cache can be rebuilt.
+     *
+     * @throws Exception if the Kafka clients cannot be created
+     */
@Override
- public void start() throws Exception {
+    protected void doStart() throws Exception {
+        LOG.info("Context: {}", camelContext);
+
+        // each consumer instance must have control over its own offset, so assign a groupID at random
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        // a brand new group has no committed offset: start from the oldest record so that the
+        // cache is rebuilt from the full topic history (an explicit caller setting wins)
+        consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        consumer = new KafkaConsumer<>(consumerConfig);
+
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        // set up the producer to remove all batching on send
+        producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
+        producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
+        producer = new KafkaProducer<>(producerConfig);
+
+        cacheReadyLatch = new CountDownLatch(1);
+        topicPoller = new TopicPoller(consumer, cacheReadyLatch);
}
+    /**
+     * Stores the Camel context, submits the topic poller to a single-threaded pool obtained from
+     * the context's executor service manager, and waits up to 30 seconds for the poller to signal
+     * that the cache is warmed up.
+     *
+     * If the wait times out a warning is logged and processing continues. If the calling thread is
+     * interrupted while waiting, its interrupt flag is restored and the method returns.
+     *
+     * @param camelContext the context this repository is registered in
+     */
@Override
- public void stop() throws Exception {
+    public void setCamelContext(CamelContext camelContext) {
+        // doStart() has already been called at this point
+        this.camelContext = camelContext;
+        ExecutorServiceManager executorServiceManager = camelContext.getExecutorServiceManager();
+        executorService = executorServiceManager.newFixedThreadPool(this, "KafkaIdempotentRepository", 1);
+        executorService.submit(topicPoller);
+        LOG.info("Warming up cache");
+        try {
+            if (cacheReadyLatch.await(30, TimeUnit.SECONDS)) {
+                LOG.info("Cache OK");
+            } else {
+                LOG.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. "
+                        + "Duplicate records may not be detected.", topic);
+            }
+        } catch (InterruptedException e) {
+            // keep the interruption visible to the caller instead of dumping a stack trace to stderr
+            Thread.currentThread().interrupt();
+            LOG.warn("Interrupted while waiting for cache warm-up from topic {}", topic, e);
+        }
}
+    /**
+     * Returns the Camel context handed to {@code setCamelContext}, or {@code null} if none has
+     * been set yet.
+     */
@Override
- protected void doStart() throws Exception {
- // per set up the consumer
+    public CamelContext getCamelContext() {
+        return camelContext;
}
+    /**
+     * Asks the topic poller to stop, waits up to 30 seconds for it to finish, then shuts down the
+     * executor and closes the Kafka consumer and producer.
+     *
+     * The producer is closed even if closing the consumer fails. An interruption received while
+     * waiting for the poller is remembered and the thread's interrupt flag is restored only after
+     * the clients have been closed, so that the close calls are not affected by it.
+     */
@Override
- protected void doStop() throws Exception {
- // empty
+    protected void doStop() {
+        // stop the thread
+        topicPoller.setRunning(false);
+        boolean interrupted = false;
+        try {
+            // await() returns false when the timeout elapsed before the poller finished
+            if (!topicPoller.getShutdownLatch().await(30, TimeUnit.SECONDS)) {
+                LOG.info("Expired waiting on topicPoller to shut down");
+            }
+        } catch (InterruptedException e) {
+            interrupted = true;
+            LOG.info("Interrupted waiting on latch: {}", e.getMessage());
+        }
+        executorService.shutdown();
+
+        try {
+            consumer.close();
+        } finally {
+            try {
+                producer.close();
+            } finally {
+                if (interrupted) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        }
}
@Override
public boolean add(String key) {
- synchronized (cache) {
- if (cache.containsKey(key)) {
- return false;
- } else {
- cache.put(key, key);
- broadcastInsert(key);
-
- return true;
- }
+ if (cache.containsKey(key)) {
+ duplicateCount.incrementAndGet();
+ return false;
+ } else {
+ broadcastAction(key, CacheAction.add);
+ return true;
}
}
- private void broadcastInsert(String key) {
- // TODO implement
+    /**
+     * Publishes a cache action for the given key on the topic and blocks until the send completes.
+     *
+     * @param key    the message id, used as the record key (may be null, e.g. for clear)
+     * @param action the mutation to broadcast; its name is used as the record value
+     * @throws RuntimeException if the send fails, or if the calling thread is interrupted while
+     *                          waiting (the interrupt flag is restored before throwing)
+     */
+    private void broadcastAction(String key, CacheAction action) {
+        try {
+            LOG.debug("Broadcasting action:{} for key:{}", action, key);
+            producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync send
+        } catch (InterruptedException e) {
+            // do not lose the interruption when converting to an unchecked exception
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
}
+ /**
+ * Checks the local cache for the key; the topic itself is not consulted.
+ *
+ * @param key the message id
+ * @return true if the key is currently in the local cache; a hit also increments the
+ * duplicate counter
+ */
@Override
@ManagedOperation(description = "Does the store contain the given key")
public boolean contains(String key) {
+ LOG.debug("Checking cache for key:{}", key);
synchronized (cache) {
- return cache.containsKey(key);
+ boolean containsKey = cache.containsKey(key);
+ // every positive lookup is counted as a detected duplicate
+ if (containsKey) {
+ duplicateCount.incrementAndGet();
+ }
+ return containsKey;
}
}
+ /**
+ * Broadcasts a removal of the key on the topic. The local cache is not touched here; the
+ * entry disappears once the topic poller has consumed the record.
+ *
+ * @param key the message id
+ * @return always true, whether or not the key was cached
+ */
@Override
@ManagedOperation(description = "Remove the key from the store")
public boolean remove(String key) {
- return false;
+ broadcastAction(key, CacheAction.remove);
+ return true;
}
+ /**
+ * Does nothing; this repository has no separate confirmation step.
+ *
+ * @param key the message id (ignored)
+ * @return always true
+ */
@Override
public boolean confirm(String key) {
- return false;
+ return true; // no-op
}
+ /**
+ * Broadcasts a clear action on the topic, sent with a null record key. The local cache is
+ * emptied once the topic poller has consumed the record, not by this call itself.
+ */
@Override
public void clear() {
- // TODO mark all keys on the topic as having been consumed
+ broadcastAction(null, CacheAction.clear);
}
+    /**
+     * Returns how many times {@code add} or {@code contains} found the key already cached.
+     */
+    @ManagedOperation(description = "Number of times duplicate messages have been detected")
+    public long getDuplicateCount() {
+        return duplicateCount.longValue();
+    }
+
+    /**
+     * Reports whether the topic poller's running flag is set.
+     *
+     * @return {@code false} if the poller has not been created yet (the repository was never
+     *         started) or has been told to stop, {@code true} otherwise
+     */
+    @ManagedOperation(description = "Whether the topic poller is running")
+    public boolean isPollerRunning() {
+        // the poller only exists once doStart() has run; avoid an NPE from the management layer
+        return topicPoller != null && topicPoller.getRunning();
+    }
+
+ private class TopicPoller implements Runnable {
+
+ public static final int POLL_DURATION_MS = 10;
+
+ private final Logger LOG = LoggerFactory.getLogger(this.getClass());
+ private final Consumer<String, String> consumer;
+ private final CountDownLatch cacheReadyLatch;
+
+ private CountDownLatch shutdownLatch = new CountDownLatch(1);
+ private AtomicBoolean running = new AtomicBoolean(true);
+
+ TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch) {
+ this.consumer = consumer;
+ this.cacheReadyLatch = cacheReadyLatch;
+ }
+
+ @Override
+ public void run() {
+ LOG.debug("Subscribing consumer to {}", topic);
+ consumer.subscribe(Collections.singleton(topic));
+ LOG.debug("Seeking to beginning");
+ consumer.seekToBeginning(consumer.assignment());
+
+ POLL_LOOP: while (running.get()) {
+ LOG.trace("Polling");
+ ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_DURATION_MS);
+ if (consumerRecords.isEmpty()) {
+ // the first time this happens, we can assume that we have consumed all
+ // messages up to this point
+ LOG.trace("0 messages fetched on poll");
+ if (cacheReadyLatch.getCount() > 0) {
+ LOG.debug("Cache warmed up");
+ cacheReadyLatch.countDown();
+ }
+ }
+ for (ConsumerRecord<String, String> consumerRecord: consumerRecords) {
+ CacheAction action;
+ try {
+ action = CacheAction.valueOf(consumerRecord.value());
+ } catch (IllegalArgumentException iax) {
+ LOG.error("Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
+ consumerRecord.key(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
+ setRunning(false);
+ continue POLL_LOOP;
+ }
+ String messageId = consumerRecord.key();
+ if (action == CacheAction.add) {
+ LOG.debug("Adding to cache messageId:{}", messageId);
+ cache.put(messageId, messageId);
+ } else if (action == CacheAction.remove) {
+ LOG.debug("Removing from cache messageId:{}", messageId);
+ cache.remove(messageId);
+ } else if (action == CacheAction.clear) {
+ cache.clear();
+ } else {
+ // this should never happen
+ LOG.error("No idea how to {} a record. Shutting down.", action);
+ setRunning(false);
+ continue POLL_LOOP;
+ }
+ }
+
+ }
+ LOG.debug("TopicPoller finished - triggering shutdown latch");
+ shutdownLatch.countDown();
+ }
+
+ public CountDownLatch getShutdownLatch() {
+ return shutdownLatch;
+ }
+
+ public void setRunning(boolean running) {
+ this.running.set(running);
+ }
+
+ public boolean getRunning() {
+ return running.get();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
index 8ab50ac..3254bc7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
@@ -61,6 +61,7 @@ public class EmbeddedKafkaBroker extends ExternalResource {
this.zkConnection = zkConnection;
this.baseProperties = baseProperties;
+ log.info("Starting broker[{}] on port {}", brokerId, port);
this.brokerList = "localhost:" + this.port;
}
@@ -69,12 +70,10 @@ public class EmbeddedKafkaBroker extends ExternalResource {
}
+ /**
+ * Creates the given topic through AdminUtils, passing on the requested partition count and an
+ * empty set of topic properties.
+ * NOTE(review): the literal 1 is presumably the replication factor - confirm against the
+ * AdminUtils.createTopic signature.
+ */
public void createTopic(String topic, int partitionCount) {
- log.info("createTopic");
AdminUtils.createTopic(getZkUtils(), topic, partitionCount, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
}
public void before() {
- log.info("before");
logDir = constructTempDir(perTest("kafka-log"));
Properties properties = new Properties();
http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
new file mode 100644
index 0000000..9fc7122
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
@@ -0,0 +1,105 @@
+package org.apache.camel.processor.idempotent.kafka;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
+import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Exercises {@link KafkaIdempotentRepository} behind an idempotent consumer that does not call
+ * eager(false), against an embedded ZooKeeper and an embedded Kafka broker.
+ *
+ * NOTE(review): the assertions run immediately after the sends; they appear to rely on the
+ * repository's poller having applied earlier records by then - confirm this is not timing sensitive.
+ *
+ * @author jkorab
+ */
+public class KafkaIdempotentRepositoryEagerTest extends CamelTestSupport {
+
+ @Rule
+ public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
+
+ @Rule
+ public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
+
+ private KafkaIdempotentRepository kafkaIdempotentRepository;
+
+ // Registers the repository under the name the route looks it up by.
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+
+ kafkaIdempotentRepository = new KafkaIdempotentRepository("TEST_IDEM", kafkaBroker.getBrokerList());
+ registry.put("kafkaIdempotentRepository", kafkaIdempotentRepository);
+
+ return new DefaultCamelContext(registry);
+ }
+
+ // direct:in -> mock:before -> idempotent consumer keyed on the "id" header -> mock:out
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:in")
+ .to("mock:before")
+ .idempotentConsumer(header("id")).messageIdRepositoryRef("kafkaIdempotentRepository")
+ .to("mock:out")
+ .end();
+ }
+ };
+ }
+
+ @EndpointInject(uri = "mock:out")
+ MockEndpoint mockOut;
+
+ @EndpointInject(uri = "mock:before")
+ MockEndpoint mockBefore;
+
+ // Sends ten messages whose id cycles through 0..4, so every id is seen twice.
+ @Test
+ public void testRemovesDuplicates() throws InterruptedException {
+ for (int i = 0; i < 10; i++) {
+ template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+ }
+
+ assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+
+ assertEquals(5, mockOut.getReceivedCounter());
+ assertEquals(10, mockBefore.getReceivedCounter());
+ }
+
+ // Same ten messages, but mock:out throws for id 0, so that id is expected to pass the
+ // idempotency check both times it is sent.
+ @Test
+ public void testRollsBackOnException() throws InterruptedException {
+ mockOut.whenAnyExchangeReceived((exchange -> {
+ int id = exchange.getIn().getHeader("id", Integer.class);
+ if (id == 0) {
+ throw new IllegalArgumentException("Boom!");
+ }
+ }));
+
+ for (int i = 0; i < 10; i++) {
+ try {
+ template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+ } catch (CamelExecutionException cex) {
+ // no-op; expected
+ }
+ }
+
+ assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // id{0} is not a duplicate
+
+ assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through the idempotency check twice
+ assertEquals(10, mockBefore.getReceivedCounter());
+ }
+
+ // Sends id 0, clears the repository, then sends id 0 again; both are expected at mock:out.
+ @Test
+ public void testClear() throws InterruptedException {
+ mockOut.setExpectedMessageCount(2);
+
+ template.sendBodyAndHeader("direct:in", "Test message", "id", 0);
+ kafkaIdempotentRepository.clear();
+ template.sendBodyAndHeader("direct:in", "Test message", "id", 0);
+
+ assertMockEndpointsSatisfied();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java
new file mode 100644
index 0000000..5c80b38
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java
@@ -0,0 +1,95 @@
+package org.apache.camel.processor.idempotent.kafka;
+
+import org.apache.camel.*;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
+import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * Exercises {@link KafkaIdempotentRepository} behind an idempotent consumer configured with
+ * eager(false), against an embedded ZooKeeper and an embedded Kafka broker.
+ *
+ * NOTE(review): the assertions run immediately after the sends; they appear to rely on the
+ * repository's poller having applied earlier records by then - confirm this is not timing sensitive.
+ *
+ * @author jkorab
+ */
+public class KafkaIdempotentRepositoryLazyTest extends CamelTestSupport {
+
+ @Rule
+ public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
+
+ @Rule
+ public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
+
+ private KafkaIdempotentRepository kafkaIdempotentRepository;
+
+ // Registers the repository under the name the route looks it up by.
+ @Override
+ protected CamelContext createCamelContext() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+
+ kafkaIdempotentRepository = new KafkaIdempotentRepository("TEST_IDEM", kafkaBroker.getBrokerList());
+ registry.put("kafkaIdempotentRepository", kafkaIdempotentRepository);
+
+ return new DefaultCamelContext(registry);
+ }
+
+ // direct:in -> mock:before -> idempotent consumer keyed on the "id" header, eager(false) -> mock:out
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:in")
+ .to("mock:before")
+ .idempotentConsumer(header("id"))
+ .messageIdRepositoryRef("kafkaIdempotentRepository")
+ .eager(false)
+ .to("mock:out")
+ .end();
+ }
+ };
+ }
+
+ @EndpointInject(uri = "mock:out")
+ MockEndpoint mockOut;
+
+ @EndpointInject(uri = "mock:before")
+ MockEndpoint mockBefore;
+
+ // Sends ten messages whose id cycles through 0..4, so every id is seen twice.
+ @Test
+ public void testRemovesDuplicates() throws InterruptedException {
+ for (int i = 0; i < 10; i++) {
+ template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+ }
+
+ assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+
+ assertEquals(5, mockOut.getReceivedCounter());
+ assertEquals(10, mockBefore.getReceivedCounter());
+ }
+
+ // Same ten messages, but mock:out throws for id 0, so that id is expected to pass the
+ // idempotency check both times it is sent.
+ @Test
+ public void testRollsBackOnException() throws InterruptedException {
+ mockOut.whenAnyExchangeReceived((exchange -> {
+ int id = exchange.getIn().getHeader("id", Integer.class);
+ if (id == 0) {
+ throw new IllegalArgumentException("Boom!");
+ }
+ }));
+
+ for (int i = 0; i < 10; i++) {
+ try {
+ template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+ } catch (CamelExecutionException cex) {
+ // no-op; expected
+ }
+ }
+
+ assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // id{0} is not a duplicate
+
+ assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through the idempotency check twice
+ assertEquals(10, mockBefore.getReceivedCounter());
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
deleted file mode 100644
index 90b89a0..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.camel.processor.idempotent.kafka;
-
-import org.apache.camel.RoutesBuilder;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
-import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
-import org.apache.camel.test.AvailablePortFinder;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-
-/**
- * @author jkorab
- */
-public class KafkaIdempotentRepositoryTest extends CamelTestSupport {
-
- @Rule
- public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
-
- @Rule
- public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
-
- @Override
- protected RoutesBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
- @Override
- public void configure() throws Exception {
- KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository();
-
- from("direct:in")
- .idempotentConsumer(header("id"), kafkaIdempotentRepository)
- .to("mock:out");
- }
- };
- }
-
- @Test
- public void testRemovesDuplicate() {
-
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties
index b0f366d..07bcc08 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -28,7 +28,12 @@ appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
rootLogger.level = WARN
rootLogger.appenderRef.out.ref = stdout
-logger.stdout.name=org.apache.camel.component.kafka
-logger.stdout.level=INFO
+logger.camel.name=org.apache.camel
+logger.camel.level=INFO
+logger.camelKafka.name=org.apache.camel.component.kafka
+logger.camelKafka.level=INFO
+
+logger.idem.name=org.apache.camel.processor.idempotent
+logger.idem.level=INFO