You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/13 19:23:42 UTC

[1/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.

Repository: kafka
Updated Branches:
  refs/heads/trunk e2ec02e1d -> 36d446932


http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
index 6a3eec3..69d9ab4 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStoreTest.java
@@ -18,23 +18,16 @@
 package org.apache.kafka.copycat.storage;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.MockConsumer;
-import org.apache.kafka.clients.consumer.OffsetResetStrategy;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.Node;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.TestFuture;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -49,70 +42,56 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.apache.kafka.copycat.util.ByteArrayProducerRecordEquals.eqProducerRecord;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(KafkaOffsetBackingStore.class)
 @PowerMockIgnore("javax.management.*")
 public class KafkaOffsetBackingStoreTest {
     private static final String TOPIC = "copycat-offsets";
-    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
-    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
     private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
     static {
         DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
         DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
     }
-    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
     private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
     static {
         FIRST_SET.put(buffer("key"), buffer("value"));
         FIRST_SET.put(null, null);
     }
 
-
-    private static final Node LEADER = new Node(1, "broker1", 9092);
-    private static final Node REPLICA = new Node(1, "broker2", 9093);
-
-    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
-    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
-
     private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
     private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
+    private static final ByteBuffer TP2_KEY = buffer("TP2KEY");
     private static final ByteBuffer TP0_VALUE = buffer("VAL0");
     private static final ByteBuffer TP1_VALUE = buffer("VAL1");
+    private static final ByteBuffer TP2_VALUE = buffer("VAL2");
     private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
     private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
 
+    @Mock
+    KafkaBasedLog<byte[], byte[]> storeLog;
     private KafkaOffsetBackingStore store;
 
-    @Mock private KafkaProducer<byte[], byte[]> producer;
-    private MockConsumer<byte[], byte[]> consumer;
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
 
     @Before
     public void setUp() throws Exception {
-        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createConsumer", "createProducer"});
-        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
-        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
-        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
-        beginningOffsets.put(TP0, 0L);
-        beginningOffsets.put(TP1, 0L);
-        consumer.updateBeginningOffsets(beginningOffsets);
+        store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
     }
 
     @Test(expected = CopycatException.class)
@@ -123,119 +102,121 @@ public class KafkaOffsetBackingStoreTest {
 
     @Test
     public void testStartStop() throws Exception {
-        expectStart();
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
         expectStop();
 
         PowerMock.replayAll();
 
         store.configure(DEFAULT_PROPS);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
 
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
         store.start();
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
     @Test
     public void testReloadOnStart() throws Exception {
-        expectStart();
+        expectConfigure();
+        expectStart(Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
+                new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+                new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
+        ));
         expectStop();
 
         PowerMock.replayAll();
 
         store.configure(DEFAULT_PROPS);
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 1L);
-        endOffsets.put(TP1, 1L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Needs to seek to end to find end offsets
-                consumer.waitForPoll(10000);
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
-                    }
-                }, 10000);
-
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
         HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
-        assertEquals(TP0_VALUE, data.get(TP0_KEY));
-        assertEquals(TP1_VALUE, data.get(TP1_KEY));
+        assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
+        assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
     @Test
     public void testGetSet() throws Exception {
-        expectStart();
-        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY.array(), TP0_VALUE.array());
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
+        expectStop();
+
+        // First get() against an empty store
+        final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                firstGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp0Record), EasyMock.capture(callback1))).andReturn(tp0Future);
-        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY.array(), TP1_VALUE.array());
-        Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp1Record), EasyMock.capture(callback2))).andReturn(tp1Future);
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
 
-        expectStop();
+        // Second get() should get the produced data and return the new values
+        final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
+                secondGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
+
+        // Third get() should pick up data produced by someone else and return those values
+        final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
+        storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+                capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+                thirdGetReadToEndCallback.getValue().onCompletion(null, null);
+                return null;
+            }
+        });
 
         PowerMock.replayAll();
 
+
+
         store.configure(DEFAULT_PROPS);
+        store.start();
 
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+        // Getting from empty store should return nulls
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
             @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
+            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+                // Since we didn't read them yet, these will be null
+                assertEquals(null, result.get(TP0_KEY));
+                assertEquals(null, result.get(TP1_KEY));
+                getInvokedAndPassed.set(true);
             }
-        };
-        startConsumerOpsThread.start();
-        store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        }).get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
 
+        // Set some offsets
         Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
         toSet.put(TP0_KEY, TP0_VALUE);
         toSet.put(TP1_KEY, TP1_VALUE);
@@ -247,208 +228,126 @@ public class KafkaOffsetBackingStoreTest {
             }
         });
         assertFalse(setFuture.isDone());
-        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
-        assertFalse(setFuture.isDone());
-        tp0Future.resolve((RecordMetadata) null);
         // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
         // for the store's set callback
-        callback2.getValue().onCompletion(null, null);
-        assertFalse(invoked.get());
         callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback0.getValue().onCompletion(null, null);
         setFuture.get(10000, TimeUnit.MILLISECONDS);
         assertTrue(invoked.get());
 
-        // Getting data should continue to return old data...
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
         store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                // Since we didn't read them yet, these will be null
-                assertEquals(null, result.get(TP0_KEY));
-                assertEquals(null, result.get(TP1_KEY));
-                getInvokedAndPassed.set(true);
+                assertEquals(TP0_VALUE, result.get(TP0_KEY));
+                assertEquals(TP1_VALUE, result.get(TP1_KEY));
+                secondGetInvokedAndPassed.set(true);
             }
         }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(getInvokedAndPassed.get());
-
-        // Until the consumer gets the new data
-        Thread readNewDataThread = new Thread("read-new-data-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                    }
-                }, 10000);
-
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE_NEW.array()));
-                    }
-                }, 10000);
-            }
-        };
-        readNewDataThread.start();
-        readNewDataThread.join(10000);
-        assertFalse(readNewDataThread.isAlive());
+        assertTrue(secondGetInvokedAndPassed.get());
 
-        // And now the new data should be returned
-        final AtomicBoolean finalGetInvokedAndPassed = new AtomicBoolean(false);
+        // Getting data should read to end of our published data and return it
+        final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
         store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
                 assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
                 assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
-                finalGetInvokedAndPassed.set(true);
+                thirdGetInvokedAndPassed.set(true);
             }
         }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(finalGetInvokedAndPassed.get());
+        assertTrue(thirdGetInvokedAndPassed.get());
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
     @Test
-    public void testConsumerError() throws Exception {
-        expectStart();
+    public void testSetFailure() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST);
         expectStop();
 
-        PowerMock.replayAll();
-
-        store.configure(DEFAULT_PROPS);
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 1L);
-        endOffsets.put(TP1, 1L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Trigger exception
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
-                    }
-                }, 10000);
-
-                // Should keep polling until it reaches current log end offset for all partitions
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY.array(), TP0_VALUE_NEW.array()));
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
-        store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
-
-        store.stop();
-
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testProducerError() throws Exception {
-        expectStart();
-        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY.array(), TP0_VALUE.array());
+        // Set offsets
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+        PowerMock.expectLastCall();
         Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp0Record), EasyMock.capture(callback1))).andReturn(tp0Future);
-        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
-        ProducerRecord<byte[], byte[]> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY.array(), TP1_VALUE.array());
+        storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+        PowerMock.expectLastCall();
         Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
-        EasyMock.expect(producer.send(eqProducerRecord(tp1Record), EasyMock.capture(callback2))).andReturn(tp1Future);
-
-        expectStop();
+        storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        store.configure(DEFAULT_PROPS);
 
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        endOffsets.put(TP0, 0L);
-        endOffsets.put(TP1, 0L);
-        consumer.updateEndOffsets(endOffsets);
-        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
-            @Override
-            public void run() {
-                // Should keep polling until it has partition info
-                consumer.waitForPollThen(new Runnable() {
-                    @Override
-                    public void run() {
-                        consumer.seek(TP0, 0);
-                        consumer.seek(TP1, 0);
-                    }
-                }, 10000);
-            }
-        };
-        startConsumerOpsThread.start();
+
+        store.configure(DEFAULT_PROPS);
         store.start();
-        startConsumerOpsThread.join(10000);
-        assertFalse(startConsumerOpsThread.isAlive());
-        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
 
+        // Set some offsets
         Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
         toSet.put(TP0_KEY, TP0_VALUE);
         toSet.put(TP1_KEY, TP1_VALUE);
-        final AtomicReference<Throwable> setException = new AtomicReference<>();
+        toSet.put(TP2_KEY, TP2_VALUE);
+        final AtomicBoolean invoked = new AtomicBoolean(false);
+        final AtomicBoolean invokedFailure = new AtomicBoolean(false);
         Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
-                assertNull(setException.get()); // Should only be invoked once
-                setException.set(error);
+                invoked.set(true);
+                if (error != null)
+                    invokedFailure.set(true);
             }
         });
         assertFalse(setFuture.isDone());
-        KafkaException exc = new LeaderNotAvailableException("Error");
-        tp1Future.resolve(exc);
-        callback2.getValue().onCompletion(null, exc);
-        // One failure should resolve the future immediately
+        // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+        // for the store's set callback
+        callback1.getValue().onCompletion(null, null);
+        assertFalse(invoked.get());
+        callback2.getValue().onCompletion(null, new KafkaException("bogus error"));
+        assertTrue(invoked.get());
+        assertTrue(invokedFailure.get());
+        callback0.getValue().onCompletion(null, null);
         try {
             setFuture.get(10000, TimeUnit.MILLISECONDS);
-            fail("Should have see ExecutionException");
+            fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
         } catch (ExecutionException e) {
             // expected
+            assertNotNull(e.getCause());
+            assertTrue(e.getCause() instanceof KafkaException);
         }
-        assertNotNull(setException.get());
-
-        // Callbacks can continue to arrive
-        tp0Future.resolve((RecordMetadata) null);
-        callback1.getValue().onCompletion(null, null);
 
         store.stop();
 
-        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
-        assertTrue(consumer.closed());
         PowerMock.verifyAll();
     }
 
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
 
-    private void expectStart() throws Exception {
-        PowerMock.expectPrivate(store, "createProducer")
-                .andReturn(producer);
-        PowerMock.expectPrivate(store, "createConsumer")
-                .andReturn(consumer);
+    private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
     }
 
     private void expectStop() {
-        producer.close();
+        storeLog.stop();
         PowerMock.expectLastCall();
-        // MockConsumer close is checked after test.
     }
 
     private static ByteBuffer buffer(String v) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
new file mode 100644
index 0000000..96c4bcc
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/KafkaBasedLogTest.java
@@ -0,0 +1,463 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaBasedLogTest {
+
+    private static final String TOPIC = "copycat-log";
+    private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
+    private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
+    private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>();
+    static {
+        PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+    }
+    private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>();
+    static {
+        CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+    }
+
+    private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
+    private static final Map<String, String> FIRST_SET = new HashMap<>();
+    static {
+        FIRST_SET.put("key", "value");
+        FIRST_SET.put(null, null);
+    }
+
+    private static final Node LEADER = new Node(1, "broker1", 9092);
+    private static final Node REPLICA = new Node(1, "broker2", 9093);
+
+    private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+    private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+
+    private static final String TP0_KEY = "TP0KEY";
+    private static final String TP1_KEY = "TP1KEY";
+    private static final String TP0_VALUE = "VAL0";
+    private static final String TP1_VALUE = "VAL1";
+    private static final String TP0_VALUE_NEW = "VAL0_NEW";
+    private static final String TP1_VALUE_NEW = "VAL1_NEW";
+
+    private Time time = new MockTime();
+    private KafkaBasedLog<String, String> store;
+
+    @Mock
+    private KafkaProducer<String, String> producer;
+    private MockConsumer<String, String> consumer;
+
+    private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+    private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
+            consumedRecords.add(record);
+        }
+    };
+
+    @Before
+    public void setUp() throws Exception {
+        store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
+                TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time);
+        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+        consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
+        Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+        beginningOffsets.put(TP0, 0L);
+        beginningOffsets.put(TP1, 0L);
+        consumer.updateBeginningOffsets(beginningOffsets);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        store.start();
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testReloadOnStart() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Needs to seek to end to find end offsets
+                consumer.waitForPoll(10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                    }
+                }, 10000);
+
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+        assertEquals(2, consumedRecords.size());
+        assertEquals(TP0_VALUE, consumedRecords.get(0).value());
+        assertEquals(TP1_VALUE, consumedRecords.get(1).value());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testSendAndReadToEnd() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+        TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
+        ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
+
+        // Producer flushes when read to log end is called
+        producer.flush();
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it has partition info
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.seek(TP0, 0);
+                        consumer.seek(TP1, 0);
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        // Set some keys
+        final AtomicInteger invoked = new AtomicInteger(0);
+        org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                invoked.incrementAndGet();
+            }
+        };
+        store.send(TP0_KEY, TP0_VALUE, producerCallback);
+        store.send(TP1_KEY, TP1_VALUE, producerCallback);
+        assertEquals(0, invoked.get());
+        tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
+        callback1.getValue().onCompletion(null, null);
+        assertEquals(1, invoked.get());
+        tp0Future.resolve((RecordMetadata) null);
+        callback0.getValue().onCompletion(null, null);
+        assertEquals(2, invoked.get());
+
+        // Now we should have to wait for the records to be read back when we call readToEnd()
+        final CountDownLatch startOffsetUpdateLatch = new CountDownLatch(1);
+        Thread readNewDataThread = new Thread("read-new-data-thread") {
+            @Override
+            public void run() {
+                // Needs to be woken up after calling readToEnd()
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            startOffsetUpdateLatch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException("Interrupted");
+                        }
+                    }
+                }, 10000);
+
+                // Needs to seek to end to find end offsets
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            startOffsetUpdateLatch.await();
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException("Interrupted");
+                        }
+
+                        Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+                        newEndOffsets.put(TP0, 2L);
+                        newEndOffsets.put(TP1, 2L);
+                        consumer.updateEndOffsets(newEndOffsets);
+                    }
+                }, 10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+                    }
+                }, 10000);
+
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
+                    }
+                }, 10000);
+            }
+        };
+        readNewDataThread.start();
+        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+        FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                assertEquals(4, consumedRecords.size());
+                assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
+                assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+                getInvokedAndPassed.set(true);
+            }
+        });
+        store.readToEnd(readEndFutureCallback);
+        startOffsetUpdateLatch.countDown();
+        readNewDataThread.join(10000);
+        assertFalse(readNewDataThread.isAlive());
+        readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+        assertTrue(getInvokedAndPassed.get());
+
+        // Cleanup
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testConsumerError() throws Exception {
+        expectStart();
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 1L);
+        endOffsets.put(TP1, 1L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Trigger exception
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.setException(Errors.CONSUMER_COORDINATOR_NOT_AVAILABLE.exception());
+                    }
+                }, 10000);
+
+                // Should keep polling until it reaches current log end offset for all partitions
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+                        consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testProducerError() throws Exception {
+        expectStart();
+        TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+        ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+        Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+        EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        endOffsets.put(TP0, 0L);
+        endOffsets.put(TP1, 0L);
+        consumer.updateEndOffsets(endOffsets);
+        Thread startConsumerOpsThread = new Thread("start-consumer-ops-thread") {
+            @Override
+            public void run() {
+                // Should keep polling until it has partition info
+                consumer.waitForPollThen(new Runnable() {
+                    @Override
+                    public void run() {
+                        consumer.seek(TP0, 0);
+                        consumer.seek(TP1, 0);
+                    }
+                }, 10000);
+            }
+        };
+        startConsumerOpsThread.start();
+        store.start();
+        startConsumerOpsThread.join(10000);
+        assertFalse(startConsumerOpsThread.isAlive());
+        assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+        final AtomicReference<Throwable> setException = new AtomicReference<>();
+        store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                assertNull(setException.get()); // Should only be invoked once
+                setException.set(exception);
+            }
+        });
+        KafkaException exc = new LeaderNotAvailableException("Error");
+        tp0Future.resolve(exc);
+        callback0.getValue().onCompletion(null, exc);
+        assertNotNull(setException.get());
+
+        store.stop();
+
+        assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+        assertTrue(consumer.closed());
+        PowerMock.verifyAll();
+    }
+
+
+    private void expectStart() throws Exception {
+        PowerMock.expectPrivate(store, "createProducer")
+                .andReturn(producer);
+        PowerMock.expectPrivate(store, "createConsumer")
+                .andReturn(consumer);
+    }
+
+    private void expectStop() {
+        producer.close();
+        PowerMock.expectLastCall();
+        // MockConsumer close is checked after test.
+    }
+
+    private static ByteBuffer buffer(String v) {
+        return ByteBuffer.wrap(v.getBytes());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
index 8143c44..c5978ec 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/TestFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.copycat.util;
 
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -26,20 +27,35 @@ public class TestFuture<T> implements Future<T> {
     private volatile boolean resolved;
     private T result;
     private Throwable exception;
+    private CountDownLatch getCalledLatch;
+
+    private volatile boolean resolveOnGet;
+    private T resolveOnGetResult;
+    private Throwable resolveOnGetException;
 
     public TestFuture() {
         resolved = false;
+        getCalledLatch = new CountDownLatch(1);
+
+        resolveOnGet = false;
+        resolveOnGetResult = null;
+        resolveOnGetException = null;
     }
 
     public void resolve(T val) {
         this.result = val;
         resolved = true;
-
+        synchronized (this) {
+            this.notifyAll();
+        }
     }
 
     public void resolve(Throwable t) {
         exception = t;
         resolved = true;
+        synchronized (this) {
+            this.notifyAll();
+        }
     }
 
     @Override
@@ -59,6 +75,7 @@ public class TestFuture<T> implements Future<T> {
 
     @Override
     public T get() throws InterruptedException, ExecutionException {
+        getCalledLatch.countDown();
         while (true) {
             try {
                 return get(Integer.MAX_VALUE, TimeUnit.DAYS);
@@ -70,12 +87,69 @@ public class TestFuture<T> implements Future<T> {
 
     @Override
     public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
-        while (!resolved) {
-            this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+        getCalledLatch.countDown();
+
+        if (resolveOnGet) {
+            if (resolveOnGetException != null)
+                resolve(resolveOnGetException);
+            else
+                resolve(resolveOnGetResult);
+        }
+
+        synchronized (this) {
+            while (!resolved) {
+                this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+            }
         }
 
         if (exception != null)
             throw new ExecutionException(exception);
         return result;
     }
+
+    /**
+     * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+     * @param val the value to return from the future
+     */
+    public void resolveOnGet(T val) {
+        resolveOnGet = true;
+        resolveOnGetResult = val;
+    }
+
+    /**
+     * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+     * @param t the exception to return from the future
+     */
+    public void resolveOnGet(Throwable t) {
+        resolveOnGet = true;
+        resolveOnGetException = t;
+    }
+
+    /**
+     * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+     * the specified value.
+     * @param val the value to return from the future
+     */
+    public void waitForGetAndResolve(T val) {
+        waitForGet();
+        resolve(val);
+    }
+
+    /**
+     * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+     * the specified value.
+     * @param t the exception to use to resolve the future
+     */
+    public void waitForGetAndResolve(Throwable t) {
+        waitForGet();
+        resolve(t);
+    }
+
+    private void waitForGet() {
+        try {
+            getCalledLatch.await();
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Unexpected interruption: ", e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/services/copycat.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py
index ea5753b..4e2ab40 100644
--- a/tests/kafkatest/services/copycat.py
+++ b/tests/kafkatest/services/copycat.py
@@ -104,9 +104,10 @@ class CopycatStandaloneService(CopycatServiceBase):
 class CopycatDistributedService(CopycatServiceBase):
     """Runs Copycat in distributed mode."""
 
-    def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offsets"):
+    def __init__(self, context, num_nodes, kafka, files, offsets_topic="copycat-offsets", configs_topic="copycat-configs"):
         super(CopycatDistributedService, self).__init__(context, num_nodes, kafka, files)
         self.offsets_topic = offsets_topic
+        self.configs_topic = configs_topic
 
     def set_configs(self, config_template, connector_config_template):
         """

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/tests/copycat_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/copycat_distributed_test.py b/tests/kafkatest/tests/copycat_distributed_test.py
index 3afcd57..9d00334 100644
--- a/tests/kafkatest/tests/copycat_distributed_test.py
+++ b/tests/kafkatest/tests/copycat_distributed_test.py
@@ -29,6 +29,7 @@ class CopycatDistributedFileTest(KafkaTest):
 
     TOPIC = "test"
     OFFSETS_TOPIC = "copycat-offsets"
+    CONFIG_TOPIC = "copycat-configs"
 
     FIRST_INPUT_LISTS = [["foo", "bar", "baz"], ["foo2", "bar2", "baz2"]]
     FIRST_INPUTS = ["\n".join(input_list) + "\n" for input_list in FIRST_INPUT_LISTS]
@@ -42,8 +43,13 @@ class CopycatDistributedFileTest(KafkaTest):
             'test' : { 'partitions': 1, 'replication-factor': 1 }
         })
 
-        self.source = CopycatDistributedService(test_context, 2, self.kafka, [self.INPUT_FILE])
-        self.sink = CopycatDistributedService(test_context, 2, self.kafka, [self.OUTPUT_FILE])
+        # FIXME these should have multiple nodes. However, currently the connectors are submitted via command line,
+        # which means we would get duplicates. Both would run, but they would have conflicting keys for offsets and
+        # configs. Until we have real distributed coordination of workers with unified connector submission, we need
+        # to restrict each of these to a single node.
+        self.num_nodes = 1
+        self.source = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.INPUT_FILE])
+        self.sink = CopycatDistributedService(test_context, self.num_nodes, self.kafka, [self.OUTPUT_FILE])
 
     def test_file_source_and_sink(self, converter="org.apache.kafka.copycat.json.JsonConverter", schemas=True):
         assert converter != None, "converter type must be set"
@@ -62,7 +68,7 @@ class CopycatDistributedFileTest(KafkaTest):
         # Generating data on the source node should generate new records and create new output on the sink node
         for node, input in zip(self.source.nodes, self.FIRST_INPUTS):
             node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Data added to input file was not seen in the output file in a reasonable amount of time.")
 
         # Restarting both should result in them picking up where they left off,
         # only processing new data.
@@ -71,7 +77,7 @@ class CopycatDistributedFileTest(KafkaTest):
 
         for node, input in zip(self.source.nodes, self.SECOND_INPUTS):
             node.account.ssh("echo -e -n " + repr(input) + " >> " + self.INPUT_FILE)
-        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS + self.SECOND_INPUT_LISTS), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
+        wait_until(lambda: self.validate_output(self.FIRST_INPUT_LISTS[:self.num_nodes] + self.SECOND_INPUT_LISTS[:self.num_nodes]), timeout_sec=60, err_msg="Sink output file never converged to the same state as the input file")
 
     def validate_output(self, inputs):
         try:

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/tests/templates/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-distributed.properties b/tests/kafkatest/tests/templates/copycat-distributed.properties
index da47423..31f9901 100644
--- a/tests/kafkatest/tests/templates/copycat-distributed.properties
+++ b/tests/kafkatest/tests/templates/copycat-distributed.properties
@@ -24,9 +24,10 @@ key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 {% endif %}
 
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
+config.storage.topic={{ CONFIG_TOPIC }}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/tests/kafkatest/tests/templates/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/copycat-standalone.properties b/tests/kafkatest/tests/templates/copycat-standalone.properties
index 39db6ce..c89490a 100644
--- a/tests/kafkatest/tests/templates/copycat-standalone.properties
+++ b/tests/kafkatest/tests/templates/copycat-standalone.properties
@@ -24,9 +24,9 @@ key.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 value.converter.schemas.enable={{ schemas|default(True)|string|lower }}
 {% endif %}
 
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
 
 offset.storage.file.filename={{ OFFSETS_FILE }}


[2/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.

Posted by gw...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
index a74b39c..b5af1fe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -17,39 +17,22 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConvertingFutureCallback;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -70,99 +53,42 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
     public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
 
-    private final static long CREATE_TOPIC_TIMEOUT_MS = 30000;
-
-    private Time time;
-    private Map<String, ?> configs;
-    private String topic;
-    private Consumer<byte[], byte[]> consumer;
-    private Producer<byte[], byte[]> producer;
+    private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
 
-    private Thread thread;
-    private boolean stopRequested;
-    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-
-    public KafkaOffsetBackingStore() {
-        this(new SystemTime());
-    }
-
-    public KafkaOffsetBackingStore(Time time) {
-        this.time = time;
-    }
-
     @Override
     public void configure(Map<String, ?> configs) {
-        this.configs = configs;
-        topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
+        String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
         if (topic == null)
             throw new CopycatException("Offset storage topic must be specified");
 
         data = new HashMap<>();
-        stopRequested = false;
-        readLogEndOffsetCallbacks = new ArrayDeque<>();
-    }
-
-    @Override
-    public void start() {
-        log.info("Starting KafkaOffsetBackingStore with topic " + topic);
-
-        producer = createProducer();
-        consumer = createConsumer();
-        List<TopicPartition> partitions = new ArrayList<>();
-
-        // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
-        // we rely on topic auto-creation
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
-            partitionInfos = consumer.partitionsFor(topic);
-            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
-        }
-        if (partitionInfos == null)
-            throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
-                    " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
-                    " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
-        consumer.assign(partitions);
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
 
-        readToLogEnd();
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
-        thread = new WorkThread();
-        thread.start();
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
+    }
 
+    @Override
+    public void start() {
+        log.info("Starting KafkaOffsetBackingStore");
+        offsetLog.start();
         log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
     }
 
     @Override
     public void stop() {
         log.info("Stopping KafkaOffsetBackingStore");
-
-        synchronized (this) {
-            stopRequested = true;
-            consumer.wakeup();
-        }
-
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new CopycatException("Failed to stop KafkaOffsetBackingStore. Exiting without cleanly shutting " +
-                    "down it's producer and consumer.", e);
-        }
-
-        try {
-            producer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to close KafkaOffsetBackingStore producer", e);
-        }
-
-        try {
-            consumer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to close KafkaOffsetBackingStore consumer", e);
-        }
+        offsetLog.stop();
+        log.info("Stopped KafkaOffsetBackingStore");
     }
 
     @Override
@@ -172,15 +98,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
             @Override
             public Map<ByteBuffer, ByteBuffer> convert(Void result) {
                 Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
-                synchronized (KafkaOffsetBackingStore.this) {
-                    for (ByteBuffer key : keys)
-                        values.put(key, data.get(key));
-                }
+                for (ByteBuffer key : keys)
+                    values.put(key, data.get(key));
                 return values;
             }
         };
-        readLogEndOffsetCallbacks.add(future);
-        consumer.wakeup();
+        // This operation may be relatively (but not too) expensive since it always requires checking end offsets, even
+        // if we've already read up to the end. However, it also should not be common (offsets should only be read when
+        // resetting a task). Always requiring that we read to the end is simpler than trying to differentiate when it
+        // is safe not to (which should only be if we *know* we've maintained ownership since the last write).
+        offsetLog.readToEnd(future);
         return future;
     }
 
@@ -188,95 +115,26 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
         SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
 
-        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
-            producer.send(new ProducerRecord<>(topic, entry.getKey().array(), entry.getValue().array()), producerCallback);
-        }
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+            offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback);
 
         return producerCallback;
     }
 
-
-
-    private Producer<byte[], byte[]> createProducer() {
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        return new KafkaProducer<>(producerProps);
-    }
-
-    private Consumer<byte[], byte[]> createConsumer() {
-        Map<String, Object> consumerConfig = new HashMap<>();
-        consumerConfig.putAll(configs);
-        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        return new KafkaConsumer<>(consumerConfig);
-    }
-
-    private void poll(long timeoutMs) {
-        try {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(timeoutMs);
-            for (ConsumerRecord record : records) {
-                ByteBuffer key = record.key() != null ? ByteBuffer.wrap((byte[]) record.key()) : null;
-                ByteBuffer value = record.value() != null ? ByteBuffer.wrap((byte[]) record.value()) : null;
-                data.put(key, value);
-            }
-        } catch (ConsumerWakeupException e) {
-            // Expected on get() or stop(). The calling code should handle this
-            throw e;
-        } catch (KafkaException e) {
-            log.error("Error polling: " + e);
-        }
-    }
-
-    private void readToLogEnd() {
-        log.trace("Reading to end of offset log");
-
-        Set<TopicPartition> assignment = consumer.assignment();
-
-        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
-        Map<TopicPartition, Long> offsets = new HashMap<>();
-        for (TopicPartition tp : assignment) {
-            long offset = consumer.position(tp);
-            offsets.put(tp, offset);
-            consumer.seekToEnd(tp);
-        }
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        try {
-            poll(0);
-        } finally {
-            // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
-            // the consumers position is reset or it'll get into an inconsistent state.
-            for (TopicPartition tp : assignment) {
-                long startOffset = offsets.get(tp);
-                long endOffset = consumer.position(tp);
-                if (endOffset > startOffset) {
-                    endOffsets.put(tp, endOffset);
-                    consumer.seek(tp, startOffset);
-                }
-                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
-            }
+    private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
+            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
+            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+            data.put(key, value);
         }
+    };
 
-        while (!endOffsets.isEmpty()) {
-            poll(Integer.MAX_VALUE);
-
-            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
-            while (it.hasNext()) {
-                Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
-                    it.remove();
-                else
-                    break;
-            }
-        }
+    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+                                                              Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
     }
 
-
     private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
         private int numLeft;
         private boolean completed = false;
@@ -349,45 +207,5 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         }
     }
 
-    private class WorkThread extends Thread {
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    int numCallbacks;
-                    synchronized (KafkaOffsetBackingStore.this) {
-                        if (stopRequested)
-                            break;
-                        numCallbacks = readLogEndOffsetCallbacks.size();
-                    }
-
-                    if (numCallbacks > 0) {
-                        try {
-                            readToLogEnd();
-                        } catch (ConsumerWakeupException e) {
-                            // Either received another get() call and need to retry reading to end of log or stop() was
-                            // called. Both are handled by restarting this loop.
-                            continue;
-                        }
-                    }
 
-                    synchronized (KafkaOffsetBackingStore.this) {
-                        for (int i = 0; i < numCallbacks; i++) {
-                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
-                            cb.onCompletion(null, null);
-                        }
-                    }
-
-                    try {
-                        poll(Integer.MAX_VALUE);
-                    } catch (ConsumerWakeupException e) {
-                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
-                        continue;
-                    }
-                }
-            } catch (Throwable t) {
-                log.error("Unexpected exception in KafkaOffsetBackingStore's work thread", t);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index dbb3d0d..84229a5 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -92,7 +92,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
                     continue;
                 }
                 Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
-                SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
+                SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue() != null ? rawEntry.getValue().array() : null);
                 Object deserializedValue = deserializedSchemaAndValue.value();
                 OffsetUtils.validateFormat(deserializedValue);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
index bd3a87b..8d78a57 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -25,6 +25,9 @@ import java.util.Map;
 
 public class OffsetUtils {
     public static void validateFormat(Object offsetData) {
+        if (offsetData == null)
+            return;
+
         if (!(offsetData instanceof Map))
             throw new DataException("Offsets must be specified as a Map");
         validateFormat((Map<Object, Object>) offsetData);

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
new file mode 100644
index 0000000..5e860d9
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ *     KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
+ *     clients need to consume and, at times, agree on their offset / that they have read to the end of the log.
+ * </p>
+ * <p>
+ *     This functionality is useful for storing different types of data that all clients may need to agree on --
+ *     offsets or config for example. This class runs a consumer in a background thread to continuously tail the target
+ *     topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful
+ *     utilities like checking the current log end offset and waiting until the current end of the log is reached.
+ * </p>
+ * <p>
+ *     To support different use cases, this class works with either single- or multi-partition topics.
+ * </p>
+ * <p>
+ *     Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
+ *     record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
+ *     calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
+ *     and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
+ * </p>
+ */
+public class KafkaBasedLog<K, V> {
+    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
+    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+
+    private Time time;
+    private final String topic;
+    private final Map<String, Object> producerConfigs;
+    private final Map<String, Object> consumerConfigs;
+    private final Callback<ConsumerRecord<K, V>> consumedCallback;
+    private Consumer<K, V> consumer;
+    private Producer<K, V> producer;
+
+    private Thread thread;
+    private boolean stopRequested;
+    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
+
+    /**
+     * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
+     * {@link #start()} is invoked.
+     *
+     * @param topic the topic to treat as a log
+     * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
+     *                        contain compatible serializer settings for the generic types used on this class. Some
+     *                        setting, such as the number of acks, will be overridden to ensure correct behavior of this
+     *                        class.
+     * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
+     *                        contain compatible serializer settings for the generic types used on this class. Some
+     *                        setting, such as the auto offset reset policy, will be overridden to ensure correct
+     *                        behavior of this class.
+     * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
+     * @param time Time interface
+     */
+    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
+                         Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+        this.topic = topic;
+        this.producerConfigs = producerConfigs;
+        this.consumerConfigs = consumerConfigs;
+        this.consumedCallback = consumedCallback;
+        this.stopRequested = false;
+        this.readLogEndOffsetCallbacks = new ArrayDeque<>();
+        this.time = time;
+    }
+
+    public void start() {
+        log.info("Starting KafkaBasedLog with topic " + topic);
+
+        producer = createProducer();
+        consumer = createConsumer();
+
+        List<TopicPartition> partitions = new ArrayList<>();
+
+        // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
+        // we rely on topic auto-creation
+        List<PartitionInfo> partitionInfos = null;
+        long started = time.milliseconds();
+        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+            partitionInfos = consumer.partitionsFor(topic);
+            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
+        }
+        if (partitionInfos == null)
+            throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
+                    " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
+                    " this is your first use of the topic it may have taken too long to create.");
+
+        for (PartitionInfo partition : partitionInfos)
+            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        consumer.assign(partitions);
+
+        readToLogEnd();
+
+        thread = new WorkThread();
+        thread.start();
+
+        log.info("Finished reading KafakBasedLog for topic " + topic);
+
+        log.info("Started KafakBasedLog for topic " + topic);
+    }
+
+    public void stop() {
+        log.info("Stopping KafkaBasedLog for topic " + topic);
+
+        synchronized (this) {
+            stopRequested = true;
+        }
+        consumer.wakeup();
+
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            throw new CopycatException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
+                    "down it's producer and consumer.", e);
+        }
+
+        try {
+            producer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to stop KafkaBasedLog producer", e);
+        }
+
+        try {
+            consumer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to stop KafkaBasedLog consumer", e);
+        }
+
+        log.info("Stopped KafkaBasedLog for topic " + topic);
+    }
+
+    /**
+     * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
+     * Note that this checks the current, offsets, reads to them, and invokes the callback regardless of whether
+     * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
+     * of the log, they must ensure there are no other writers during this period.
+     *
+     * This waits until the end of all partitions has been reached.
+     *
+     * This method is asynchronous. If you need a synchronous version, pass an instance of
+     * {@link org.apache.kafka.copycat.util.FutureCallback} as the {@param callback} parameter and wait on it to block.
+     *
+     * @param callback the callback to invoke once the end of the log has been reached.
+     */
+    public void readToEnd(Callback<Void> callback) {
+        producer.flush();
+        synchronized (this) {
+            readLogEndOffsetCallbacks.add(callback);
+        }
+        consumer.wakeup();
+    }
+
+    /**
+     * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
+     * @return the future associated with the operation
+     */
+    public Future<Void> readToEnd() {
+        FutureCallback<Void> future = new FutureCallback<>(null);
+        readToEnd(future);
+        return future;
+    }
+
+    public void send(K key, V value) {
+        send(key, value, null);
+    }
+
+    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+        producer.send(new ProducerRecord<>(topic, key, value), callback);
+    }
+
+
+    private Producer<K, V> createProducer() {
+        // Always require producer acks to all to ensure durable writes
+        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+        return new KafkaProducer<>(producerConfigs);
+    }
+
+    private Consumer<K, V> createConsumer() {
+        // Always force reset to the beginning of the log since this class wants to consume all available log data
+        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(consumerConfigs);
+    }
+
+    private void poll(long timeoutMs) {
+        try {
+            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            for (ConsumerRecord<K, V> record : records)
+                consumedCallback.onCompletion(null, record);
+        } catch (ConsumerWakeupException e) {
+            // Expected on get() or stop(). The calling code should handle this
+            throw e;
+        } catch (KafkaException e) {
+            log.error("Error polling: " + e);
+        }
+    }
+
+    private void readToLogEnd() {
+        log.trace("Reading to end of offset log");
+
+        Set<TopicPartition> assignment = consumer.assignment();
+
+        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : assignment) {
+            long offset = consumer.position(tp);
+            offsets.put(tp, offset);
+            consumer.seekToEnd(tp);
+        }
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        try {
+            poll(0);
+        } finally {
+            // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
+            // the consumers position is reset or it'll get into an inconsistent state.
+            for (TopicPartition tp : assignment) {
+                long startOffset = offsets.get(tp);
+                long endOffset = consumer.position(tp);
+                if (endOffset > startOffset) {
+                    endOffsets.put(tp, endOffset);
+                    consumer.seek(tp, startOffset);
+                }
+                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
+            }
+        }
+
+        while (!endOffsets.isEmpty()) {
+            poll(Integer.MAX_VALUE);
+
+            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<TopicPartition, Long> entry = it.next();
+                if (consumer.position(entry.getKey()) >= entry.getValue())
+                    it.remove();
+                else
+                    break;
+            }
+        }
+    }
+
+
+    private class WorkThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    int numCallbacks;
+                    synchronized (KafkaBasedLog.this) {
+                        if (stopRequested)
+                            break;
+                        numCallbacks = readLogEndOffsetCallbacks.size();
+                    }
+
+                    if (numCallbacks > 0) {
+                        try {
+                            readToLogEnd();
+                        } catch (ConsumerWakeupException e) {
+                            // Either received another get() call and need to retry reading to end of log or stop() was
+                            // called. Both are handled by restarting this loop.
+                            continue;
+                        }
+                    }
+
+                    synchronized (KafkaBasedLog.this) {
+                        // Only invoke exactly the number of callbacks we found before triggering the read to log end
+                        // since it is possible for another write + readToEnd to sneak in in the meantime
+                        for (int i = 0; i < numCallbacks; i++) {
+                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
+                            cb.onCompletion(null, null);
+                        }
+                    }
+
+                    try {
+                        poll(Integer.MAX_VALUE);
+                    } catch (ConsumerWakeupException e) {
+                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
+                        continue;
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("Unexpected exception in KafkaBasedLog's work thread", t);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index e4d1d8e..d33f846 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -97,10 +97,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
-        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
         workerConfig = new WorkerConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 3ff3a62..13d5228 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -92,10 +92,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
-        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
         config = new WorkerConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index e75d2f9..4a30992 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -57,10 +57,10 @@ public class WorkerTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
-        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
         config = new WorkerConfig(workerProps);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
new file mode 100644
index 0000000..0463b85
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceConnector;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DistributedHerder.class})
+@PowerMockIgnore("javax.management.*")
+public class DistributedHerderTest {
+    private static final List<String> CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3");
+    private static final List<String> SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2");
+    private static final List<String> SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3");
+    private static final String TOPICS_LIST_STR = "topic1,topic2";
+
+    private static final Map<String, String> CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+
+    @Mock private KafkaConfigStorage configStorage;
+    private DistributedHerder herder;
+    @Mock private Worker worker;
+    @Mock private Callback<String> createCallback;
+
+    private Map<String, Map<String, String>> connectorProps;
+    private Map<String, Class<? extends Connector>> connectorClasses;
+    private Map<String, Class<? extends Task>> connectorTaskClasses;
+    private Map<String, Connector> connectors;
+    private Properties taskProps;
+
+    @Before
+    public void setUp() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = new DistributedHerder(worker, configStorage);
+
+        connectorProps = new HashMap<>();
+        connectorClasses = new HashMap<>();
+        connectorTaskClasses = new HashMap<>();
+        connectors = new HashMap<>();
+        for (String connectorName : CONNECTOR_NAMES) {
+            Class<? extends Connector> connectorClass = connectorName.contains("source") ? BogusSourceConnector.class : BogusSinkConnector.class;
+            Class<? extends Task> taskClass = connectorName.contains("source") ? BogusSourceTask.class : BogusSinkTask.class;
+            Connector connector = connectorName.contains("source") ? PowerMock.createMock(BogusSourceConnector.class) : PowerMock.createMock(BogusSinkConnector.class);
+
+            Map<String, String> props = new HashMap<>();
+            props.put(ConnectorConfig.NAME_CONFIG, connectorName);
+            props.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+            props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
+
+            connectorProps.put(connectorName, props);
+            connectorClasses.put(connectorName, connectorClass);
+            connectorTaskClasses.put(connectorName, taskClass);
+            connectors.put(connectorName, connector);
+        }
+
+        PowerMock.mockStatic(DistributedHerder.class);
+
+        // These can be anything since connectors can pass along whatever they want.
+        taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+    }
+
+    @Test
+    public void testCreateSourceConnector() throws Exception {
+        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateSinkConnector() throws Exception {
+        String connectorName = SINK_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDestroyConnector() throws Exception {
+        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        expectDestroy(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+
+            }
+        });
+        herder.deleteConnector(CONNECTOR_NAMES.get(0), futureCb);
+        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateAndStop() throws Exception {
+        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestoreAndStop() throws Exception {
+        String restoreConnectorName1 = SOURCE_CONNECTOR_NAMES.get(0);
+        String restoreConnectorName2 = SINK_CONNECTOR_NAMES.get(0);
+        String additionalConnectorName = SOURCE_CONNECTOR_NAMES.get(1);
+
+        expectConfigStorageConfigureStart();
+        expectRestore(Arrays.asList(restoreConnectorName1, restoreConnectorName2));
+        expectAdd(additionalConnectorName);
+        // Stopping the herder should correctly stop all restored and new connectors
+        expectStop(restoreConnectorName1);
+        expectStop(restoreConnectorName2);
+        expectStop(additionalConnectorName);
+        configStorage.stop();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(additionalConnectorName), createCallback);
+        herder.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigStorageConfigureStart() {
+        configStorage.configure(CONFIG_STORAGE_CONFIG);
+        PowerMock.expectLastCall();
+        configStorage.start();
+        PowerMock.expectLastCall();
+    }
+
+    private void expectAdd(String connectorName) throws Exception {
+        configStorage.putConnectorConfig(connectorName, connectorProps.get(connectorName));
+        PowerMock.expectLastCall();
+        expectInstantiateConnector(connectorName, true);
+    }
+
+    private void expectEmptyRestore() throws Exception {
+        expectRestore(Collections.<String>emptyList());
+    }
+
+    private void expectRestore(List<String> connectorNames) throws Exception {
+        Map<String, Integer> rootConfig = new HashMap<>();
+        Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+        for (String connName : connectorNames) {
+            rootConfig.put(connName, 0);
+            connectorConfigs.put(connName, connectorProps.get(connName));
+        }
+        EasyMock.expect(configStorage.snapshot())
+                .andReturn(new ClusterConfigState(1, rootConfig, connectorConfigs, Collections.EMPTY_MAP, Collections.EMPTY_SET));
+
+        // Restore never uses a callback
+        for (String connectorName : connectorNames)
+            expectInstantiateConnector(connectorName, false);
+    }
+
+    private void expectInstantiateConnector(String connectorName, boolean expectCallback) throws Exception {
+        PowerMock.expectPrivate(DistributedHerder.class, "instantiateConnector", connectorClasses.get(connectorName).getName())
+                .andReturn(connectors.get(connectorName));
+        if (expectCallback) {
+            createCallback.onCompletion(null, connectorName);
+            PowerMock.expectLastCall();
+        }
+
+        Connector connector = connectors.get(connectorName);
+        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
+        PowerMock.expectLastCall();
+        connector.start(new Properties());
+        PowerMock.expectLastCall();
+
+        // Just return the connector properties for the individual task we generate by default
+        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(connectorTaskClasses.get(connectorName));
+
+        EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+                .andReturn(Arrays.asList(taskProps));
+        // And we should instantiate the tasks. For a sink task, we should see added properties for
+        // the input topic partitions
+        Properties generatedTaskProps = new Properties();
+        generatedTaskProps.putAll(taskProps);
+        if (connectorName.contains("sink"))
+            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
+        worker.addTask(taskId, connectorTaskClasses.get(connectorName).getName(), generatedTaskProps);
+        PowerMock.expectLastCall();
+    }
+
+    private void expectStop(String connectorName) {
+        worker.stopTask(new ConnectorTaskId(connectorName, 0));
+        EasyMock.expectLastCall();
+        Connector connector = connectors.get(connectorName);
+        connector.stop();
+        EasyMock.expectLastCall();
+    }
+
+    private void expectDestroy(String connectorName) {
+        expectStop(connectorName);
+        configStorage.putConnectorConfig(connectorName, null);
+        PowerMock.expectLastCall();
+    }
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
+
+    private abstract class BogusSinkConnector extends SinkConnector {
+    }
+
+    private abstract class BogusSinkTask extends SourceTask {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
index 477893b..606b94d 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.copycat.runtime.standalone;
 import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -39,6 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -54,7 +57,7 @@ public class StandaloneHerderTest {
     private Connector connector;
     @Mock protected Callback<String> createCallback;
 
-    private Properties connectorProps;
+    private Map<String, String> connectorProps;
     private Properties taskProps;
 
     @Before
@@ -62,9 +65,9 @@ public class StandaloneHerderTest {
         worker = PowerMock.createMock(Worker.class);
         herder = new StandaloneHerder(worker);
 
-        connectorProps = new Properties();
-        connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
-        connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        connectorProps = new HashMap<>();
+        connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
         PowerMock.mockStatic(StandaloneHerder.class);
 
         // These can be anything since connectors can pass along whatever they want.
@@ -74,8 +77,8 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceClass.class);
-        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
         PowerMock.replayAll();
 
         herder.addConnector(connectorProps, createCallback);
@@ -85,8 +88,8 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateSinkConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSinkClass.class);
-        expectAdd(BogusSinkClass.class, BogusSinkTask.class, true);
+        connector = PowerMock.createMock(BogusSinkConnector.class);
+        expectAdd(BogusSinkConnector.class, BogusSinkTask.class, true);
 
         PowerMock.replayAll();
 
@@ -97,8 +100,8 @@ public class StandaloneHerderTest {
 
     @Test
     public void testDestroyConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceClass.class);
-        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
         expectDestroy();
         PowerMock.replayAll();
 
@@ -114,32 +117,31 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCreateAndStop() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectStop();
+        PowerMock.replayAll();
 
-    private void expectAdd(Class<? extends Connector> connClass,
-                             Class<? extends Task> taskClass,
-                             boolean sink) throws Exception {
-        expectCreate(connClass, taskClass, sink, true);
-    }
+        herder.addConnector(connectorProps, createCallback);
+        herder.stop();
 
-    private void expectRestore(Class<? extends Connector> connClass,
-                                 Class<? extends Task> taskClass) throws Exception {
-        // Restore never uses a callback. These tests always use sources
-        expectCreate(connClass, taskClass, false, false);
+        PowerMock.verifyAll();
     }
 
-    private void expectCreate(Class<? extends Connector> connClass,
-                                Class<? extends Task> taskClass,
-                                boolean sink, boolean expectCallback) throws Exception {
-        connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+    private void expectAdd(Class<? extends Connector> connClass,
+                           Class<? extends Task> taskClass,
+                           boolean sink) throws Exception {
+        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
 
         PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
                 .andReturn(connector);
-        if (expectCallback) {
-            createCallback.onCompletion(null, CONNECTOR_NAME);
-            PowerMock.expectLastCall();
-        }
 
-        connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
+        createCallback.onCompletion(null, CONNECTOR_NAME);
+        PowerMock.expectLastCall();
+
+        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
         PowerMock.expectLastCall();
         connector.start(new Properties());
         PowerMock.expectLastCall();
@@ -171,13 +173,13 @@ public class StandaloneHerderTest {
     }
 
     // We need to use a real class here due to some issue with mocking java.lang.Class
-    private abstract class BogusSourceClass extends SourceConnector {
+    private abstract class BogusSourceConnector extends SourceConnector {
     }
 
     private abstract class BogusSourceTask extends SourceTask {
     }
 
-    private abstract class BogusSinkClass extends SinkConnector {
+    private abstract class BogusSinkConnector extends SinkConnector {
     }
 
     private abstract class BogusSinkTask extends SourceTask {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
new file mode 100644
index 0000000..b02b752
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.copycat.data.Field;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
+import org.apache.kafka.copycat.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConfigStorage.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaConfigStorageTest {
+    private static final String TOPIC = "copycat-configs";
+    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+
+    static {
+        DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+    }
+
+    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
+    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
+
+    // Need a) connector with multiple tasks and b) multiple connectors
+    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
+            new ConnectorTaskId("connector1", 0),
+            new ConnectorTaskId("connector1", 1),
+            new ConnectorTaskId("connector2", 0)
+    );
+    private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
+
+    // Need some placeholders -- the contents don't matter here, just that they are restored properly
+    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+            Collections.singletonMap("config-key-one", "config-value-one"),
+            Collections.singletonMap("config-key-two", "config-value-two"),
+            Collections.singletonMap("config-key-three", "config-value-three")
+    );
+    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
+    );
+    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
+    );
+
+    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
+            = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
+
+    // The exact format doesn't matter here since both conversions are mocked
+    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
+            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
+            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
+            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
+    );
+
+    @Mock
+    private Converter converter;
+    @Mock
+    private Callback<String> connectorReconfiguredCallback;
+    @Mock
+    private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback;
+    @Mock
+    KafkaBasedLog<String, byte[]> storeLog;
+    private KafkaConfigStorage configStorage;
+
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
+    private long logOffset = 0;
+
+    @Before
+    public void setUp() {
+        configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"},
+                converter, connectorReconfiguredCallback, tasksReconfiguredCallback);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+        configStorage.start();
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configState = configStorage.snapshot();
+        assertEquals(0, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Second should also block and all configs should still be available
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigs() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        // Task configs should read to end, write to the log, read to end, write root, then read to end again
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 2); // Starts with 0 tasks, after update has 2
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+        // Writing task task configs should block until all the writes have been performed and the root record update
+        // has completed
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configStorage.putTaskConfigs(taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(2, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                // Connector after root update should make it through, task update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 7;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be last read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
+        // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
+        // We start out by loading an initial configuration where we started to write a task update and failed before
+        // writing an the commit, and then compaction cleaned up the earlier record.
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                // This is the record that has been compacted:
+                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+
+        // One failed attempt to write new task configs
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+
+        // Successful attempt to write new task config
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 1); // Updated to just 1 task
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0)));
+        EasyMock.expectLastCall();
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+        // After reading the log, it should have been in an inconsistent state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be last read, not last committed
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
+        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_IDS.get(0))), configState.inconsistentConnectors());
+
+        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
+        try {
+            configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
+            fail("Should have failed due to incomplete task set.");
+        } catch (KafkaException e) {
+            // expected
+        }
+
+        // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
+        // we are going to shrink the number of tasks to 1
+        configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        // Validate updated config
+        configState = configStorage.snapshot();
+        // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
+        // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
+        assertEquals(7, configState.offset());
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
+                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
+
+    // If non-empty, deserializations should be a LinkedHashMap
+    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
+                             final Map<byte[], Struct> deserializations) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
+        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
+            // Note null schema because default settings for internal serialization are schema-less
+            EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
+                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
+        }
+    }
+
+    private void expectStop() {
+        storeLog.stop();
+        PowerMock.expectLastCall();
+    }
+
+    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
+    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
+    // (by checking a single field's value)
+    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                        final String dataFieldName, final Object dataFieldValue) {
+        final Capture<Struct> capturedRecord = EasyMock.newCapture();
+        EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                .andReturn(serialized);
+        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+        PowerMock.expectLastCall();
+        EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
+                .andAnswer(new IAnswer<SchemaAndValue>() {
+                    @Override
+                    public SchemaAndValue answer() throws Throwable {
+                        assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+                        // Note null schema because default settings for internal serialization are schema-less
+                        return new SchemaAndValue(null, structToMap(capturedRecord.getValue()));
+                    }
+                });
+    }
+
+    // This map needs to maintain ordering
+    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
+        EasyMock.expect(storeLog.readToEnd())
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        TestFuture<Void> future = new TestFuture<Void>();
+                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue()));
+                        future.resolveOnGet((Void) null);
+                        return future;
+                    }
+                });
+    }
+
+
+    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                                                 final String dataFieldName, final Object dataFieldValue) {
+        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, serialized);
+        expectReadToEnd(recordsToRead);
+    }
+
+    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
+    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
+        for (int i = 0; i < taskConfigs.size(); i++)
+            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+
+        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
+        connectorConfigs.put(connectorName, connectorConfig);
+
+        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
+    }
+
+    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
+    private Map<String, Object> structToMap(Struct struct) {
+        HashMap<String, Object> result = new HashMap<>();
+        for (Field field : struct.schema().fields())
+            result.put(field.name(), struct.get(field));
+        return result;
+    }
+
+}


[3/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.

Posted by gw...@apache.org.
KAFKA-2372: Add Kafka-backed storage of Copycat configs.

This also adds some other needed infrastructure for distributed Copycat, most
importantly the DistributedHerder, and refactors some code for handling
Kafka-backed logs into KafkaBasedLog since this is shared betweeen offset and
config storage.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira, James Cheng

Closes #241 from ewencp/kafka-2372-copycat-distributed-config


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36d44693
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36d44693
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36d44693

Branch: refs/heads/trunk
Commit: 36d4469326fe20c3f0657315321e6ad515530a3e
Parents: e2ec02e
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Tue Oct 13 10:23:21 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Tue Oct 13 10:23:21 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/utils/Utils.java    |  13 +
 config/copycat-distributed.properties           |  13 +-
 config/copycat-standalone.properties            |  12 +-
 .../kafka/copycat/cli/CopycatDistributed.java   |   8 +-
 .../kafka/copycat/cli/CopycatStandalone.java    |   2 +-
 .../apache/kafka/copycat/cli/WorkerConfig.java  |  20 +-
 .../kafka/copycat/runtime/ConnectorConfig.java  |   7 +-
 .../apache/kafka/copycat/runtime/Herder.java    |  19 +-
 .../copycat/runtime/HerderConnectorContext.java |  42 ++
 .../apache/kafka/copycat/runtime/Worker.java    |  23 +-
 .../runtime/distributed/ClusterConfigState.java | 122 +++++
 .../runtime/distributed/DistributedHerder.java  | 320 +++++++++++
 .../standalone/StandaloneConnectorContext.java  |  42 --
 .../runtime/standalone/StandaloneHerder.java    |  34 +-
 .../copycat/storage/KafkaConfigStorage.java     | 546 +++++++++++++++++++
 .../storage/KafkaOffsetBackingStore.java        | 258 ++-------
 .../storage/OffsetStorageReaderImpl.java        |   2 +-
 .../kafka/copycat/storage/OffsetUtils.java      |   3 +
 .../kafka/copycat/util/KafkaBasedLog.java       | 331 +++++++++++
 .../copycat/runtime/WorkerSinkTaskTest.java     |   8 +-
 .../copycat/runtime/WorkerSourceTaskTest.java   |   8 +-
 .../kafka/copycat/runtime/WorkerTest.java       |   8 +-
 .../distributed/DistributedHerderTest.java      | 289 ++++++++++
 .../standalone/StandaloneHerderTest.java        |  62 ++-
 .../copycat/storage/KafkaConfigStorageTest.java | 508 +++++++++++++++++
 .../storage/KafkaOffsetBackingStoreTest.java    | 399 +++++---------
 .../kafka/copycat/util/KafkaBasedLogTest.java   | 463 ++++++++++++++++
 .../apache/kafka/copycat/util/TestFuture.java   |  80 ++-
 tests/kafkatest/services/copycat.py             |   3 +-
 .../kafkatest/tests/copycat_distributed_test.py |  14 +-
 .../templates/copycat-distributed.properties    |   9 +-
 .../templates/copycat-standalone.properties     |   8 +-
 32 files changed, 3043 insertions(+), 633 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index fa7c92f..aee379a 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -26,9 +26,11 @@ import java.nio.MappedByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Properties;
@@ -420,6 +422,17 @@ public class Utils {
     }
 
     /**
+     * Converts a Properties object to a Map<String, String>, calling {@link #toString} to ensure all keys and values
+     * are Strings.
+     */
+    public static Map<String, String> propsToStringMap(Properties props) {
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : props.entrySet())
+            result.put(entry.getKey().toString(), entry.getValue().toString());
+        return result;
+    }
+
+    /**
      * Get the stack trace from an exception as a string
      */
     public static String stackTrace(Throwable e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/config/copycat-distributed.properties
----------------------------------------------------------------------
diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties
index 654ed24..b122413 100644
--- a/config/copycat-distributed.properties
+++ b/config/copycat-distributed.properties
@@ -27,13 +27,14 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter
 key.converter.schemas.enable=true
 value.converter.schemas.enable=true
 
-# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
-# Offset data is never visible outside of Copcyat.
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+# The internal converter used for offsets and config data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format.
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
 
 offset.storage.topic=copycat-offsets
 # Flush much faster than normal, which is useful for testing/debugging
 offset.flush.interval.ms=10000
+config.storage.topic=copycat-configs
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/config/copycat-standalone.properties
----------------------------------------------------------------------
diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties
index fd264b5..ebf689f 100644
--- a/config/copycat-standalone.properties
+++ b/config/copycat-standalone.properties
@@ -25,12 +25,12 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter
 key.converter.schemas.enable=true
 value.converter.schemas.enable=true
 
-# The offset converter is configurable and must be specified, but most users will always want to use the built-in default.
-# Offset data is never visible outside of Copcyat.
-offset.key.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.value.converter=org.apache.kafka.copycat.json.JsonConverter
-offset.key.converter.schemas.enable=false
-offset.value.converter.schemas.enable=false
+# The internal converter used for offsets and config data is configurable and must be specified, but most users will
+# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format.
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.value.converter=org.apache.kafka.copycat.json.JsonConverter
+internal.key.converter.schemas.enable=false
+internal.value.converter.schemas.enable=false
 
 offset.storage.file.filename=/tmp/copycat.offsets
 # Flush much faster than normal, which is useful for testing/debugging

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index b5e8896..b0230b2 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -20,9 +20,8 @@ package org.apache.kafka.copycat.cli;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.runtime.Copycat;
-import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.Worker;
-import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
+import org.apache.kafka.copycat.runtime.distributed.DistributedHerder;
 import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.FutureCallback;
@@ -59,7 +58,8 @@ public class CopycatDistributed {
 
         WorkerConfig workerConfig = new WorkerConfig(workerProps);
         Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
-        Herder herder = new StandaloneHerder(worker);
+        DistributedHerder herder = new DistributedHerder(worker);
+        herder.configure(workerConfig.originals());
         final Copycat copycat = new Copycat(worker, herder);
         copycat.start();
 
@@ -73,7 +73,7 @@ public class CopycatDistributed {
                             log.error("Failed to create job for {}", connectorPropsFile);
                     }
                 });
-                herder.addConnector(connectorProps, cb);
+                herder.addConnector(Utils.propsToStringMap(connectorProps), cb);
                 cb.get();
             }
         } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
index 12ec154..65a15e4 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -75,7 +75,7 @@ public class CopycatStandalone {
                             log.error("Failed to create job for {}", connectorPropsFile);
                     }
                 });
-                herder.addConnector(connectorProps, cb);
+                herder.addConnector(Utils.propsToStringMap(connectorProps), cb);
                 cb.get();
             }
         } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
index a976d90..2a3f539 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
@@ -57,13 +57,13 @@ public class WorkerConfig extends AbstractConfig {
     public static final String VALUE_CONVERTER_CLASS_DOC =
             "Converter class for value Copycat data that implements the <code>Converter</code> interface.";
 
-    public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter";
-    public static final String OFFSET_KEY_CONVERTER_CLASS_DOC =
-            "Converter class for offset key Copycat data that implements the <code>Converter</code> interface.";
+    public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
+    public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
+            "Converter class for internal key Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
 
-    public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter";
-    public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC =
-            "Converter class for offset value Copycat data that implements the <code>Converter</code> interface.";
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for offset value Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
 
     public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
             = "task.shutdown.graceful.timeout.ms";
@@ -95,10 +95,10 @@ public class WorkerConfig extends AbstractConfig {
                         Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
-                .define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC)
-                .define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC)
+                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
+                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
                 .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
                         TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
index 336597e..767c88b 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java
@@ -22,7 +22,8 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 
-import java.util.Properties;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * <p>
@@ -64,10 +65,10 @@ public class ConnectorConfig extends AbstractConfig {
     }
 
     public ConnectorConfig() {
-        this(new Properties());
+        this(new HashMap<String, String>());
     }
 
-    public ConnectorConfig(Properties props) {
+    public ConnectorConfig(Map<String, String> props) {
         super(config, props);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
index 7f8b7c2..31e68ef 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
@@ -19,7 +19,7 @@ package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.copycat.util.Callback;
 
-import java.util.Properties;
+import java.util.Map;
 
 /**
  * <p>
@@ -53,15 +53,24 @@ public interface Herder {
      * the leader herder if necessary.
      *
      * @param connectorProps user-specified properties for this job
-     * @param callback callback to invoke when the request completes
+     * @param callback       callback to invoke when the request completes
      */
-    void addConnector(Properties connectorProps, Callback<String> callback);
+    void addConnector(Map<String, String> connectorProps, Callback<String> callback);
 
     /**
      * Delete a connector job by name.
      *
-     * @param name name of the connector job to shutdown and delete
+     * @param name     name of the connector job to shutdown and delete
      * @param callback callback to invoke when the request completes
      */
     void deleteConnector(String name, Callback<Void> callback);
-}
+
+    /**
+     * Requests reconfiguration of the task. This should only be triggered by
+     * {@link HerderConnectorContext}.
+     *
+     * @param connName name of the connector that should be reconfigured
+     */
+    void requestTaskReconfiguration(String connName);
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
new file mode 100644
index 0000000..7a64bd5
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.connector.ConnectorContext;
+
+/**
+ * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
+ * in a single process.
+ */
+public class HerderConnectorContext implements ConnectorContext {
+
+    private Herder herder;
+    private String connectorName;
+
+    public HerderConnectorContext(Herder herder, String connectorName) {
+        this.herder = herder;
+        this.connectorName = connectorName;
+    }
+
+    @Override
+    public void requestTaskReconfiguration() {
+        // This is trivial to forward since there is only one herder and it's in memory in this
+        // process
+        herder.requestTaskReconfiguration(connectorName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index a34a014..0fdab4c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -52,8 +52,8 @@ public class Worker {
     private WorkerConfig config;
     private Converter keyConverter;
     private Converter valueConverter;
-    private Converter offsetKeyConverter;
-    private Converter offsetValueConverter;
+    private Converter internalKeyConverter;
+    private Converter internalValueConverter;
     private OffsetBackingStore offsetBackingStore;
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
     private KafkaProducer<byte[], byte[]> producer;
@@ -71,10 +71,10 @@ public class Worker {
         this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
         this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
         this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
-        this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
-        this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true);
-        this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
-        this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false);
+        this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
+        this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
 
         this.offsetBackingStore = offsetBackingStore;
         this.offsetBackingStore.configure(config.originals());
@@ -157,9 +157,9 @@ public class Worker {
         if (task instanceof SourceTask) {
             SourceTask sourceTask = (SourceTask) task;
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
-                    offsetKeyConverter, offsetValueConverter);
+                    internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
-                    offsetKeyConverter, offsetValueConverter);
+                    internalKeyConverter, internalValueConverter);
             workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
@@ -201,4 +201,11 @@ public class Worker {
         return task;
     }
 
+    public Converter getInternalKeyConverter() {
+        return internalKeyConverter;
+    }
+
+    public Converter getInternalValueConverter() {
+        return internalValueConverter;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
new file mode 100644
index 0000000..719dd09
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster.
+ */
+public class ClusterConfigState {
+    private final long offset;
+    private final Map<String, Integer> connectorTaskCounts;
+    private final Map<String, Map<String, String>> connectorConfigs;
+    private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
+    private final Set<String> inconsistentConnectors;
+
+    public ClusterConfigState(long offset,
+                              Map<String, Integer> connectorTaskCounts,
+                              Map<String, Map<String, String>> connectorConfigs,
+                              Map<ConnectorTaskId, Map<String, String>> taskConfigs,
+                              Set<String> inconsistentConnectors) {
+        this.offset = offset;
+        this.connectorTaskCounts = connectorTaskCounts;
+        this.connectorConfigs = connectorConfigs;
+        this.taskConfigs = taskConfigs;
+        this.inconsistentConnectors = inconsistentConnectors;
+    }
+
+    /**
+     * Get the last offset read to generate this config state. This offset is not guaranteed to be perfectly consistent
+     * with the recorded state because some partial updates to task configs may have been read.
+     * @return the latest config offset
+     */
+    public long offset() {
+        return offset;
+    }
+
+    /**
+     * Get a list of the connectors in this configuration
+     */
+    public Collection<String> connectors() {
+        return connectorTaskCounts.keySet();
+    }
+
+    /**
+     * Get the configuration for a connector.
+     * @param connector name of the connector
+     * @return a map containing configuration parameters
+     */
+    public Map<String, String> connectorConfig(String connector) {
+        return connectorConfigs.get(connector);
+    }
+
+    /**
+     * Get the configuration for a task.
+     * @param task id of the task
+     * @return a map containing configuration parameters
+     */
+    public Map<String, String> taskConfig(ConnectorTaskId task) {
+        return taskConfigs.get(task);
+    }
+
+    /**
+     * Get the current set of task IDs for the specified connector.
+     * @param connectorName the name of the connector to look up task configs for
+     * @return the current set of connector task IDs
+     */
+    public Collection<ConnectorTaskId> tasks(String connectorName) {
+        if (inconsistentConnectors.contains(connectorName))
+            return Collections.EMPTY_LIST;
+
+        Integer numTasks = connectorTaskCounts.get(connectorName);
+        if (numTasks == null)
+            throw new IllegalArgumentException("Connector does not exist in current configuration.");
+
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
+        for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex);
+            taskIds.add(taskId);
+        }
+        return taskIds;
+    }
+
+    /**
+     * Get the set of connectors which have inconsistent data in this snapshot. These inconsistencies can occur due to
+     * partially completed writes combined with log compaction.
+     *
+     * Connectors in this set will appear in the output of {@link #connectors()} since their connector configuration is
+     * available, but not in the output of {@link #taskConfig(ConnectorTaskId)} since the task configs are incomplete.
+     *
+     * When a worker detects a connector in this state, it should request that the connector regenerate its task
+     * configurations.
+     *
+     * @return the set of inconsistent connectors
+     */
+    public Set<String> inconsistentConnectors() {
+        return inconsistentConnectors;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
new file mode 100644
index 0000000..5273658
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java
@@ -0,0 +1,320 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Distributed "herder" that coordinates with other workers to spread work across multiple processes.
+ */
+public class DistributedHerder implements Herder {
+    private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
+
+    private Worker worker;
+    private KafkaConfigStorage configStorage;
+    private ClusterConfigState configState;
+    private HashMap<String, ConnectorState> connectors = new HashMap<>();
+
+    public DistributedHerder(Worker worker) {
+        this.worker = worker;
+        this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(),
+                new ConnectorConfigCallback(), new TaskConfigCallback());
+    }
+
+    // Public for testing (mock KafkaConfigStorage)
+    public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) {
+        this.worker = worker;
+        this.configStorage = configStorage;
+    }
+
+    public synchronized void configure(Map<String, ?> configs) {
+        configStorage.configure(configs);
+    }
+
+    public synchronized void start() {
+        log.info("Herder starting");
+
+        configStorage.start();
+
+        log.info("Restoring connectors from stored configs");
+        restoreConnectors();
+
+        log.info("Herder started");
+    }
+
+    public synchronized void stop() {
+        log.info("Herder stopping");
+
+        // There's no coordination/hand-off to do here since this is all standalone. Instead, we
+        // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all
+        // the tasks.
+        for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
+            ConnectorState state = entry.getValue();
+            stopConnector(state);
+        }
+        connectors.clear();
+
+        if (configStorage != null) {
+            configStorage.stop();
+            configStorage = null;
+        }
+
+        log.info("Herder stopped");
+    }
+
+    @Override
+    public synchronized void addConnector(Map<String, String> connectorProps,
+                                          Callback<String> callback) {
+        try {
+            // Ensure the config is written to storage first
+            ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+            String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+            configStorage.putConnectorConfig(connName, connectorProps);
+
+            ConnectorState connState = createConnector(connConfig);
+            if (callback != null)
+                callback.onCompletion(null, connState.name);
+            // This should always be a new job, create jobs from scratch
+            createConnectorTasks(connState);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+        }
+    }
+
+    @Override
+    public synchronized void deleteConnector(String name, Callback<Void> callback) {
+        try {
+            destroyConnector(name);
+            if (callback != null)
+                callback.onCompletion(null, null);
+        } catch (CopycatException e) {
+            if (callback != null)
+                callback.onCompletion(e, null);
+        }
+    }
+
+    @Override
+    public synchronized void requestTaskReconfiguration(String connName) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            log.error("Task that requested reconfiguration does not exist: {}", connName);
+            return;
+        }
+        updateConnectorTasks(state);
+    }
+
+    // Creates and configures the connector. Does not setup any tasks
+    private ConnectorState createConnector(ConnectorConfig connConfig) {
+        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+        String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+        log.info("Creating connector {} of type {}", connName, className);
+        int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+        List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
+        Properties configs = connConfig.unusedProperties();
+
+        if (connectors.containsKey(connName)) {
+            log.error("Ignoring request to create connector due to conflicting connector name");
+            throw new CopycatException("Connector with name " + connName + " already exists");
+        }
+
+        final Connector connector;
+        try {
+            connector = instantiateConnector(className);
+        } catch (Throwable t) {
+            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+            // may be caused by user code
+            throw new CopycatException("Failed to create connector instance", t);
+        }
+        connector.initialize(new HerderConnectorContext(this, connName));
+        try {
+            connector.start(configs);
+        } catch (CopycatException e) {
+            throw new CopycatException("Connector threw an exception while starting", e);
+        }
+        ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
+        connectors.put(connName, state);
+
+        log.info("Finished creating connector {}", connName);
+
+        return state;
+    }
+
+    private static Connector instantiateConnector(String className) {
+        try {
+            return Utils.newInstance(className, Connector.class);
+        } catch (ClassNotFoundException e) {
+            throw new CopycatException("Couldn't instantiate connector class", e);
+        }
+    }
+
+    private void destroyConnector(String connName) {
+        log.info("Destroying connector {}", connName);
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            log.error("Failed to destroy connector {} because it does not exist", connName);
+            throw new CopycatException("Connector does not exist");
+        }
+
+        stopConnector(state);
+        configStorage.putConnectorConfig(state.name, null);
+        connectors.remove(state.name);
+
+        log.info("Finished destroying connector {}", connName);
+    }
+
+    // Stops a connectors tasks, then the connector
+    private void stopConnector(ConnectorState state) {
+        removeConnectorTasks(state);
+        try {
+            state.connector.stop();
+        } catch (CopycatException e) {
+            log.error("Error shutting down connector {}: ", state.connector, e);
+        }
+    }
+
+    private void createConnectorTasks(ConnectorState state) {
+        String taskClassName = state.connector.taskClass().getName();
+
+        log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
+
+        List<Properties> taskConfigs = state.connector.taskConfigs(state.maxTasks);
+
+        // Generate the final configs, including framework provided settings
+        Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
+        for (int i = 0; i < taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+            Properties config = taskConfigs.get(i);
+            // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
+            // is automatically provided to tasks since it is required by the framework, but this
+            String subscriptionTopics = Utils.join(state.inputTopics, ",");
+            if (state.connector instanceof SinkConnector) {
+                // Make sure we don't modify the original since the connector may reuse it internally
+                Properties configForSink = new Properties();
+                configForSink.putAll(config);
+                configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
+                config = configForSink;
+            }
+            taskProps.put(taskId, config);
+        }
+
+        // And initiate the tasks
+        for (int i = 0; i < taskConfigs.size(); i++) {
+            ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+            Properties config = taskProps.get(taskId);
+            try {
+                worker.addTask(taskId, taskClassName, config);
+                // We only need to store the task IDs so we can clean up.
+                state.tasks.add(taskId);
+            } catch (Throwable e) {
+                log.error("Failed to add task {}: ", taskId, e);
+                // Swallow this so we can continue updating the rest of the tasks
+                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
+                // that died after starting successfully.
+            }
+        }
+    }
+
+    private void removeConnectorTasks(ConnectorState state) {
+        Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
+        while (taskIter.hasNext()) {
+            ConnectorTaskId taskId = taskIter.next();
+            try {
+                worker.stopTask(taskId);
+                taskIter.remove();
+            } catch (CopycatException e) {
+                log.error("Failed to stop task {}: ", taskId, e);
+                // Swallow this so we can continue stopping the rest of the tasks
+                // FIXME: Forcibly kill the task?
+            }
+        }
+    }
+
+    private void updateConnectorTasks(ConnectorState state) {
+        removeConnectorTasks(state);
+        createConnectorTasks(state);
+    }
+
+    private void restoreConnectors() {
+        configState = configStorage.snapshot();
+        Collection<String> connNames = configState.connectors();
+        for (String connName : connNames) {
+            log.info("Restoring connector {}", connName);
+            Map<String, String> connProps = configState.connectorConfig(connName);
+            ConnectorConfig connConfig = new ConnectorConfig(connProps);
+            ConnectorState connState = createConnector(connConfig);
+            // Because this coordinator is standalone, connectors are only restored when this process
+            // starts and we know there can't be any existing tasks. So in this special case we're able
+            // to just create the tasks rather than having to check for existing tasks and sort out
+            // whether they need to be reconfigured.
+            createConnectorTasks(connState);
+        }
+    }
+
+
+
+    private static class ConnectorState {
+        public String name;
+        public Connector connector;
+        public int maxTasks;
+        public List<String> inputTopics;
+        Set<ConnectorTaskId> tasks;
+
+        public ConnectorState(String name, Connector connector, int maxTasks,
+                              List<String> inputTopics) {
+            this.name = name;
+            this.connector = connector;
+            this.maxTasks = maxTasks;
+            this.inputTopics = inputTopics;
+            this.tasks = new HashSet<>();
+        }
+    }
+
+    private class ConnectorConfigCallback implements Callback<String> {
+        @Override
+        public void onCompletion(Throwable error, String result) {
+            configState = configStorage.snapshot();
+            // FIXME
+        }
+    }
+
+    private class TaskConfigCallback implements Callback<List<ConnectorTaskId>> {
+        @Override
+        public void onCompletion(Throwable error, List<ConnectorTaskId> result) {
+            configState = configStorage.snapshot();
+            // FIXME
+        }
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
deleted file mode 100644
index 0e14015..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.runtime.standalone;
-
-import org.apache.kafka.copycat.connector.ConnectorContext;
-
-/**
- * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
- * in a single process.
- */
-class StandaloneConnectorContext implements ConnectorContext {
-
-    private StandaloneHerder herder;
-    private String connectorName;
-
-    public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
-        this.herder = herder;
-        this.connectorName = connectorName;
-    }
-
-    @Override
-    public void requestTaskReconfiguration() {
-        // This is trivial to forward since there is only one herder and it's in memory in this
-        // process
-        herder.requestTaskReconfiguration(connectorName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
index 45d428d..d5670fd 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -22,6 +22,7 @@ import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -65,7 +66,7 @@ public class StandaloneHerder implements Herder {
     }
 
     @Override
-    public synchronized void addConnector(Properties connectorProps,
+    public synchronized void addConnector(Map<String, String> connectorProps,
                                           Callback<String> callback) {
         try {
             ConnectorState connState = createConnector(connectorProps);
@@ -91,8 +92,18 @@ public class StandaloneHerder implements Herder {
         }
     }
 
-    // Creates the and configures the connector. Does not setup any tasks
-    private ConnectorState createConnector(Properties connectorProps) {
+    @Override
+    public synchronized void requestTaskReconfiguration(String connName) {
+        ConnectorState state = connectors.get(connName);
+        if (state == null) {
+            log.error("Task that requested reconfiguration does not exist: {}", connName);
+            return;
+        }
+        updateConnectorTasks(state);
+    }
+
+    // Creates and configures the connector. Does not setup any tasks
+    private ConnectorState createConnector(Map<String, String> connectorProps) {
         ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
@@ -114,7 +125,7 @@ public class StandaloneHerder implements Herder {
             // may be caused by user code
             throw new CopycatException("Failed to create connector instance", t);
         }
-        connector.initialize(new StandaloneConnectorContext(this, connName));
+        connector.initialize(new HerderConnectorContext(this, connName));
         try {
             connector.start(configs);
         } catch (CopycatException e) {
@@ -222,21 +233,6 @@ public class StandaloneHerder implements Herder {
         createConnectorTasks(state);
     }
 
-    /**
-     * Requests reconfiguration of the task. This should only be triggered by
-     * {@link StandaloneConnectorContext}.
-     *
-     * @param connName name of the connector that should be reconfigured
-     */
-    public synchronized void requestTaskReconfiguration(String connName) {
-        ConnectorState state = connectors.get(connName);
-        if (state == null) {
-            log.error("Task that requested reconfiguration does not exist: {}", connName);
-            return;
-        }
-        updateConnectorTasks(state);
-    }
-
 
     private static class ConnectorState {
         public String name;

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
new file mode 100644
index 0000000..366bf13
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java
@@ -0,0 +1,546 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.SchemaBuilder;
+import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.errors.DataException;
+import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * <p>
+ * Provides persistent storage of Copycat connector configurations in a Kafka topic.
+ * </p>
+ * <p>
+ * This class manages both connector and task configurations. It tracks three types of configuration entries:
+ * <p/>
+ * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for
+ * expanding this format if necessary. (Kafka key: connector-[connector-id]).
+ * These configs are *not* ephemeral. They represent the source of truth. If the entire Copycat
+ * cluster goes down, this is all that is really needed to recover.
+ * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding
+ * this format if necessary. (Kafka key: task-[connector-id]-[task-id]).
+ * These configs are ephemeral; they are stored here to a) disseminate them to all workers while
+ * ensuring agreement and b) to allow faster cluster/worker recovery since the common case
+ * of recovery (restoring a connector) will simply result in the same configuration as before
+ * the failure.
+ * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task
+ * configs for a connector can be applied. (Kafka key: commit-[connector-id].
+ * This config has two effects. First, it records the number of tasks the connector is currently
+ * running (and can therefore increase/decrease parallelism). Second, because each task config
+ * is stored separately but they need to be applied together to ensure each partition is assigned
+ * to a single task, this record also indicates that task configs for the specified connector
+ * can be "applied" or "committed".
+ * </p>
+ * <p>
+ * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
+ * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
+ * us to clean up outdated configurations over time. However, this combination has some important implications for
+ * the implementation of this class and the configuration state that it may expose.
+ * </p>
+ * <p>
+ * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
+ * is already atomic, so these can be applied as soon as they are read. One connectors config does not affect any
+ * others, and they do not need to coordinate with the connector's task configuration at all.
+ * </p>
+ * <p>
+ * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not
+ * currently have multi-record transactions or support atomic batch record writes, task commit messages are required
+ * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs
+ * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs
+ * were applied immediately you could be using half the old configs and half the new configs. In that condition, some
+ * partitions may be double-assigned because the old config and new config may use completely different assignments.
+ * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply atomically them
+ * once a commit message has been read.
+ * </p>
+ * <p>
+ * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
+ * always available, but we would like to be able to enable compaction so our configuration topic does not grow
+ * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
+ * from the beginning of the log in order to build up the full current configuration will see task commits, but some
+ * records required for those commits will have been removed because the same keys have subsequently been rewritten.
+ * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
+ * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
+ * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
+ * </p>
+ * <p>
+ * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
+ * as the previous one, but in this case both the first and second update will write 2 task configs. However, the
+ * second write fails half of the way through:
+ * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
+ * occurs and we're left with
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
+ * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
+ * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
+ * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
+ * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
+ * configs for connector "foo".
+ * </p>
+ * <p>
+ * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
+ * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
+ * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
+ * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
+ * of updating task configurations).
+ * </p>
+ * <p>
+ * Note that the expectation is that this config storage system has only a single writer at a time.
+ * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
+ * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
+ * </p>
+ * <p>
+ * Since processing of the config log occurs in a background thread, callers must take care when using accessors.
+ * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
+ * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
+ * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
+ * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
+ * rebalance must be deferred.
+ * </p>
+ */
+public class KafkaConfigStorage {
+    private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class);
+
+    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
+
+    public static final String CONNECTOR_PREFIX = "connector-";
+
+    public static String CONNECTOR_KEY(String connectorName) {
+        return CONNECTOR_PREFIX + connectorName;
+    }
+
+    public static final String TASK_PREFIX = "task-";
+
+    public static String TASK_KEY(ConnectorTaskId taskId) {
+        return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
+    }
+
+    public static final String COMMIT_TASKS_PREFIX = "commit-";
+
+    public static String COMMIT_TASKS_KEY(String connectorName) {
+        return COMMIT_TASKS_PREFIX + connectorName;
+    }
+
+    // Note that while using real serialization for values as we have here, but ad hoc string serialization for keys,
+    // isn't ideal, we use this approach because it avoids any potential problems with schema evolution or
+    // converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely
+    // the same.
+    public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
+            .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA))
+            .build();
+    public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
+    public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()
+            .field("tasks", Schema.INT32_SCHEMA)
+            .build();
+
+    private static final long READ_TO_END_TIMEOUT_MS = 30000;
+
+    private final Object lock;
+    private boolean starting;
+    private final Converter converter;
+    private final Callback<String> connectorConfigCallback;
+    private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
+    private String topic;
+    // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Copycat
+    // format to serialized form
+    private KafkaBasedLog<String, byte[]> configLog;
+    // Connector -> # of tasks
+    private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+    // Connector and task configs: name or id -> config map
+    private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+    private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+    // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
+    // is in an inconsistent state and we cannot safely use them until they have been refreshed.
+    private Set<String> inconsistent = new HashSet<>();
+    // The most recently read offset. This does not take into account deferred task updates/commits, so we may have
+    // outstanding data to be applied.
+    private long offset;
+
+    // Connector -> Map[ConnectorTaskId -> Configs]
+    private Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
+
+
+    public KafkaConfigStorage(Converter converter, Callback<String> connectorConfigCallback, Callback<List<ConnectorTaskId>> tasksConfigCallback) {
+        this.lock = new Object();
+        this.starting = false;
+        this.converter = converter;
+        this.connectorConfigCallback = connectorConfigCallback;
+        this.tasksConfigCallback = tasksConfigCallback;
+
+        offset = -1;
+    }
+
+    public void configure(Map<String, ?> configs) {
+        if (configs.get(CONFIG_TOPIC_CONFIG) == null)
+            throw new CopycatException("Must specify topic for Copycat connector configuration.");
+        topic = (String) configs.get(CONFIG_TOPIC_CONFIG);
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
+    }
+
+    public void start() {
+        log.info("Starting KafkaConfigStorage");
+        // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
+        // updates can continue to occur in the background
+        starting = true;
+        configLog.start();
+        starting = false;
+        log.info("Started KafkaConfigStorage");
+    }
+
+    public void stop() {
+        log.info("Closing KafkaConfigStorage");
+        configLog.stop();
+        log.info("Closed KafkaConfigStorage");
+    }
+
+    /**
+     * Get a snapshot of the current state of the cluster.
+     */
+    public ClusterConfigState snapshot() {
+        synchronized (lock) {
+            // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
+            // immutable configs
+            return new ClusterConfigState(
+                    offset,
+                    new HashMap<>(connectorTaskCounts),
+                    new HashMap<>(connectorConfigs),
+                    new HashMap<>(taskConfigs),
+                    new HashSet<>(inconsistent)
+            );
+        }
+    }
+
+    /**
+     * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by
+     * tailing the Kafka log with a consumer.
+     *
+     * @param connector  name of the connector to write data for
+     * @param properties the configuration to write
+     */
+    public void putConnectorConfig(String connector, Map<String, String> properties) {
+        Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+        copycatConfig.put("properties", properties);
+        byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+
+        try {
+            configLog.send(CONNECTOR_KEY(connector), serializedConfig);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write connector configuration to Kafka: ", e);
+            throw new CopycatException("Error writing connector configuration to Kafka", e);
+        }
+    }
+
+    /**
+     * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
+     * that we would be leaving one of the referenced connectors with an inconsistent state.
+     *
+     * @param configs map containing task configurations
+     * @throws CopycatException if the task configurations do not resolve inconsistencies found in the existing root
+     *                          and task configurations.
+     */
+    public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
+        // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
+        // any outstanding lagging data to consume.
+        try {
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new CopycatException("Error writing root configuration to Kafka", e);
+        }
+
+        // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
+        // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
+        // the root config being updated.
+        Map<String, Integer> newTaskCounts = new HashMap<>();
+        synchronized (lock) {
+            // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
+            // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
+            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
+            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
+                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
+                    log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
+                    throw new CopycatException("Error writing task configurations: found some connectors with invalid connectors");
+                }
+                newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
+            }
+        }
+
+        // Start sending all the individual updates
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+            Struct copycatConfig = new Struct(TASK_CONFIGURATION_V0);
+            copycatConfig.put("properties", taskConfigEntry.getValue());
+            byte[] serializedConfig = converter.fromCopycatData(topic, TASK_CONFIGURATION_V0, copycatConfig);
+            configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+        }
+
+        // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
+        // the end of the log
+        try {
+            // Read to end to ensure all the task configs have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+            // Write all the commit messages
+            for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
+                Struct copycatConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+                copycatConfig.put("tasks", taskCountEntry.getValue());
+                byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_TASKS_COMMIT_V0, copycatConfig);
+                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+            }
+
+            // Read to end to ensure all the commit messages have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new CopycatException("Error writing root configuration to Kafka", e);
+        }
+    }
+
+    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+                                                              Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+    }
+
+    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+            if (error != null) {
+                log.error("Unexpected in consumer callback for KafkaConfigStorage: ", error);
+                return;
+            }
+
+            final SchemaAndValue value;
+            try {
+                value = converter.toCopycatData(topic, record.value());
+            } catch (DataException e) {
+                log.error("Failed to convert config data to Copycat format: ", e);
+                return;
+            }
+            offset = record.offset();
+
+            if (record.key().startsWith(CONNECTOR_PREFIX)) {
+                String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
+                synchronized (lock) {
+                    // Connector configs can be applied and callbacks invoked immediately
+                    if (!(value.value() instanceof Map)) {
+                        log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+                        return;
+                    }
+                    Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
+                    if (!(newConnectorConfig instanceof Map)) {
+                        log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
+                        return;
+                    }
+                    connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
+                }
+                if (!starting)
+                    connectorConfigCallback.onCompletion(null, connectorName);
+            } else if (record.key().startsWith(TASK_PREFIX)) {
+                synchronized (lock) {
+                    ConnectorTaskId taskId = parseTaskId(record.key());
+                    if (taskId == null) {
+                        log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
+                        return;
+                    }
+                    if (!(value.value() instanceof Map)) {
+                        log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
+                        return;
+                    }
+
+                    Object newTaskConfig = ((Map<String, Object>) value.value()).get("properties");
+                    if (!(newTaskConfig instanceof Map)) {
+                        log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass());
+                        return;
+                    }
+
+                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
+                    if (deferred == null) {
+                        deferred = new HashMap<>();
+                        deferredTaskUpdates.put(taskId.connector(), deferred);
+                    }
+                    deferred.put(taskId, (Map<String, String>) newTaskConfig);
+                }
+            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
+                String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
+                List<ConnectorTaskId> updatedTasks = new ArrayList<>();
+                synchronized (lock) {
+                    // Apply any outstanding deferred task updates for the given connector. Note that just because we
+                    // encounter a commit message does not mean it will result in consistent output. In particular due to
+                    // compaction, there may be cases where . For example if we have the following sequence of writes:
+                    //
+                    // 1. Write connector "foo"'s config
+                    // 2. Write connector "foo", task 1's config <-- compacted
+                    // 3. Write connector "foo", task 2's config
+                    // 4. Write connector "foo" task commit message
+                    // 5. Write connector "foo", task 1's config
+                    // 6. Write connector "foo", task 2's config
+                    // 7. Write connector "foo" task commit message
+                    //
+                    // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
+                    // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
+                    // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
+                    // only 5 was written, then there may be nothing that will finish writing the configs and get the
+                    // log back into a consistent state.
+                    //
+                    // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
+                    // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
+                    // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
+                    if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
+                        log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
+                        return;
+                    }
+
+                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
+
+                    Object newTaskCountObj = ((Map<String, Object>) value.value()).get("tasks");
+                    Integer newTaskCount = (Integer) newTaskCountObj;
+
+                    // Validate the configs we're supposed to update to ensure we're getting a complete configuration
+                    // update of all tasks that are expected based on the number of tasks in the commit message.
+                    Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
+                    Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
+                    if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
+                        // Given the logic for writing commit messages, we should only hit this condition due to compacted
+                        // historical data, in which case we would not have applied any updates yet and there will be no
+                        // task config data already committed for the connector, so we shouldn't have to clear any data
+                        // out. All we need to do is add the flag marking it inconsistent.
+                        inconsistent.add(connectorName);
+                    } else {
+                        if (deferred != null) {
+                            taskConfigs.putAll(deferred);
+                            updatedTasks.addAll(taskConfigs.keySet());
+                        }
+                        inconsistent.remove(connectorName);
+                    }
+                    // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
+                    // update, then we need to see a completely fresh set of configs after this commit message, so we don't
+                    // want any of these outdated configs
+                    if (deferred != null)
+                        deferred.clear();
+
+                    connectorTaskCounts.put(connectorName, newTaskCount);
+                }
+
+                if (!starting)
+                    tasksConfigCallback.onCompletion(null, updatedTasks);
+            } else {
+                log.error("Discarding config update record with invalid key: " + record.key());
+            }
+        }
+    };
+
+    private ConnectorTaskId parseTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 3) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Given task configurations, get a set of integer task IDs organized by connector name.
+     */
+    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
+        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
+        if (configs == null)
+            return connectorTaskIds;
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+            ConnectorTaskId taskId = taskConfigEntry.getKey();
+            if (!connectorTaskIds.containsKey(taskId.connector()))
+                connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
+            connectorTaskIds.get(taskId.connector()).add(taskId.task());
+        }
+        return connectorTaskIds;
+    }
+
+    private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
+        // Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
+        // with 2 tasks, then reduce to 1, we'll end up with log entries like:
+        //
+        // 1. Connector "foo" config
+        // 2. Connector "foo", task 1 config
+        // 3. Connector "foo", task 2 config
+        // 4. Connector "foo", commit 2 tasks
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // However, due to compaction we could end up with a log that looks like this:
+        //
+        // 1. Connector "foo" config
+        // 3. Connector "foo", task 2 config
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
+        // validate that all the configs specified by the commit message are present. This should be fine because the
+        // logic for writing configs ensures all the task configs are written (and reads them back) before writing the
+        // commit message.
+
+        if (idSet.size() < expectedSize)
+            return false;
+
+        for (int i = 0; i < expectedSize; i++)
+            if (!idSet.contains(i))
+                return false;
+        return true;
+    }
+}
+