You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/03/03 17:55:18 UTC

kafka git commit: KAFKA-2934; Offset storage file configuration in Connect standalone mode is not included in StandaloneConfig

Repository: kafka
Updated Branches:
  refs/heads/trunk 10394aa80 -> 079c88178


KAFKA-2934; Offset storage file configuration in Connect standalone mode is not included in StandaloneConfig

Added the offset backing store config to StandaloneConfig and DistributedConfig.
Added configs for offset.storage.topic, config.storage.topic, and status.storage.topic to DistributedConfig.

Author: jinxing <ji...@fenbi.com>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #734 from ZoneMayor/trunk-KAFKA-2934


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/079c8817
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/079c8817
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/079c8817

Branch: refs/heads/trunk
Commit: 079c88178dff4b3a4c9de55629e7d15b60e5f562
Parents: 10394aa
Author: jinxing <ji...@fenbi.com>
Authored: Thu Mar 3 08:54:37 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Mar 3 08:54:37 2016 -0800

----------------------------------------------------------------------
 .../kafka/connect/cli/ConnectDistributed.java   |  4 +--
 .../apache/kafka/connect/runtime/Worker.java    |  2 +-
 .../runtime/distributed/DistributedConfig.java  | 32 ++++++++++++++++++-
 .../runtime/distributed/DistributedHerder.java  |  2 +-
 .../runtime/standalone/StandaloneConfig.java    | 12 ++++++-
 .../connect/storage/FileOffsetBackingStore.java | 10 +++---
 .../connect/storage/KafkaConfigStorage.java     | 16 +++++-----
 .../storage/KafkaOffsetBackingStore.java        | 18 +++++------
 .../storage/KafkaStatusBackingStore.java        | 18 +++++------
 .../storage/MemoryOffsetBackingStore.java       |  3 +-
 .../storage/MemoryStatusBackingStore.java       |  3 +-
 .../connect/storage/OffsetBackingStore.java     | 10 ++++--
 .../connect/storage/StatusBackingStore.java     | 10 ++++--
 .../connect/runtime/WorkerSinkTaskTest.java     |  1 +
 .../runtime/WorkerSinkTaskThreadedTest.java     |  1 +
 .../connect/runtime/WorkerSourceTaskTest.java   |  1 +
 .../kafka/connect/runtime/WorkerTest.java       |  3 +-
 .../distributed/DistributedHerderTest.java      |  8 +++--
 .../storage/FileOffsetBackingStoreTest.java     | 19 +++++++----
 .../connect/storage/KafkaConfigStorageTest.java | 22 +++++++++----
 .../storage/KafkaOffsetBackingStoreTest.java    | 33 ++++++++++----------
 21 files changed, 153 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index bc5b75a..849fa2f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -67,12 +67,12 @@ public class ConnectDistributed {
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
         KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
-        offsetBackingStore.configure(config.originals());
+        offsetBackingStore.configure(config);
 
         Worker worker = new Worker(workerId, time, config, offsetBackingStore);
 
         StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
-        statusBackingStore.configure(config.originals());
+        statusBackingStore.configure(config);
 
         DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
         final Connect connect = new Connect(worker, herder, rest);

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 39b69a3..4c0d016 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -96,7 +96,7 @@ public class Worker {
         this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false);
 
         this.offsetBackingStore = offsetBackingStore;
-        this.offsetBackingStore.configure(config.originals());
+        this.offsetBackingStore.configure(config);
     }
 
     public void start() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 0c5c92f..f5aa8ae 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -67,6 +67,24 @@ public class DistributedConfig extends WorkerConfig {
             " fails to catch up within worker.sync.timeout.ms, leave the Connect cluster for this long before rejoining.";
     public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000;
 
+    /**
+     * <code>offset.storage.topic</code>
+     */
+    public static final String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
+    private static final String OFFSET_STORAGE_TOPIC_CONFIG_DOC = "kafka topic to store connector offsets in";
+
+    /**
+     * <code>config.storage.topic</code>
+     */
+    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
+    private static final String CONFIG_TOPIC_CONFIG_DOC = "kafka topic to store configs";
+
+    /**
+     * <code>status.storage.topic</code>
+     */
+    public static final String STATUS_STORAGE_TOPIC_CONFIG = "status.storage.topic";
+    public static final String STATUS_STORAGE_TOPIC_CONFIG_DOC = "kafka topic to track connector and task status";
+
     static {
         CONFIG = baseConfigDef()
                 .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
@@ -161,7 +179,19 @@ public class DistributedConfig extends WorkerConfig {
                         ConfigDef.Type.INT,
                         WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
                         ConfigDef.Importance.MEDIUM,
-                        WORKER_UNSYNC_BACKOFF_MS_DOC);
+                        WORKER_UNSYNC_BACKOFF_MS_DOC)
+                .define(OFFSET_STORAGE_TOPIC_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ConfigDef.Importance.HIGH,
+                        OFFSET_STORAGE_TOPIC_CONFIG_DOC)
+                .define(CONFIG_TOPIC_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ConfigDef.Importance.HIGH,
+                        CONFIG_TOPIC_CONFIG_DOC)
+                .define(STATUS_STORAGE_TOPIC_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ConfigDef.Importance.HIGH,
+                        STATUS_STORAGE_TOPIC_CONFIG_DOC);
     }
 
     public DistributedConfig(Map<String, String> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 83ed714..84ad6e0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -137,7 +137,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             this.configStorage = configStorage;
         } else {
             this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), connectorConfigCallback(), taskConfigCallback());
-            this.configStorage.configure(config.originals());
+            this.configStorage.configure(config);
         }
         configState = ClusterConfigState.EMPTY;
         this.time = time;

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
index 7cefe22..8014b3a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
@@ -25,8 +25,18 @@ import java.util.Map;
 public class StandaloneConfig extends WorkerConfig {
     private static final ConfigDef CONFIG;
 
+    /**
+     * <code>offset.storage.file.filename</code>
+     */
+    public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
+    private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "file to store offset data in";
+
     static {
-        CONFIG = baseConfigDef();
+        CONFIG = baseConfigDef()
+                .define(OFFSET_STORAGE_FILE_FILENAME_CONFIG,
+                        ConfigDef.Type.STRING,
+                        ConfigDef.Importance.HIGH,
+                        OFFSET_STORAGE_FILE_FILENAME_DOC);
     }
 
     public StandaloneConfig(Map<String, String> props) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
index 1032bce..f377617 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,7 +42,6 @@ import java.util.Map;
 public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
     private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
 
-    public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
     private File file;
 
     public FileOffsetBackingStore() {
@@ -48,10 +49,9 @@ public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
     }
 
     @Override
-    public void configure(Map<String, ?> props) {
-        super.configure(props);
-        String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
-        file = new File(filename);
+    public void configure(WorkerConfig config) {
+        super.configure(config);
+        file = new File(config.getString(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 6a06fec..7f2fb83 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -20,6 +20,7 @@ package org.apache.kafka.connect.storage;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -33,6 +34,7 @@ import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -147,8 +149,6 @@ import java.util.concurrent.TimeoutException;
 public class KafkaConfigStorage {
     private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class);
 
-    public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
-
     public static final String CONNECTOR_PREFIX = "connector-";
 
     public static String CONNECTOR_KEY(String connectorName) {
@@ -216,19 +216,19 @@ public class KafkaConfigStorage {
         offset = -1;
     }
 
-    public void configure(Map<String, ?> configs) {
-        if (configs.get(CONFIG_TOPIC_CONFIG) == null)
-            throw new ConnectException("Must specify topic for connector configuration.");
-        topic = (String) configs.get(CONFIG_TOPIC_CONFIG);
+    public void configure(DistributedConfig config) {
+        topic = config.getString(DistributedConfig.CONFIG_TOPIC_CONFIG);
+        if (topic.equals(""))
+            throw new ConfigException("Must specify topic for connector configuration.");
 
         Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
+        producerProps.putAll(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(configs);
+        consumerProps.putAll(config.originals());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index dfb8c51..e8984fb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -21,10 +21,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConvertingFutureCallback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -53,27 +55,25 @@ import java.util.concurrent.TimeoutException;
 public class KafkaOffsetBackingStore implements OffsetBackingStore {
     private static final Logger log = LoggerFactory.getLogger(KafkaOffsetBackingStore.class);
 
-    public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
-
     private KafkaBasedLog<byte[], byte[]> offsetLog;
     private HashMap<ByteBuffer, ByteBuffer> data;
 
     @Override
-    public void configure(Map<String, ?> configs) {
-        String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
-        if (topic == null)
-            throw new ConnectException("Offset storage topic must be specified");
+    public void configure(WorkerConfig config) {
+        String topic = config.getString(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG);
+        if (topic.equals(""))
+            throw new ConfigException("Offset storage topic must be specified");
 
         data = new HashMap<>();
 
         Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
+        producerProps.putAll(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(configs);
+        consumerProps.putAll(config.originals());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index db7ccc7..eb9a48c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -31,10 +32,11 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -76,8 +78,6 @@ import java.util.Set;
 public class KafkaStatusBackingStore implements StatusBackingStore {
     private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
 
-    public static final String STATUS_TOPIC_CONFIG = "status.storage.topic";
-
     private static final String TASK_STATUS_PREFIX = "status-task-";
     private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
 
@@ -117,19 +117,19 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
     }
 
     @Override
-    public void configure(Map<String, ?> configs) {
-        if (configs.get(STATUS_TOPIC_CONFIG) == null)
-            throw new ConnectException("Must specify topic for connector status.");
-        this.topic = (String) configs.get(STATUS_TOPIC_CONFIG);
+    public void configure(WorkerConfig config) {
+        this.topic = config.getString(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG);
+        if (topic.equals(""))
+            throw new ConfigException("Must specify topic for connector status.");
 
         Map<String, Object> producerProps = new HashMap<>();
-        producerProps.putAll(configs);
+        producerProps.putAll(config.originals());
         producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
         producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
 
         Map<String, Object> consumerProps = new HashMap<>();
-        consumerProps.putAll(configs);
+        consumerProps.putAll(config.originals());
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index d62e38f..669e5f5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +47,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
     }
 
     @Override
-    public void configure(Map<String, ?> props) {
+    public void configure(WorkerConfig config) {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
index 96b235b..f21c4ad 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.storage;
 
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.Table;
 
@@ -37,7 +38,7 @@ public class MemoryStatusBackingStore implements StatusBackingStore {
     }
 
     @Override
-    public void configure(Map<String, ?> configs) {
+    public void configure(WorkerConfig config) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
index 83fdb53..1b74a90 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
@@ -17,7 +17,7 @@
 
 package org.apache.kafka.connect.storage;
 
-import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.Callback;
 
 import java.nio.ByteBuffer;
@@ -38,7 +38,7 @@ import java.util.concurrent.Future;
  * connector so that the shared namespace does not result in conflicting keys.
  * </p>
  */
-public interface OffsetBackingStore extends Configurable {
+public interface OffsetBackingStore {
 
     /**
      * Start this offset store.
@@ -69,4 +69,10 @@ public interface OffsetBackingStore extends Configurable {
      */
     public Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
                             Callback<Void> callback);
+
+    /**
+     * Configure this offset store from the given worker configuration
+     * @param config can be DistributedConfig or StandaloneConfig
+     */
+    public void configure(WorkerConfig config);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
index 6464f89..268c5df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
@@ -16,15 +16,15 @@
  **/
 package org.apache.kafka.connect.storage;
 
-import org.apache.kafka.common.Configurable;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.Collection;
 import java.util.Set;
 
-public interface StatusBackingStore extends Configurable {
+public interface StatusBackingStore {
 
     /**
      * Start dependent services (if needed)
@@ -97,4 +97,10 @@ public interface StatusBackingStore extends Configurable {
      * Flush any pending writes
      */
     void flush();
+
+    /**
+     * Configure this status store from the given worker configuration
+     * @param config config for StatusBackingStore
+     */
+    public void configure(WorkerConfig config);
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index aef3344..f419a7b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -115,6 +115,7 @@ public class WorkerSinkTaskTest {
         workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index b37b34f..ac10d59 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -117,6 +117,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 14c0c6e..8f57336 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -109,6 +109,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         config = new StandaloneConfig(workerProps);
         producerCallbacks = EasyMock.newCapture();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 0ca405e..67d3fdc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -79,6 +79,7 @@ public class WorkerTest extends ThreadedTest {
         workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter");
         workerProps.put("internal.key.converter.schemas.enable", "false");
         workerProps.put("internal.value.converter.schemas.enable", "false");
+        workerProps.put("offset.storage.file.filename", "/tmp/connect.offsets");
         config = new StandaloneConfig(workerProps);
     }
 
@@ -452,7 +453,7 @@ public class WorkerTest extends ThreadedTest {
     }
 
     private void expectStartStorage() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
         EasyMock.expectLastCall();
         offsetBackingStore.start();
         EasyMock.expectLastCall();

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index d439e96..8017ecf 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -31,7 +31,6 @@ import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
-import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -68,8 +67,8 @@ import static org.junit.Assert.assertTrue;
 public class DistributedHerderTest {
     private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
     static {
-        HERDER_CONFIG.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "status-topic");
-        HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+        HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "config-topic");
         HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
         // The WorkerConfig base class has some required settings without defaults
@@ -77,6 +76,9 @@ public class DistributedHerderTest {
         HERDER_CONFIG.put(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         HERDER_CONFIG.put(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
         HERDER_CONFIG.put(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        HERDER_CONFIG.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+        HERDER_CONFIG.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        HERDER_CONFIG.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
     }
     private static final String MEMBER_URL = "memberUrl";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
index 3833e88..6055d9a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
+ * <p/>
  * http://www.apache.org/licenses/LICENSE-2.0
  * <p/>
  * Unless required by applicable law or agreed to in writing, software
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.easymock.EasyMock;
 import org.junit.After;
@@ -36,7 +37,8 @@ import static org.junit.Assert.assertEquals;
 public class FileOffsetBackingStoreTest {
 
     FileOffsetBackingStore store;
-    Map<String, Object> props;
+    Map<String, String> props;
+    StandaloneConfig config;
     File tempFile;
 
     private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
@@ -50,9 +52,14 @@ public class FileOffsetBackingStoreTest {
     public void setup() throws IOException {
         store = new FileOffsetBackingStore();
         tempFile = File.createTempFile("fileoffsetbackingstore", null);
-        props = new HashMap<>();
-        props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
-        store.configure(props);
+        props = new HashMap<String, String>();
+        props.put(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
+        props.put(StandaloneConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        props.put(StandaloneConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        props.put(StandaloneConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        props.put(StandaloneConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        config = new StandaloneConfig(props);
+        store.configure(config);
         store.start();
     }
 
@@ -87,7 +94,7 @@ public class FileOffsetBackingStoreTest {
 
         // Restore into a new store to ensure correct reload from scratch
         FileOffsetBackingStore restore = new FileOffsetBackingStore();
-        restore.configure(props);
+        restore.configure(config);
         restore.start();
         Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
         assertEquals(buffer("value"), values.get(buffer("key")));

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
index cfc713f..f95704c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.KafkaBasedLog;
@@ -64,10 +65,19 @@ import static org.junit.Assert.fail;
 public class KafkaConfigStorageTest {
     private static final String TOPIC = "connect-configs";
     private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
 
     static {
-        DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
         DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
     }
 
     private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
@@ -139,7 +149,7 @@ public class KafkaConfigStorageTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
         assertEquals(TOPIC, capturedTopic.getValue());
         assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
         assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -179,7 +189,7 @@ public class KafkaConfigStorageTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Null before writing
@@ -247,7 +257,7 @@ public class KafkaConfigStorageTest {
         PowerMock.replayAll();
 
 
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Bootstrap as if we had already added the connector, but no tasks had been added yet
@@ -314,7 +324,7 @@ public class KafkaConfigStorageTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
 
         // Should see a single connector and its config should be the last one seen anywhere in the log
@@ -383,7 +393,7 @@ public class KafkaConfigStorageTest {
 
         PowerMock.replayAll();
 
-        configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
         configStorage.start();
         // After reading the log, it should have been in an inconsistent state
         ClusterConfigState configState = configStorage.snapshot();

http://git-wip-us.apache.org/repos/asf/kafka/blob/079c8817/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index 22bb376..aa92942 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -23,7 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.KafkaBasedLog;
 import org.easymock.Capture;
@@ -62,9 +62,20 @@ import static org.junit.Assert.fail;
 public class KafkaOffsetBackingStoreTest {
     private static final String TOPIC = "connect-offsets";
     private static final Map<String, String> DEFAULT_PROPS = new HashMap<>();
+    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
     static {
-        DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
         DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
+        DEFAULT_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, "connect-configs");
+        DEFAULT_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        DEFAULT_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
+        DEFAULT_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        DEFAULT_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_PROPS);
+
     }
     private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>();
     static {
@@ -95,12 +106,6 @@ public class KafkaOffsetBackingStoreTest {
         store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
     }
 
-    @Test(expected = ConnectException.class)
-    public void testMissingTopic() {
-        store = new KafkaOffsetBackingStore();
-        store.configure(Collections.<String, Object>emptyMap());
-    }
-
     @Test
     public void testStartStop() throws Exception {
         expectConfigure();
@@ -109,7 +114,7 @@ public class KafkaOffsetBackingStoreTest {
 
         PowerMock.replayAll();
 
-        store.configure(DEFAULT_PROPS);
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         assertEquals(TOPIC, capturedTopic.getValue());
         assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
         assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
@@ -135,7 +140,7 @@ public class KafkaOffsetBackingStoreTest {
 
         PowerMock.replayAll();
 
-        store.configure(DEFAULT_PROPS);
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
         HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data");
         assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
@@ -199,9 +204,7 @@ public class KafkaOffsetBackingStoreTest {
 
         PowerMock.replayAll();
 
-
-
-        store.configure(DEFAULT_PROPS);
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
 
         // Getting from empty store should return nulls
@@ -285,9 +288,7 @@ public class KafkaOffsetBackingStoreTest {
 
         PowerMock.replayAll();
 
-
-
-        store.configure(DEFAULT_PROPS);
+        store.configure(DEFAULT_DISTRIBUTED_CONFIG);
         store.start();
 
         // Set some offsets