Posted to commits@camel.apache.org by da...@apache.org on 2017/03/03 19:36:29 UTC

[06/14] camel git commit: CAMEL-10927 Added complete KafkaIdempotentRepository with supporting tests.

CAMEL-10927 Added complete KafkaIdempotentRepository with supporting tests.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/726890f6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/726890f6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/726890f6

Branch: refs/heads/master
Commit: 726890f60f5b7a74d66061aa673dd37faa60930f
Parents: 4b7f0c7
Author: Jakub Korab <ja...@gmail.com>
Authored: Thu Mar 2 15:14:07 2017 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Fri Mar 3 20:03:08 2017 +0100

----------------------------------------------------------------------
 .../kafka/KafkaIdempotentRepository.java        | 278 +++++++++++++++++--
 .../kafka/embedded/EmbeddedKafkaBroker.java     |   3 +-
 .../KafkaIdempotentRepositoryEagerTest.java     | 105 +++++++
 .../KafkaIdempotentRepositoryLazyTest.java      |  95 +++++++
 .../kafka/KafkaIdempotentRepositoryTest.java    |  45 ---
 .../src/test/resources/log4j2.properties        |   9 +-
 6 files changed, 456 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
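
For context: the repository added below keeps a local LRU cache of seen
message IDs and broadcasts add/remove/clear actions over a dedicated Kafka
topic, so every instance sharing that topic converges on the same state (see
the Javadoc in KafkaIdempotentRepository.java). A minimal construction sketch
covering the two constructors introduced by this commit; the class name,
topic name and broker addresses here are illustrative assumptions, not values
from the commit:

    import java.util.Properties;

    import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class RepositoryConstructionSketch {
        public static void main(String[] args) {
            // simple form: a unique topic name plus the Kafka bootstrap servers (assumed address)
            KafkaIdempotentRepository simple =
                    new KafkaIdempotentRepository("orders.idempotent", "localhost:9092");

            // explicit form: caller-supplied consumer and producer configuration
            Properties consumerConfig = new Properties();
            consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
            Properties producerConfig = new Properties();
            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
            KafkaIdempotentRepository custom =
                    new KafkaIdempotentRepository("orders.idempotent", consumerConfig, producerConfig);
        }
    }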


http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
index 960202c..8a1b8e1 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepository.java
@@ -1,94 +1,312 @@
 package org.apache.camel.processor.idempotent.kafka;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.api.management.ManagedOperation;
 import org.apache.camel.api.management.ManagedResource;
-import org.apache.camel.processor.idempotent.FileIdempotentRepository;
+import org.apache.camel.spi.ExecutorServiceManager;
 import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.LRUCache;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.StringHelper;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
+import java.util.Collections;
 import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
- * A Kafka topic based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
+ * A Kafka topic-based implementation of {@link org.apache.camel.spi.IdempotentRepository}.
  *
+ * Uses a local cache of previously seen message IDs. All mutations of the cache are made via a Kafka topic, on
+ * which additions and removals are broadcast. The topic used must be unique per repository instance. This class
+ * makes no assumptions about the number of partitions (it is designed to consume from all of them at the same
+ * time) or the replication factor. Each repository instance that uses the topic (e.g. on different machines
+ * running in parallel) controls its own consumer group.
+ *
+ * On startup, the instance subscribes to the topic and rewinds the offset to the beginning, rebuilding the cache to the
+ * latest state.
+ *
+ * To use this repository, place it in the Camel registry, either manually or by registering it as a bean in
+ * Spring/Blueprint, as it is CamelContext aware.
  *
  * @author jkorab
  */
 @ManagedResource(description = "Kafka IdempotentRepository")
-public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository<String> {
+public class KafkaIdempotentRepository extends ServiceSupport implements IdempotentRepository<String>, CamelContextAware {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaIdempotentRepository.class);
+    private final Map<String, Object> cache = Collections.synchronizedMap(new LRUCache<>(1000));
+
+    private final AtomicLong duplicateCount = new AtomicLong(0);
+    private final String topic;
+    private final Properties producerConfig;
+    private final Properties consumerConfig;
 
-    private static final Logger LOG = LoggerFactory.getLogger(FileIdempotentRepository.class);
-    private Map<String, Object> cache;
+    private Consumer<String, String> consumer;
+    private Producer<String, String> producer;
+    private TopicPoller topicPoller;
 
-    public KafkaIdempotentRepository() {
-        // default use a 1st level cache
-        this.cache = new LRUCache<String, Object>(1000);
+    private CamelContext camelContext;
+    private ExecutorService executorService;
+    private CountDownLatch cacheReadyLatch;
+
+    enum CacheAction {
+        add,
+        remove,
+        clear
+    }
+
+    public KafkaIdempotentRepository(String topic, String bootstrapServers) {
+        StringHelper.notEmpty(topic, "topic");
+        StringHelper.notEmpty(bootstrapServers, "bootstrapServers");
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+        this.topic = topic;
+        this.consumerConfig = consumerConfig;
+        this.producerConfig = producerConfig;
+    }
+
+    public KafkaIdempotentRepository(String topic, Properties consumerConfig, Properties producerConfig) {
+        StringHelper.notEmpty(topic, "topic");
+        this.topic = topic;
+        ObjectHelper.notNull(consumerConfig, "consumerConfig");
+        this.consumerConfig = consumerConfig;
+        ObjectHelper.notNull(producerConfig, "producerConfig");
+        this.producerConfig = producerConfig;
     }
 
     @Override
-    public void start() throws Exception {
+    protected void doStart() throws Exception {
+        LOG.info("Context: {}", camelContext);
+
+        // each consumer instance must have control over its own offset, so assign a groupID at random
+        String groupId = UUID.randomUUID().toString();
+        LOG.debug("Creating consumer with {}[{}]", ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+        // a freshly generated group id has no committed offsets, so start from the earliest offset to rebuild the cache
+        consumerConfig.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE.toString());
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+
+        consumer = new KafkaConsumer<>(consumerConfig);
+
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        // configure the producer to send each record immediately (no batching), acknowledged by the partition leader
+        producerConfig.putIfAbsent(ProducerConfig.ACKS_CONFIG, "1");
+        producerConfig.putIfAbsent(ProducerConfig.BATCH_SIZE_CONFIG, "0");
+        producer = new KafkaProducer<>(producerConfig);
+
+        cacheReadyLatch = new CountDownLatch(1);
+        topicPoller = new TopicPoller(consumer, cacheReadyLatch);
     }
 
     @Override
-    public void stop() throws Exception {
+    public void setCamelContext(CamelContext camelContext) {
+        // doStart() has already been called at this point
+        this.camelContext = camelContext;
+        ExecutorServiceManager executorServiceManager = camelContext.getExecutorServiceManager();
+        executorService = executorServiceManager.newFixedThreadPool(this, "KafkaIdempotentRepository", 1);
+        executorService.submit(topicPoller);
+        LOG.info("Warming up cache");
+        try {
+            if (cacheReadyLatch.await(30, TimeUnit.SECONDS)) {
+                LOG.info("Cache OK");
+            } else {
+                LOG.warn("Timeout waiting for cache warm-up from topic {}. Proceeding anyway. " +
+                        "Duplicate records may not be detected.", topic);
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupted while warming up the cache from topic {}", topic, e);
+            Thread.currentThread().interrupt();
+        }
 
     }
 
     @Override
-    protected void doStart() throws Exception {
-        // per set up the consumer
+    public CamelContext getCamelContext() {
+        return this.camelContext;
     }
 
     @Override
-    protected void doStop() throws Exception {
-        // empty
+    protected void doStop() {
+        // stop the thread
+        topicPoller.setRunning(false);
+        try {
+            if (!topicPoller.getShutdownLatch().await(30, TimeUnit.SECONDS)) {
+                LOG.warn("Timed out waiting for the topicPoller to shut down");
+            }
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted waiting on latch: {}", e.getMessage());
+        }
+        executorService.shutdown();
+
+        try {
+            consumer.close();
+        } finally {
+            producer.close();
+        }
     }
 
     @Override
     public boolean add(String key) {
-        synchronized (cache) {
-            if (cache.containsKey(key)) {
-                return false;
-            } else {
-                cache.put(key, key);
-                broadcastInsert(key);
-
-                return true;
-            }
+        if (cache.containsKey(key)) {
+            duplicateCount.incrementAndGet();
+            return false;
+        } else {
+            broadcastAction(key, CacheAction.add);
+            return true;
         }
     }
 
-    private void broadcastInsert(String key) {
-        // TODO implement
+    private void broadcastAction(String key, CacheAction action) {
+        try {
+            LOG.debug("Broadcasting action:{} for key:{}", action, key);
+            producer.send(new ProducerRecord<>(topic, key, action.toString())).get(); // sync send
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     @Override
     @ManagedOperation(description = "Does the store contain the given key")
     public boolean contains(String key) {
+        LOG.debug("Checking cache for key:{}", key);
         synchronized (cache) {
-            return cache.containsKey(key);
+            boolean containsKey = cache.containsKey(key);
+            if (containsKey) {
+                duplicateCount.incrementAndGet();
+            }
+            return containsKey;
         }
     }
 
     @Override
     @ManagedOperation(description = "Remove the key from the store")
     public boolean remove(String key) {
-        return false;
+        broadcastAction(key, CacheAction.remove);
+        return true;
     }
 
     @Override
     public boolean confirm(String key) {
-        return false;
+        return true; // no-op
     }
 
     @Override
     public void clear() {
-        // TODO mark all keys on the topic as having been consumed
+        broadcastAction(null, CacheAction.clear);
     }
 
+    @ManagedOperation(description = "Number of times duplicate messages have been detected")
+    public long getDuplicateCount() {
+        return duplicateCount.get();
+    }
+
+    @ManagedOperation(description = "Number of times duplicate messages have been detected")
+    public boolean isPollerRunning() {
+        return topicPoller.getRunning();
+    }
+
+    private class TopicPoller implements Runnable {
+
+        public static final int POLL_DURATION_MS = 10;
+
+        private final Logger LOG = LoggerFactory.getLogger(this.getClass());
+        private final Consumer<String, String> consumer;
+        private final CountDownLatch cacheReadyLatch;
+
+        private CountDownLatch shutdownLatch = new CountDownLatch(1);
+        private AtomicBoolean running = new AtomicBoolean(true);
+
+        TopicPoller(Consumer<String, String> consumer, CountDownLatch cacheReadyLatch) {
+            this.consumer = consumer;
+            this.cacheReadyLatch = cacheReadyLatch;
+        }
+
+        @Override
+        public void run() {
+            LOG.debug("Subscribing consumer to {}", topic);
+            consumer.subscribe(Collections.singleton(topic));
+            LOG.debug("Seeking to beginning");
+            consumer.seekToBeginning(consumer.assignment());
+
+            POLL_LOOP: while (running.get()) {
+                LOG.trace("Polling");
+                ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_DURATION_MS);
+                if (consumerRecords.isEmpty()) {
+                    // the first time this happens, we can assume that we have consumed all
+                    // messages up to this point
+                    LOG.trace("0 messages fetched on poll");
+                    if (cacheReadyLatch.getCount() > 0) {
+                        LOG.debug("Cache warmed up");
+                        cacheReadyLatch.countDown();
+                    }
+                }
+                for (ConsumerRecord<String, String> consumerRecord: consumerRecords) {
+                    CacheAction action;
+                    try {
+                        action = CacheAction.valueOf(consumerRecord.value());
+                    } catch (IllegalArgumentException iax) {
+                        LOG.error("Unexpected action value:\"{}\" received on [topic:{}, partition:{}, offset:{}]. Shutting down.",
+                                consumerRecord.value(), consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
+                        setRunning(false);
+                        continue POLL_LOOP;
+                    }
+                    String messageId = consumerRecord.key();
+                    if (action == CacheAction.add) {
+                        LOG.debug("Adding to cache messageId:{}", messageId);
+                        cache.put(messageId, messageId);
+                    } else if (action == CacheAction.remove) {
+                        LOG.debug("Removing from cache messageId:{}", messageId);
+                        cache.remove(messageId);
+                    } else if (action == CacheAction.clear) {
+                        cache.clear();
+                    } else {
+                        // this should never happen
+                        LOG.error("No idea how to {} a record. Shutting down.", action);
+                        setRunning(false);
+                        continue POLL_LOOP;
+                    }
+                }
+
+            }
+            LOG.debug("TopicPoller finished - triggering shutdown latch");
+            shutdownLatch.countDown();
+        }
+
+        public CountDownLatch getShutdownLatch() {
+            return shutdownLatch;
+        }
+
+        public void setRunning(boolean running) {
+            this.running.set(running);
+        }
+
+        public boolean getRunning() {
+            return running.get();
+        }
+    }
 }
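
For reference, a usage sketch that mirrors the new tests further down
(KafkaIdempotentRepositoryEagerTest): the repository is bound into the
registry under a name and referenced from an idempotentConsumer in the route.
The registry key, topic name and endpoint URIs are modelled on those tests;
the class name and bootstrap address are assumptions for illustration only:

    import org.apache.camel.CamelContext;
    import org.apache.camel.ProducerTemplate;
    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.impl.DefaultCamelContext;
    import org.apache.camel.impl.SimpleRegistry;
    import org.apache.camel.processor.idempotent.kafka.KafkaIdempotentRepository;

    public class KafkaIdempotentRouteSketch {
        public static void main(String[] args) throws Exception {
            // bind the repository in the registry so the route can reference it by name
            SimpleRegistry registry = new SimpleRegistry();
            registry.put("kafkaIdempotentRepository",
                    new KafkaIdempotentRepository("TEST_IDEM", "localhost:9092"));

            CamelContext context = new DefaultCamelContext(registry);
            context.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    // exchanges whose "id" header has been seen before are filtered out
                    from("direct:in")
                        .idempotentConsumer(header("id"))
                            .messageIdRepositoryRef("kafkaIdempotentRepository")
                            .to("mock:out")
                        .end();
                }
            });
            context.start();

            // the second send carries a duplicate id and is dropped by the idempotent consumer
            ProducerTemplate template = context.createProducerTemplate();
            template.sendBodyAndHeader("direct:in", "Test message", "id", 0);
            template.sendBodyAndHeader("direct:in", "Test message", "id", 0);

            context.stop();
        }
    }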

http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
index 8ab50ac..3254bc7 100644
--- a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/embedded/EmbeddedKafkaBroker.java
@@ -61,6 +61,7 @@ public class EmbeddedKafkaBroker extends ExternalResource {
         this.zkConnection = zkConnection;
         this.baseProperties = baseProperties;
 
+        log.info("Starting broker[{}] on port {}", brokerId, port);
         this.brokerList = "localhost:" + this.port;
     }
 
@@ -69,12 +70,10 @@ public class EmbeddedKafkaBroker extends ExternalResource {
     }
 
     public void createTopic(String topic, int partitionCount) {
-        log.info("createTopic");
         AdminUtils.createTopic(getZkUtils(), topic, partitionCount, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
     }
 
     public void before() {
-        log.info("before");
         logDir = constructTempDir(perTest("kafka-log"));
 
         Properties properties = new Properties();

http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
new file mode 100644
index 0000000..9fc7122
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryEagerTest.java
@@ -0,0 +1,105 @@
+package org.apache.camel.processor.idempotent.kafka;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
+import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * @author jkorab
+ */
+public class KafkaIdempotentRepositoryEagerTest extends CamelTestSupport {
+
+    @Rule
+    public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
+
+    @Rule
+    public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
+
+    private KafkaIdempotentRepository kafkaIdempotentRepository;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+
+        kafkaIdempotentRepository = new KafkaIdempotentRepository("TEST_IDEM", kafkaBroker.getBrokerList());
+        registry.put("kafkaIdempotentRepository", kafkaIdempotentRepository);
+
+        return new DefaultCamelContext(registry);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:in")
+                    .to("mock:before")
+                    .idempotentConsumer(header("id")).messageIdRepositoryRef("kafkaIdempotentRepository")
+                        .to("mock:out")
+                    .end();
+            }
+        };
+    }
+
+    @EndpointInject(uri = "mock:out")
+    MockEndpoint mockOut;
+
+    @EndpointInject(uri = "mock:before")
+    MockEndpoint mockBefore;
+
+    @Test
+    public void testRemovesDuplicates() throws InterruptedException {
+        for (int i = 0; i < 10; i++) {
+            template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+        }
+
+        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+
+        assertEquals(5, mockOut.getReceivedCounter());
+        assertEquals(10, mockBefore.getReceivedCounter());
+    }
+
+    @Test
+    public void testRollsBackOnException() throws InterruptedException {
+        mockOut.whenAnyExchangeReceived((exchange -> {
+            int id = exchange.getIn().getHeader("id", Integer.class);
+            if (id == 0) {
+                throw new IllegalArgumentException("Boom!");
+            }
+        }));
+
+        for (int i = 0; i < 10; i++) {
+            try {
+                template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+            } catch (CamelExecutionException cex) {
+                // no-op; expected
+            }
+        }
+
+        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // id{0} is not a duplicate
+
+        assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through the idempotency check twice
+        assertEquals(10, mockBefore.getReceivedCounter());
+    }
+
+    @Test
+    public void testClear() throws InterruptedException {
+        mockOut.setExpectedMessageCount(2);
+
+        template.sendBodyAndHeader("direct:in", "Test message", "id", 0);
+        kafkaIdempotentRepository.clear();
+        template.sendBodyAndHeader("direct:in", "Test message", "id", 0);
+
+        assertMockEndpointsSatisfied();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java
new file mode 100644
index 0000000..5c80b38
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryLazyTest.java
@@ -0,0 +1,95 @@
+package org.apache.camel.processor.idempotent.kafka;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
+import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Rule;
+import org.junit.Test;
+
+/**
+ * @author jkorab
+ */
+public class KafkaIdempotentRepositoryLazyTest extends CamelTestSupport {
+
+    @Rule
+    public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
+
+    @Rule
+    public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
+
+    private KafkaIdempotentRepository kafkaIdempotentRepository;
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        SimpleRegistry registry = new SimpleRegistry();
+
+        kafkaIdempotentRepository = new KafkaIdempotentRepository("TEST_IDEM", kafkaBroker.getBrokerList());
+        registry.put("kafkaIdempotentRepository", kafkaIdempotentRepository);
+
+        return new DefaultCamelContext(registry);
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:in")
+                    .to("mock:before")
+                    .idempotentConsumer(header("id"))
+                            .messageIdRepositoryRef("kafkaIdempotentRepository")
+                            .eager(false)
+                        .to("mock:out")
+                    .end();
+            }
+        };
+    }
+
+    @EndpointInject(uri = "mock:out")
+    MockEndpoint mockOut;
+
+    @EndpointInject(uri = "mock:before")
+    MockEndpoint mockBefore;
+
+    @Test
+    public void testRemovesDuplicates() throws InterruptedException {
+        for (int i = 0; i < 10; i++) {
+            template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+        }
+
+        assertEquals(5, kafkaIdempotentRepository.getDuplicateCount());
+
+        assertEquals(5, mockOut.getReceivedCounter());
+        assertEquals(10, mockBefore.getReceivedCounter());
+    }
+
+    @Test
+    public void testRollsBackOnException() throws InterruptedException {
+        mockOut.whenAnyExchangeReceived((exchange -> {
+            int id = exchange.getIn().getHeader("id", Integer.class);
+            if (id == 0) {
+                throw new IllegalArgumentException("Boom!");
+            }
+        }));
+
+        for (int i = 0; i < 10; i++) {
+            try {
+                template.sendBodyAndHeader("direct:in", "Test message", "id", i % 5);
+            } catch (CamelExecutionException cex) {
+                // no-op; expected
+            }
+        }
+
+        assertEquals(4, kafkaIdempotentRepository.getDuplicateCount()); // id{0} is not a duplicate
+
+        assertEquals(6, mockOut.getReceivedCounter()); // id{0} goes through the idempotency check twice
+        assertEquals(10, mockBefore.getReceivedCounter());
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java b/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
deleted file mode 100644
index 90b89a0..0000000
--- a/components/camel-kafka/src/test/java/org/apache/camel/processor/idempotent/kafka/KafkaIdempotentRepositoryTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.camel.processor.idempotent.kafka;
-
-import org.apache.camel.RoutesBuilder;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.embedded.EmbeddedKafkaBroker;
-import org.apache.camel.component.kafka.embedded.EmbeddedZookeeper;
-import org.apache.camel.test.AvailablePortFinder;
-import org.apache.camel.test.junit4.CamelTestSupport;
-import org.junit.Rule;
-import org.junit.Test;
-
-import java.util.Properties;
-
-import static org.junit.Assert.*;
-
-/**
- * @author jkorab
- */
-public class KafkaIdempotentRepositoryTest extends CamelTestSupport {
-
-    @Rule
-    public EmbeddedZookeeper zookeeper = new EmbeddedZookeeper();
-
-    @Rule
-    public EmbeddedKafkaBroker kafkaBroker = new EmbeddedKafkaBroker(0, zookeeper.getConnection());
-
-    @Override
-    protected RoutesBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            @Override
-            public void configure() throws Exception {
-                KafkaIdempotentRepository kafkaIdempotentRepository = new KafkaIdempotentRepository();
-
-                from("direct:in")
-                    .idempotentConsumer(header("id"), kafkaIdempotentRepository)
-                    .to("mock:out");
-            }
-        };
-    }
-
-    @Test
-    public void testRemovesDuplicate() {
-        
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/726890f6/components/camel-kafka/src/test/resources/log4j2.properties
----------------------------------------------------------------------
diff --git a/components/camel-kafka/src/test/resources/log4j2.properties b/components/camel-kafka/src/test/resources/log4j2.properties
index b0f366d..07bcc08 100644
--- a/components/camel-kafka/src/test/resources/log4j2.properties
+++ b/components/camel-kafka/src/test/resources/log4j2.properties
@@ -28,7 +28,12 @@ appender.stdout.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 rootLogger.level = WARN
 rootLogger.appenderRef.out.ref = stdout
 
-logger.stdout.name=org.apache.camel.component.kafka
-logger.stdout.level=INFO
+logger.camel.name=org.apache.camel
+logger.camel.level=INFO
 
+logger.camelKafka.name=org.apache.camel.component.kafka
+logger.camelKafka.level=INFO
+
+logger.idem.name=org.apache.camel.processor.idempotent
+logger.idem.level=INFO