You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/13 19:23:43 UTC

[2/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
index a74b39c..b5af1fe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -17,39 +17,22 @@
 
 package org.apache.kafka.copycat.storage;
 
-import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.util.Callback;
 import org.apache.kafka.copycat.util.ConvertingFutureCallback;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -70,99 +53,42 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
     public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
 
-    private final static long CREATE_TOPIC_TIMEOUT_MS = 30000;
-
-    private Time time;
-    private Map<String, ?> configs;
-    private String topic;
-    private Consumer<byte[], byte[]> consumer;
-    private Producer<byte[], byte[]> producer;
+    private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
 
-    private Thread thread;
-    private boolean stopRequested;
-    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-
-    public KafkaOffsetBackingStore() {
-        this(new SystemTime());
-    }
-
-    public KafkaOffsetBackingStore(Time time) {
-        this.time = time;
-    }
-
     @Override
     public void configure(Map<String, ?> configs) {
-        this.configs = configs;
-        topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
+        String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
         if (topic == null)
             throw new CopycatException("Offset storage topic must be specified");
 
         data = new HashMap<>();
-        stopRequested = false;
-        readLogEndOffsetCallbacks = new ArrayDeque<>();
-    }
-
-    @Override
-    public void start() {
-        log.info("Starting KafkaOffsetBackingStore with topic " + topic);
-
-        producer = createProducer();
-        consumer = createConsumer();
-        List<TopicPartition> partitions = new ArrayList<>();
-
-        // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
-        // we rely on topic auto-creation
-        List<PartitionInfo> partitionInfos = null;
-        long started = time.milliseconds();
-        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
-            partitionInfos = consumer.partitionsFor(topic);
-            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
-        }
-        if (partitionInfos == null)
-            throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
-                    " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
-                    " this is your first use of the topic it may have taken too long to create.");
 
-        for (PartitionInfo partition : partitionInfos)
-            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
-        consumer.assign(partitions);
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
 
-        readToLogEnd();
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
 
-        thread = new WorkThread();
-        thread.start();
+        offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
+    }
 
+    @Override
+    public void start() {
+        log.info("Starting KafkaOffsetBackingStore");
+        offsetLog.start();
         log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
     }
 
     @Override
     public void stop() {
         log.info("Stopping KafkaOffsetBackingStore");
-
-        synchronized (this) {
-            stopRequested = true;
-            consumer.wakeup();
-        }
-
-        try {
-            thread.join();
-        } catch (InterruptedException e) {
-            throw new CopycatException("Failed to stop KafkaOffsetBackingStore. Exiting without cleanly shutting " +
-                    "down it's producer and consumer.", e);
-        }
-
-        try {
-            producer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to close KafkaOffsetBackingStore producer", e);
-        }
-
-        try {
-            consumer.close();
-        } catch (KafkaException e) {
-            log.error("Failed to close KafkaOffsetBackingStore consumer", e);
-        }
+        offsetLog.stop();
+        log.info("Stopped KafkaOffsetBackingStore");
     }
 
     @Override
@@ -172,15 +98,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
             @Override
             public Map<ByteBuffer, ByteBuffer> convert(Void result) {
                 Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
-                synchronized (KafkaOffsetBackingStore.this) {
-                    for (ByteBuffer key : keys)
-                        values.put(key, data.get(key));
-                }
+                for (ByteBuffer key : keys)
+                    values.put(key, data.get(key));
                 return values;
             }
         };
-        readLogEndOffsetCallbacks.add(future);
-        consumer.wakeup();
+        // This operation may be relatively (but not too) expensive since it always requires checking end offsets, even
+        // if we've already read up to the end. However, it also should not be common (offsets should only be read when
+        // resetting a task). Always requiring that we read to the end is simpler than trying to differentiate when it
+        // is safe not to (which should only be if we *know* we've maintained ownership since the last write).
+        offsetLog.readToEnd(future);
         return future;
     }
 
@@ -188,95 +115,26 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
         SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
 
-        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
-            producer.send(new ProducerRecord<>(topic, entry.getKey().array(), entry.getValue().array()), producerCallback);
-        }
+        for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+            offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback);
 
         return producerCallback;
     }
 
-
-
-    private Producer<byte[], byte[]> createProducer() {
-        Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
-        return new KafkaProducer<>(producerProps);
-    }
-
-    private Consumer<byte[], byte[]> createConsumer() {
-        Map<String, Object> consumerConfig = new HashMap<>();
-        consumerConfig.putAll(configs);
-        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
-        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        return new KafkaConsumer<>(consumerConfig);
-    }
-
-    private void poll(long timeoutMs) {
-        try {
-            ConsumerRecords<byte[], byte[]> records = consumer.poll(timeoutMs);
-            for (ConsumerRecord record : records) {
-                ByteBuffer key = record.key() != null ? ByteBuffer.wrap((byte[]) record.key()) : null;
-                ByteBuffer value = record.value() != null ? ByteBuffer.wrap((byte[]) record.value()) : null;
-                data.put(key, value);
-            }
-        } catch (ConsumerWakeupException e) {
-            // Expected on get() or stop(). The calling code should handle this
-            throw e;
-        } catch (KafkaException e) {
-            log.error("Error polling: " + e);
-        }
-    }
-
-    private void readToLogEnd() {
-        log.trace("Reading to end of offset log");
-
-        Set<TopicPartition> assignment = consumer.assignment();
-
-        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
-        Map<TopicPartition, Long> offsets = new HashMap<>();
-        for (TopicPartition tp : assignment) {
-            long offset = consumer.position(tp);
-            offsets.put(tp, offset);
-            consumer.seekToEnd(tp);
-        }
-
-        Map<TopicPartition, Long> endOffsets = new HashMap<>();
-        try {
-            poll(0);
-        } finally {
-            // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
-            // the consumers position is reset or it'll get into an inconsistent state.
-            for (TopicPartition tp : assignment) {
-                long startOffset = offsets.get(tp);
-                long endOffset = consumer.position(tp);
-                if (endOffset > startOffset) {
-                    endOffsets.put(tp, endOffset);
-                    consumer.seek(tp, startOffset);
-                }
-                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
-            }
+    private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
+            ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
+            ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+            data.put(key, value);
         }
+    };
 
-        while (!endOffsets.isEmpty()) {
-            poll(Integer.MAX_VALUE);
-
-            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
-            while (it.hasNext()) {
-                Map.Entry<TopicPartition, Long> entry = it.next();
-                if (consumer.position(entry.getKey()) >= entry.getValue())
-                    it.remove();
-                else
-                    break;
-            }
-        }
+    private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+                                                              Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
     }
 
-
     private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
         private int numLeft;
         private boolean completed = false;
@@ -349,45 +207,5 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
         }
     }
 
-    private class WorkThread extends Thread {
-        @Override
-        public void run() {
-            try {
-                while (true) {
-                    int numCallbacks;
-                    synchronized (KafkaOffsetBackingStore.this) {
-                        if (stopRequested)
-                            break;
-                        numCallbacks = readLogEndOffsetCallbacks.size();
-                    }
-
-                    if (numCallbacks > 0) {
-                        try {
-                            readToLogEnd();
-                        } catch (ConsumerWakeupException e) {
-                            // Either received another get() call and need to retry reading to end of log or stop() was
-                            // called. Both are handled by restarting this loop.
-                            continue;
-                        }
-                    }
 
-                    synchronized (KafkaOffsetBackingStore.this) {
-                        for (int i = 0; i < numCallbacks; i++) {
-                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
-                            cb.onCompletion(null, null);
-                        }
-                    }
-
-                    try {
-                        poll(Integer.MAX_VALUE);
-                    } catch (ConsumerWakeupException e) {
-                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
-                        continue;
-                    }
-                }
-            } catch (Throwable t) {
-                log.error("Unexpected exception in KafkaOffsetBackingStore's work thread", t);
-            }
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index dbb3d0d..84229a5 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -92,7 +92,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
                     continue;
                 }
                 Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
-                SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
+                SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue() != null ? rawEntry.getValue().array() : null);
                 Object deserializedValue = deserializedSchemaAndValue.value();
                 OffsetUtils.validateFormat(deserializedValue);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
index bd3a87b..8d78a57 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -25,6 +25,9 @@ import java.util.Map;
 
 public class OffsetUtils {
     public static void validateFormat(Object offsetData) {
+        if (offsetData == null)
+            return;
+
         if (!(offsetData instanceof Map))
             throw new DataException("Offsets must be specified as a Map");
         validateFormat((Map<Object, Object>) offsetData);

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
new file mode 100644
index 0000000..5e860d9
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ *     KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
+ *     clients need to consume and, at times, agree on their offset / that they have read to the end of the log.
+ * </p>
+ * <p>
+ *     This functionality is useful for storing different types of data that all clients may need to agree on --
+ *     offsets or config for example. This class runs a consumer in a background thread to continuously tail the target
+ *     topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful
+ *     utilities like checking the current log end offset and waiting until the current end of the log is reached.
+ * </p>
+ * <p>
+ *     To support different use cases, this class works with either single- or multi-partition topics.
+ * </p>
+ * <p>
+ *     Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
+ *     record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
+ *     calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
+ *     and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
+ * </p>
+ */
+public class KafkaBasedLog<K, V> {
+    private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
+    private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+
+    private Time time;
+    private final String topic;
+    private final Map<String, Object> producerConfigs;
+    private final Map<String, Object> consumerConfigs;
+    private final Callback<ConsumerRecord<K, V>> consumedCallback;
+    private Consumer<K, V> consumer;
+    private Producer<K, V> producer;
+
+    private Thread thread;
+    private boolean stopRequested;
+    private Queue<Callback<Void>> readLogEndOffsetCallbacks;
+
+    /**
+     * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
+     * {@link #start()} is invoked.
+     *
+     * @param topic the topic to treat as a log
+     * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
+     *                        contain compatible serializer settings for the generic types used on this class. Some
+     *                        setting, such as the number of acks, will be overridden to ensure correct behavior of this
+     *                        class.
+     * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
+     *                        contain compatible serializer settings for the generic types used on this class. Some
+     *                        setting, such as the auto offset reset policy, will be overridden to ensure correct
+     *                        behavior of this class.
+     * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
+     * @param time Time interface
+     */
+    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
+                         Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+        this.topic = topic;
+        this.producerConfigs = producerConfigs;
+        this.consumerConfigs = consumerConfigs;
+        this.consumedCallback = consumedCallback;
+        this.stopRequested = false;
+        this.readLogEndOffsetCallbacks = new ArrayDeque<>();
+        this.time = time;
+    }
+
+    public void start() {
+        log.info("Starting KafkaBasedLog with topic " + topic);
+
+        producer = createProducer();
+        consumer = createConsumer();
+
+        List<TopicPartition> partitions = new ArrayList<>();
+
+        // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
+        // we rely on topic auto-creation
+        List<PartitionInfo> partitionInfos = null;
+        long started = time.milliseconds();
+        while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+            partitionInfos = consumer.partitionsFor(topic);
+            Utils.sleep(Math.min(time.milliseconds() - started, 1000));
+        }
+        if (partitionInfos == null)
+            throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
+                    " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
+                    " this is your first use of the topic it may have taken too long to create.");
+
+        for (PartitionInfo partition : partitionInfos)
+            partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+        consumer.assign(partitions);
+
+        readToLogEnd();
+
+        thread = new WorkThread();
+        thread.start();
+
+        log.info("Finished reading KafakBasedLog for topic " + topic);
+
+        log.info("Started KafakBasedLog for topic " + topic);
+    }
+
+    public void stop() {
+        log.info("Stopping KafkaBasedLog for topic " + topic);
+
+        synchronized (this) {
+            stopRequested = true;
+        }
+        consumer.wakeup();
+
+        try {
+            thread.join();
+        } catch (InterruptedException e) {
+            throw new CopycatException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
+                    "down it's producer and consumer.", e);
+        }
+
+        try {
+            producer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to stop KafkaBasedLog producer", e);
+        }
+
+        try {
+            consumer.close();
+        } catch (KafkaException e) {
+            log.error("Failed to stop KafkaBasedLog consumer", e);
+        }
+
+        log.info("Stopped KafkaBasedLog for topic " + topic);
+    }
+
+    /**
+     * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
+     * Note that this checks the current, offsets, reads to them, and invokes the callback regardless of whether
+     * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
+     * of the log, they must ensure there are no other writers during this period.
+     *
+     * This waits until the end of all partitions has been reached.
+     *
+     * This method is asynchronous. If you need a synchronous version, pass an instance of
+     * {@link org.apache.kafka.copycat.util.FutureCallback} as the {@param callback} parameter and wait on it to block.
+     *
+     * @param callback the callback to invoke once the end of the log has been reached.
+     */
+    public void readToEnd(Callback<Void> callback) {
+        producer.flush();
+        synchronized (this) {
+            readLogEndOffsetCallbacks.add(callback);
+        }
+        consumer.wakeup();
+    }
+
+    /**
+     * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
+     * @return the future associated with the operation
+     */
+    public Future<Void> readToEnd() {
+        FutureCallback<Void> future = new FutureCallback<>(null);
+        readToEnd(future);
+        return future;
+    }
+
+    public void send(K key, V value) {
+        send(key, value, null);
+    }
+
+    public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+        producer.send(new ProducerRecord<>(topic, key, value), callback);
+    }
+
+
+    private Producer<K, V> createProducer() {
+        // Always require producer acks to all to ensure durable writes
+        producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+        return new KafkaProducer<>(producerConfigs);
+    }
+
+    private Consumer<K, V> createConsumer() {
+        // Always force reset to the beginning of the log since this class wants to consume all available log data
+        consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(consumerConfigs);
+    }
+
+    private void poll(long timeoutMs) {
+        try {
+            ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+            for (ConsumerRecord<K, V> record : records)
+                consumedCallback.onCompletion(null, record);
+        } catch (ConsumerWakeupException e) {
+            // Expected on get() or stop(). The calling code should handle this
+            throw e;
+        } catch (KafkaException e) {
+            log.error("Error polling: " + e);
+        }
+    }
+
+    private void readToLogEnd() {
+        log.trace("Reading to end of offset log");
+
+        Set<TopicPartition> assignment = consumer.assignment();
+
+        // This approach to getting the current end offset is hacky until we have an API for looking these up directly
+        Map<TopicPartition, Long> offsets = new HashMap<>();
+        for (TopicPartition tp : assignment) {
+            long offset = consumer.position(tp);
+            offsets.put(tp, offset);
+            consumer.seekToEnd(tp);
+        }
+
+        Map<TopicPartition, Long> endOffsets = new HashMap<>();
+        try {
+            poll(0);
+        } finally {
+            // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
+            // the consumers position is reset or it'll get into an inconsistent state.
+            for (TopicPartition tp : assignment) {
+                long startOffset = offsets.get(tp);
+                long endOffset = consumer.position(tp);
+                if (endOffset > startOffset) {
+                    endOffsets.put(tp, endOffset);
+                    consumer.seek(tp, startOffset);
+                }
+                log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
+            }
+        }
+
+        while (!endOffsets.isEmpty()) {
+            poll(Integer.MAX_VALUE);
+
+            Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<TopicPartition, Long> entry = it.next();
+                if (consumer.position(entry.getKey()) >= entry.getValue())
+                    it.remove();
+                else
+                    break;
+            }
+        }
+    }
+
+
+    private class WorkThread extends Thread {
+        @Override
+        public void run() {
+            try {
+                while (true) {
+                    int numCallbacks;
+                    synchronized (KafkaBasedLog.this) {
+                        if (stopRequested)
+                            break;
+                        numCallbacks = readLogEndOffsetCallbacks.size();
+                    }
+
+                    if (numCallbacks > 0) {
+                        try {
+                            readToLogEnd();
+                        } catch (ConsumerWakeupException e) {
+                            // Either received another get() call and need to retry reading to end of log or stop() was
+                            // called. Both are handled by restarting this loop.
+                            continue;
+                        }
+                    }
+
+                    synchronized (KafkaBasedLog.this) {
+                        // Only invoke exactly the number of callbacks we found before triggering the read to log end
+                        // since it is possible for another write + readToEnd to sneak in in the meantime
+                        for (int i = 0; i < numCallbacks; i++) {
+                            Callback<Void> cb = readLogEndOffsetCallbacks.poll();
+                            cb.onCompletion(null, null);
+                        }
+                    }
+
+                    try {
+                        poll(Integer.MAX_VALUE);
+                    } catch (ConsumerWakeupException e) {
+                        // See previous comment, both possible causes of this wakeup are handled by starting this loop again
+                        continue;
+                    }
+                }
+            } catch (Throwable t) {
+                log.error("Unexpected exception in KafkaBasedLog's work thread", t);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index e4d1d8e..d33f846 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -97,10 +97,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
-        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
         workerConfig = new WorkerConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 3ff3a62..13d5228 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -92,10 +92,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
-        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
         config = new WorkerConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index e75d2f9..4a30992 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -57,10 +57,10 @@ public class WorkerTest extends ThreadedTest {
         Properties workerProps = new Properties();
         workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
         workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
-        workerProps.setProperty("offset.key.converter.schemas.enable", "false");
-        workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+        workerProps.setProperty("internal.value.converter.schemas.enable", "false");
         config = new WorkerConfig(workerProps);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
new file mode 100644
index 0000000..0463b85
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceConnector;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DistributedHerder.class})
+@PowerMockIgnore("javax.management.*")
+public class DistributedHerderTest {
+    private static final List<String> CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3");
+    private static final List<String> SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2");
+    private static final List<String> SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3");
+    private static final String TOPICS_LIST_STR = "topic1,topic2";
+
+    private static final Map<String, String> CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+
+    @Mock private KafkaConfigStorage configStorage;
+    private DistributedHerder herder;
+    @Mock private Worker worker;
+    @Mock private Callback<String> createCallback;
+
+    private Map<String, Map<String, String>> connectorProps;
+    private Map<String, Class<? extends Connector>> connectorClasses;
+    private Map<String, Class<? extends Task>> connectorTaskClasses;
+    private Map<String, Connector> connectors;
+    private Properties taskProps;
+
+    @Before
+    public void setUp() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = new DistributedHerder(worker, configStorage);
+
+        connectorProps = new HashMap<>();
+        connectorClasses = new HashMap<>();
+        connectorTaskClasses = new HashMap<>();
+        connectors = new HashMap<>();
+        for (String connectorName : CONNECTOR_NAMES) {
+            Class<? extends Connector> connectorClass = connectorName.contains("source") ? BogusSourceConnector.class : BogusSinkConnector.class;
+            Class<? extends Task> taskClass = connectorName.contains("source") ? BogusSourceTask.class : BogusSinkTask.class;
+            Connector connector = connectorName.contains("source") ? PowerMock.createMock(BogusSourceConnector.class) : PowerMock.createMock(BogusSinkConnector.class);
+
+            Map<String, String> props = new HashMap<>();
+            props.put(ConnectorConfig.NAME_CONFIG, connectorName);
+            props.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+            props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
+
+            connectorProps.put(connectorName, props);
+            connectorClasses.put(connectorName, connectorClass);
+            connectorTaskClasses.put(connectorName, taskClass);
+            connectors.put(connectorName, connector);
+        }
+
+        PowerMock.mockStatic(DistributedHerder.class);
+
+        // These can be anything since connectors can pass along whatever they want.
+        taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+    }
+
+    @Test
+    public void testCreateSourceConnector() throws Exception {
+        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateSinkConnector() throws Exception {
+        String connectorName = SINK_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testDestroyConnector() throws Exception {
+        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        expectDestroy(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+
+            }
+        });
+        herder.deleteConnector(CONNECTOR_NAMES.get(0), futureCb);
+        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCreateAndStop() throws Exception {
+        String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+        expectConfigStorageConfigureStart();
+        expectEmptyRestore();
+        expectAdd(connectorName);
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestoreAndStop() throws Exception {
+        String restoreConnectorName1 = SOURCE_CONNECTOR_NAMES.get(0);
+        String restoreConnectorName2 = SINK_CONNECTOR_NAMES.get(0);
+        String additionalConnectorName = SOURCE_CONNECTOR_NAMES.get(1);
+
+        expectConfigStorageConfigureStart();
+        expectRestore(Arrays.asList(restoreConnectorName1, restoreConnectorName2));
+        expectAdd(additionalConnectorName);
+        // Stopping the herder should correctly stop all restored and new connectors
+        expectStop(restoreConnectorName1);
+        expectStop(restoreConnectorName2);
+        expectStop(additionalConnectorName);
+        configStorage.stop();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.configure(CONFIG_STORAGE_CONFIG);
+        herder.start();
+        herder.addConnector(connectorProps.get(additionalConnectorName), createCallback);
+        herder.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigStorageConfigureStart() {
+        configStorage.configure(CONFIG_STORAGE_CONFIG);
+        PowerMock.expectLastCall();
+        configStorage.start();
+        PowerMock.expectLastCall();
+    }
+
+    private void expectAdd(String connectorName) throws Exception {
+        configStorage.putConnectorConfig(connectorName, connectorProps.get(connectorName));
+        PowerMock.expectLastCall();
+        expectInstantiateConnector(connectorName, true);
+    }
+
+    private void expectEmptyRestore() throws Exception {
+        expectRestore(Collections.<String>emptyList());
+    }
+
+    private void expectRestore(List<String> connectorNames) throws Exception {
+        Map<String, Integer> rootConfig = new HashMap<>();
+        Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+        for (String connName : connectorNames) {
+            rootConfig.put(connName, 0);
+            connectorConfigs.put(connName, connectorProps.get(connName));
+        }
+        EasyMock.expect(configStorage.snapshot())
+                .andReturn(new ClusterConfigState(1, rootConfig, connectorConfigs, Collections.EMPTY_MAP, Collections.EMPTY_SET));
+
+        // Restore never uses a callback
+        for (String connectorName : connectorNames)
+            expectInstantiateConnector(connectorName, false);
+    }
+
+    private void expectInstantiateConnector(String connectorName, boolean expectCallback) throws Exception {
+        PowerMock.expectPrivate(DistributedHerder.class, "instantiateConnector", connectorClasses.get(connectorName).getName())
+                .andReturn(connectors.get(connectorName));
+        if (expectCallback) {
+            createCallback.onCompletion(null, connectorName);
+            PowerMock.expectLastCall();
+        }
+
+        Connector connector = connectors.get(connectorName);
+        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
+        PowerMock.expectLastCall();
+        connector.start(new Properties());
+        PowerMock.expectLastCall();
+
+        // Just return the connector properties for the individual task we generate by default
+        EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(connectorTaskClasses.get(connectorName));
+
+        EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+                .andReturn(Arrays.asList(taskProps));
+        // And we should instantiate the tasks. For a sink task, we should see added properties for
+        // the input topic partitions
+        Properties generatedTaskProps = new Properties();
+        generatedTaskProps.putAll(taskProps);
+        if (connectorName.contains("sink"))
+            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
+        worker.addTask(taskId, connectorTaskClasses.get(connectorName).getName(), generatedTaskProps);
+        PowerMock.expectLastCall();
+    }
+
+    private void expectStop(String connectorName) {
+        worker.stopTask(new ConnectorTaskId(connectorName, 0));
+        EasyMock.expectLastCall();
+        Connector connector = connectors.get(connectorName);
+        connector.stop();
+        EasyMock.expectLastCall();
+    }
+
+    private void expectDestroy(String connectorName) {
+        expectStop(connectorName);
+        configStorage.putConnectorConfig(connectorName, null);
+        PowerMock.expectLastCall();
+    }
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceConnector extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
+
+    private abstract class BogusSinkConnector extends SinkConnector {
+    }
+
+    private abstract class BogusSinkTask extends SourceTask {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
index 477893b..606b94d 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.copycat.runtime.standalone;
 import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.Task;
 import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
 import org.apache.kafka.copycat.runtime.Worker;
 import org.apache.kafka.copycat.sink.SinkConnector;
 import org.apache.kafka.copycat.sink.SinkTask;
@@ -39,6 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -54,7 +57,7 @@ public class StandaloneHerderTest {
     private Connector connector;
     @Mock protected Callback<String> createCallback;
 
-    private Properties connectorProps;
+    private Map<String, String> connectorProps;
     private Properties taskProps;
 
     @Before
@@ -62,9 +65,9 @@ public class StandaloneHerderTest {
         worker = PowerMock.createMock(Worker.class);
         herder = new StandaloneHerder(worker);
 
-        connectorProps = new Properties();
-        connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
-        connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        connectorProps = new HashMap<>();
+        connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
         PowerMock.mockStatic(StandaloneHerder.class);
 
         // These can be anything since connectors can pass along whatever they want.
@@ -74,8 +77,8 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateSourceConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceClass.class);
-        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
         PowerMock.replayAll();
 
         herder.addConnector(connectorProps, createCallback);
@@ -85,8 +88,8 @@ public class StandaloneHerderTest {
 
     @Test
     public void testCreateSinkConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSinkClass.class);
-        expectAdd(BogusSinkClass.class, BogusSinkTask.class, true);
+        connector = PowerMock.createMock(BogusSinkConnector.class);
+        expectAdd(BogusSinkConnector.class, BogusSinkTask.class, true);
 
         PowerMock.replayAll();
 
@@ -97,8 +100,8 @@ public class StandaloneHerderTest {
 
     @Test
     public void testDestroyConnector() throws Exception {
-        connector = PowerMock.createMock(BogusSourceClass.class);
-        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
         expectDestroy();
         PowerMock.replayAll();
 
@@ -114,32 +117,31 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testCreateAndStop() throws Exception {
+        connector = PowerMock.createMock(BogusSourceConnector.class);
+        expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectStop();
+        PowerMock.replayAll();
 
-    private void expectAdd(Class<? extends Connector> connClass,
-                             Class<? extends Task> taskClass,
-                             boolean sink) throws Exception {
-        expectCreate(connClass, taskClass, sink, true);
-    }
+        herder.addConnector(connectorProps, createCallback);
+        herder.stop();
 
-    private void expectRestore(Class<? extends Connector> connClass,
-                                 Class<? extends Task> taskClass) throws Exception {
-        // Restore never uses a callback. These tests always use sources
-        expectCreate(connClass, taskClass, false, false);
+        PowerMock.verifyAll();
     }
 
-    private void expectCreate(Class<? extends Connector> connClass,
-                                Class<? extends Task> taskClass,
-                                boolean sink, boolean expectCallback) throws Exception {
-        connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+    private void expectAdd(Class<? extends Connector> connClass,
+                           Class<? extends Task> taskClass,
+                           boolean sink) throws Exception {
+        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
 
         PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
                 .andReturn(connector);
-        if (expectCallback) {
-            createCallback.onCompletion(null, CONNECTOR_NAME);
-            PowerMock.expectLastCall();
-        }
 
-        connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
+        createCallback.onCompletion(null, CONNECTOR_NAME);
+        PowerMock.expectLastCall();
+
+        connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
         PowerMock.expectLastCall();
         connector.start(new Properties());
         PowerMock.expectLastCall();
@@ -171,13 +173,13 @@ public class StandaloneHerderTest {
     }
 
     // We need to use a real class here due to some issue with mocking java.lang.Class
-    private abstract class BogusSourceClass extends SourceConnector {
+    private abstract class BogusSourceConnector extends SourceConnector {
     }
 
     private abstract class BogusSourceTask extends SourceTask {
     }
 
-    private abstract class BogusSinkClass extends SinkConnector {
+    private abstract class BogusSinkConnector extends SinkConnector {
     }
 
     private abstract class BogusSinkTask extends SourceTask {

http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
new file mode 100644
index 0000000..b02b752
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.copycat.data.Field;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
+import org.apache.kafka.copycat.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConfigStorage.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaConfigStorageTest {
+    private static final String TOPIC = "copycat-configs";
+    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+
+    static {
+        DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+    }
+
+    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
+    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
+
+    // Need a) connector with multiple tasks and b) multiple connectors
+    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
+            new ConnectorTaskId("connector1", 0),
+            new ConnectorTaskId("connector1", 1),
+            new ConnectorTaskId("connector2", 0)
+    );
+    private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
+
+    // Need some placeholders -- the contents don't matter here, just that they are restored properly
+    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+            Collections.singletonMap("config-key-one", "config-value-one"),
+            Collections.singletonMap("config-key-two", "config-value-two"),
+            Collections.singletonMap("config-key-three", "config-value-three")
+    );
+    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
+            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
+    );
+    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
+    );
+
+    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
+            = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
+
+    // The exact format doesn't matter here since both conversions are mocked
+    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
+            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
+            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
+            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
+    );
+
+    @Mock
+    private Converter converter;
+    @Mock
+    private Callback<String> connectorReconfiguredCallback;
+    @Mock
+    private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback;
+    @Mock
+    KafkaBasedLog<String, byte[]> storeLog;
+    private KafkaConfigStorage configStorage;
+
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
+    private long logOffset = 0;
+
+    @Before
+    public void setUp() {
+        configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"},
+                converter, connectorReconfiguredCallback, tasksReconfiguredCallback);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+        configStorage.start();
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configState = configStorage.snapshot();
+        assertEquals(0, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Second should also block and all configs should still be available
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigs() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        // Task configs should read to end, write to the log, read to end, write root, then read to end again
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 2); // Starts with 0 tasks, after update has 2
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+        // Writing task task configs should block until all the writes have been performed and the root record update
+        // has completed
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configStorage.putTaskConfigs(taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(2, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                // Connector after root update should make it through, task update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 7;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be last read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
+        // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
+        // We start out by loading an initial configuration where we started to write a task update and failed before
+        // writing an the commit, and then compaction cleaned up the earlier record.
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                // This is the record that has been compacted:
+                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+
+        // One failed attempt to write new task configs
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+
+        // Successful attempt to write new task config
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 1); // Updated to just 1 task
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0)));
+        EasyMock.expectLastCall();
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.start();
+        // After reading the log, it should have been in an inconsistent state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(5, configState.offset()); // Should always be last read, not last committed
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
+        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_IDS.get(0))), configState.inconsistentConnectors());
+
+        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
+        try {
+            configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
+            fail("Should have failed due to incomplete task set.");
+        } catch (KafkaException e) {
+            // expected
+        }
+
+        // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
+        // we are going to shrink the number of tasks to 1
+        configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        // Validate updated config
+        configState = configStorage.snapshot();
+        // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
+        // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
+        assertEquals(7, configState.offset());
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
+                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
+
+    // If non-empty, deserializations should be a LinkedHashMap
+    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
+                             final Map<byte[], Struct> deserializations) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
+        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
+            // Note null schema because default settings for internal serialization are schema-less
+            EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
+                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
+        }
+    }
+
+    private void expectStop() {
+        storeLog.stop();
+        PowerMock.expectLastCall();
+    }
+
+    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
+    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
+    // (by checking a single field's value)
+    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                        final String dataFieldName, final Object dataFieldValue) {
+        final Capture<Struct> capturedRecord = EasyMock.newCapture();
+        EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                .andReturn(serialized);
+        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+        PowerMock.expectLastCall();
+        EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
+                .andAnswer(new IAnswer<SchemaAndValue>() {
+                    @Override
+                    public SchemaAndValue answer() throws Throwable {
+                        assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+                        // Note null schema because default settings for internal serialization are schema-less
+                        return new SchemaAndValue(null, structToMap(capturedRecord.getValue()));
+                    }
+                });
+    }
+
+    // This map needs to maintain ordering
+    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
+        EasyMock.expect(storeLog.readToEnd())
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        TestFuture<Void> future = new TestFuture<Void>();
+                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue()));
+                        future.resolveOnGet((Void) null);
+                        return future;
+                    }
+                });
+    }
+
+
+    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                                                 final String dataFieldName, final Object dataFieldValue) {
+        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, serialized);
+        expectReadToEnd(recordsToRead);
+    }
+
+    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
+    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
+        for (int i = 0; i < taskConfigs.size(); i++)
+            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+
+        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
+        connectorConfigs.put(connectorName, connectorConfig);
+
+        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
+    }
+
+    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
+    private Map<String, Object> structToMap(Struct struct) {
+        HashMap<String, Object> result = new HashMap<>();
+        for (Field field : struct.schema().fields())
+            result.put(field.name(), struct.get(field));
+        return result;
+    }
+
+}