You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/30 23:00:15 UTC

[4/4] kafka git commit: KAFKA-2369: Add REST API for Copycat.

KAFKA-2369: Add REST API for Copycat.

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Gwen Shapira, James Cheng

Closes #378 from ewencp/kafka-2369-copycat-rest-api


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c001b204
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c001b204
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c001b204

Branch: refs/heads/trunk
Commit: c001b2040cb7bf3a24aa77ec8de1312d4f780620
Parents: efdc2ad
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Fri Oct 30 15:00:00 2015 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Fri Oct 30 15:00:00 2015 -0700

----------------------------------------------------------------------
 build.gradle                                    |  84 ++++-
 checkstyle/import-control.xml                   |  10 +
 .../kafka/copycat/cli/CopycatDistributed.java   |  35 +-
 .../kafka/copycat/cli/CopycatStandalone.java    |  21 +-
 .../apache/kafka/copycat/cli/WorkerConfig.java  | 118 ------
 .../copycat/errors/AlreadyExistsException.java  |  35 ++
 .../kafka/copycat/errors/NotFoundException.java |  35 ++
 .../apache/kafka/copycat/runtime/Copycat.java   |   7 +-
 .../apache/kafka/copycat/runtime/Herder.java    |  92 ++++-
 .../runtime/SourceTaskOffsetCommitter.java      |   1 -
 .../apache/kafka/copycat/runtime/Worker.java    |  15 +-
 .../kafka/copycat/runtime/WorkerConfig.java     | 138 +++++++
 .../kafka/copycat/runtime/WorkerSinkTask.java   |   1 -
 .../copycat/runtime/WorkerSinkTaskThread.java   |   1 -
 .../kafka/copycat/runtime/WorkerSourceTask.java |   1 -
 .../runtime/distributed/ClusterConfigState.java |  11 +-
 .../runtime/distributed/CopycatProtocol.java    |  43 ++-
 .../runtime/distributed/DistributedConfig.java  | 187 ++++++++++
 .../runtime/distributed/DistributedHerder.java  | 351 +++++++++++++-----
 .../distributed/DistributedHerderConfig.java    | 191 ----------
 .../runtime/distributed/NotLeaderException.java |  19 +-
 .../runtime/distributed/WorkerCoordinator.java  |  23 +-
 .../runtime/distributed/WorkerGroupMember.java  |   9 +-
 .../kafka/copycat/runtime/rest/RestServer.java  | 258 +++++++++++++
 .../runtime/rest/entities/ConnectorInfo.java    |  81 +++++
 .../rest/entities/CreateConnectorRequest.java   |  59 +++
 .../runtime/rest/entities/ErrorMessage.java     |  63 ++++
 .../runtime/rest/entities/ServerInfo.java       |  41 +++
 .../copycat/runtime/rest/entities/TaskInfo.java |  58 +++
 .../rest/errors/CopycatExceptionMapper.java     |  60 +++
 .../rest/errors/CopycatRestException.java       |  70 ++++
 .../rest/resources/ConnectorsResource.java      | 201 ++++++++++
 .../runtime/rest/resources/RootResource.java    |  36 ++
 .../runtime/standalone/StandaloneConfig.java    |  35 ++
 .../runtime/standalone/StandaloneHerder.java    | 199 +++++++---
 .../kafka/copycat/util/ConnectorTaskId.java     |  10 +-
 .../copycat/util/ConvertingFutureCallback.java  |   3 +-
 .../kafka/copycat/util/FutureCallback.java      |   4 +
 .../copycat/runtime/WorkerSinkTaskTest.java     |   4 +-
 .../copycat/runtime/WorkerSourceTaskTest.java   |   4 +-
 .../kafka/copycat/runtime/WorkerTest.java       |  10 +-
 .../distributed/DistributedHerderTest.java      | 236 ++++++++++--
 .../distributed/WorkerCoordinatorTest.java      |  21 +-
 .../rest/resources/ConnectorsResourceTest.java  | 364 +++++++++++++++++++
 .../standalone/StandaloneHerderTest.java        | 228 ++++++++++--
 .../copycat/storage/KafkaConfigStorageTest.java |  11 +-
 tests/kafkatest/services/copycat.py             |  81 ++++-
 .../kafkatest/tests/copycat_distributed_test.py |   8 +-
 tests/kafkatest/tests/copycat_rest_test.py      | 163 +++++++++
 tests/kafkatest/tests/copycat_test.py           |   4 +-
 .../templates/copycat-distributed.properties    |   2 +
 tests/setup.py                                  |   2 +-
 vagrant/system-test-Vagrantfile.local           |   4 +
 53 files changed, 3086 insertions(+), 662 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 2184031..d177e2e 100644
--- a/build.gradle
+++ b/build.gradle
@@ -33,6 +33,9 @@ def junit='junit:junit:4.11'
 def easymock='org.easymock:easymock:3.3.1'
 def powermock='org.powermock:powermock-module-junit4:1.6.2'
 def powermock_easymock='org.powermock:powermock-api-easymock:1.6.2'
+def jackson_version = '2.5.4'
+def jetty_version = '9.2.12.v20150709'
+def jersey_version = '2.22.1'
 
 allprojects {
   apply plugin: 'idea'
@@ -501,7 +504,7 @@ project(':tools') {
     dependencies {
         compile project(':clients')
         compile 'net.sourceforge.argparse4j:argparse4j:0.5.0'
-        compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
+        compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
         compile "$slf4jlog4j"
 
         testCompile "$junit"
@@ -671,6 +674,21 @@ project(':copycat:api') {
     include "**/org/apache/kafka/copycat/*"
   }
 
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+
   artifacts {
     archives testJar
   }
@@ -692,7 +710,7 @@ project(':copycat:json') {
   dependencies {
     compile project(':copycat:api')
     compile "$slf4japi"
-    compile 'com.fasterxml.jackson.core:jackson-databind:2.5.4'
+    compile "com.fasterxml.jackson.core:jackson-databind:$jackson_version"
 
     testCompile "$junit"
     testCompile "$easymock"
@@ -717,6 +735,21 @@ project(':copycat:json') {
     include "**/org/apache/kafka/copycat/*"
   }
 
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+
   artifacts {
     archives testJar
   }
@@ -729,18 +762,6 @@ project(':copycat:json') {
     configFile = new File(rootDir, "checkstyle/checkstyle.xml")
   }
   test.dependsOn('checkstyleMain', 'checkstyleTest')
-
-  tasks.create(name: "copyDependantLibs", type: Copy) {
-    from (configurations.runtime) {
-      exclude('kafka-clients*')
-      exclude('copycat-*')
-    }
-    into "$buildDir/dependant-libs"
-  }
-
-  jar {
-    dependsOn copyDependantLibs
-  }
 }
 
 project(':copycat:runtime') {
@@ -752,6 +773,11 @@ project(':copycat:runtime') {
     compile project(':clients')
     compile "$slf4japi"
 
+    compile "org.eclipse.jetty:jetty-server:$jetty_version"
+    compile "org.eclipse.jetty:jetty-servlet:$jetty_version"
+    compile "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$jackson_version"
+    compile "org.glassfish.jersey.containers:jersey-container-servlet:$jersey_version"
+
     testCompile "$junit"
     testCompile "$easymock"
     testCompile "$powermock"
@@ -777,6 +803,21 @@ project(':copycat:runtime') {
     include "**/org/apache/kafka/copycat/*"
   }
 
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+
   artifacts {
     archives testJar
   }
@@ -822,6 +863,21 @@ project(':copycat:file') {
     include "**/org/apache/kafka/copycat/*"
   }
 
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('copycat-*')
+    }
+    into "$buildDir/dependant-libs"
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+
   artifacts {
     archives testJar
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 187bee8..75027f5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -161,6 +161,14 @@
 
     <subpackage name="runtime">
       <allow pkg="org.apache.kafka.copycat" />
+
+      <subpackage name="rest">
+        <allow pkg="org.eclipse.jetty" />
+        <allow pkg="javax.ws.rs" />
+        <allow pkg="javax.servlet" />
+        <allow pkg="org.glassfish.jersey" />
+        <allow pkg="com.fasterxml.jackson" />
+      </subpackage>
     </subpackage>
 
     <subpackage name="cli">
@@ -177,6 +185,8 @@
 
     <subpackage name="util">
       <allow pkg="org.apache.kafka.copycat" />
+      <!-- for annotations to avoid code duplication -->
+      <allow pkg="com.fasterxml.jackson.annotation" />
     </subpackage>
 
     <subpackage name="json">

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
index 0ff6e81..ca3f76c 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java
@@ -21,14 +21,13 @@ import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.copycat.runtime.Copycat;
 import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.distributed.DistributedConfig;
 import org.apache.kafka.copycat.runtime.distributed.DistributedHerder;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
 import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
-import org.apache.kafka.copycat.util.Callback;
-import org.apache.kafka.copycat.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
 import java.util.Properties;
 
 /**
@@ -46,40 +45,22 @@ public class CopycatDistributed {
 
     public static void main(String[] args) throws Exception {
         Properties workerProps;
-        Properties connectorProps;
 
         if (args.length < 1) {
-            log.info("Usage: CopycatDistributed worker.properties [connector1.properties connector2.properties ...]");
+            log.info("Usage: CopycatDistributed worker.properties");
             System.exit(1);
         }
 
         String workerPropsFile = args[0];
         workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
 
-        WorkerConfig workerConfig = new WorkerConfig(workerProps);
-        Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());
-        DistributedHerder herder = new DistributedHerder(worker, workerConfig.originals());
-        final Copycat copycat = new Copycat(worker, herder);
+        DistributedConfig config = new DistributedConfig(workerProps);
+        Worker worker = new Worker(config, new KafkaOffsetBackingStore());
+        RestServer rest = new RestServer(config);
+        DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl());
+        final Copycat copycat = new Copycat(worker, herder, rest);
         copycat.start();
 
-        try {
-            for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
-                connectorProps = Utils.loadProps(connectorPropsFile);
-                FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
-                    @Override
-                    public void onCompletion(Throwable error, String id) {
-                        if (error != null)
-                            log.error("Failed to create job for {}", connectorPropsFile);
-                    }
-                });
-                herder.addConnector(Utils.propsToStringMap(connectorProps), cb);
-                cb.get();
-            }
-        } catch (Throwable t) {
-            log.error("Stopping after connector error", t);
-            copycat.stop();
-        }
-
         // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
         copycat.awaitStop();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
index 65a15e4..cd4fc96 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java
@@ -19,9 +19,13 @@ package org.apache.kafka.copycat.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
 import org.apache.kafka.copycat.runtime.Copycat;
 import org.apache.kafka.copycat.runtime.Herder;
 import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder;
 import org.apache.kafka.copycat.storage.FileOffsetBackingStore;
 import org.apache.kafka.copycat.util.Callback;
@@ -59,23 +63,28 @@ public class CopycatStandalone {
         String workerPropsFile = args[0];
         workerProps = !workerPropsFile.isEmpty() ? Utils.loadProps(workerPropsFile) : new Properties();
 
-        WorkerConfig workerConfig = new WorkerConfig(workerProps);
-        Worker worker = new Worker(workerConfig, new FileOffsetBackingStore());
+        StandaloneConfig config = new StandaloneConfig(workerProps);
+        Worker worker = new Worker(config, new FileOffsetBackingStore());
+        RestServer rest = new RestServer(config);
         Herder herder = new StandaloneHerder(worker);
-        final Copycat copycat = new Copycat(worker, herder);
+        final Copycat copycat = new Copycat(worker, herder, rest);
         copycat.start();
 
         try {
             for (final String connectorPropsFile : Arrays.copyOfRange(args, 1, args.length)) {
                 connectorProps = Utils.loadProps(connectorPropsFile);
-                FutureCallback<String> cb = new FutureCallback<>(new Callback<String>() {
+                FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>(new Callback<Herder.Created<ConnectorInfo>>() {
                     @Override
-                    public void onCompletion(Throwable error, String id) {
+                    public void onCompletion(Throwable error, Herder.Created<ConnectorInfo> info) {
                         if (error != null)
                             log.error("Failed to create job for {}", connectorPropsFile);
+                        else
+                            log.info("Created connector {}", info.result().name());
                     }
                 });
-                herder.addConnector(Utils.propsToStringMap(connectorProps), cb);
+                herder.putConnectorConfig(
+                        connectorProps.getProperty(ConnectorConfig.NAME_CONFIG),
+                        Utils.propsToStringMap(connectorProps), false, cb);
                 cb.get();
             }
         } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
deleted file mode 100644
index 2a3f539..0000000
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.cli;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.kafka.common.config.ConfigDef.Importance;
-import org.apache.kafka.common.config.ConfigDef.Type;
-
-import java.util.Properties;
-
-/**
- * Configuration for standalone workers.
- */
-@InterfaceStability.Unstable
-public class WorkerConfig extends AbstractConfig {
-
-    public static final String CLUSTER_CONFIG = "cluster";
-    private static final String CLUSTER_CONFIG_DOC =
-            "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters "
-                    + "or instances may co-exist while sharing a single Kafka cluster.";
-    public static final String CLUSTER_DEFAULT = "copycat";
-
-    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
-    public static final String BOOTSTRAP_SERVERS_DOC
-            = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
-            + "cluster. The client will make use of all servers irrespective of which servers are "
-            + "specified here for bootstrapping&mdash;this list only impacts the initial hosts used "
-            + "to discover the full set of servers. This list should be in the form "
-            + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the "
-            + "initial connection to discover the full cluster membership (which may change "
-            + "dynamically), this list need not contain the full set of servers (you may want more "
-            + "than one, though, in case a server is down).";
-    public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
-
-    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
-    public static final String KEY_CONVERTER_CLASS_DOC =
-            "Converter class for key Copycat data that implements the <code>Converter</code> interface.";
-
-    public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
-    public static final String VALUE_CONVERTER_CLASS_DOC =
-            "Converter class for value Copycat data that implements the <code>Converter</code> interface.";
-
-    public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
-    public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
-            "Converter class for internal key Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
-
-    public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
-    public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
-            "Converter class for offset value Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
-
-    public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
-            = "task.shutdown.graceful.timeout.ms";
-    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
-            "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time,"
-                    + " not per task. All task have shutdown triggered, then they are waited on sequentially.";
-    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
-
-    public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
-    private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
-            = "Interval at which to try committing offsets for tasks.";
-    public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L;
-
-    public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
-    private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
-            = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
-            + " committed to offset storage before cancelling the process and restoring the offset "
-            + "data to be committed in a future attempt.";
-    public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
-
-    private static ConfigDef config;
-
-    static {
-        config = new ConfigDef()
-                .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
-                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
-                        Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
-                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
-                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
-                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
-                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
-                        Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
-                .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
-                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
-                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
-                .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
-                        Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
-                .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
-                        Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC);
-    }
-
-    public WorkerConfig() {
-        this(new Properties());
-    }
-
-    public WorkerConfig(Properties props) {
-        super(config, props);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
new file mode 100644
index 0000000..b09cb53
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/AlreadyExistsException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Indicates the operation tried to create an entity that already exists.
+ */
+public class AlreadyExistsException extends CopycatException {
+    public AlreadyExistsException(String s) {
+        super(s);
+    }
+
+    public AlreadyExistsException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public AlreadyExistsException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
new file mode 100644
index 0000000..a8e13a9
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/errors/NotFoundException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.errors;
+
+/**
+ * Indicates that an operation attempted to modify or delete a connector or task that is not present on the worker.
+ */
+public class NotFoundException extends CopycatException {
+    public NotFoundException(String s) {
+        super(s);
+    }
+
+    public NotFoundException(String s, Throwable throwable) {
+        super(s, throwable);
+    }
+
+    public NotFoundException(Throwable throwable) {
+        super(throwable);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
index e8dfe14..81f0b16 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Copycat.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.copycat.runtime.rest.RestServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,15 +35,17 @@ public class Copycat {
 
     private final Worker worker;
     private final Herder herder;
+    private final RestServer rest;
     private final CountDownLatch startLatch = new CountDownLatch(1);
     private final CountDownLatch stopLatch = new CountDownLatch(1);
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private final ShutdownHook shutdownHook;
 
-    public Copycat(Worker worker, Herder herder) {
+    public Copycat(Worker worker, Herder herder, RestServer rest) {
         log.debug("Copycat created");
         this.worker = worker;
         this.herder = herder;
+        this.rest = rest;
         shutdownHook = new ShutdownHook();
     }
 
@@ -52,6 +55,7 @@ public class Copycat {
 
         worker.start();
         herder.start();
+        rest.start(herder);
 
         log.info("Copycat started");
 
@@ -63,6 +67,7 @@ public class Copycat {
         if (!wasShuttingDown) {
             log.info("Copycat stopping");
 
+            rest.stop();
             herder.stop();
             worker.stop();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
index 31e68ef..0b03c9a 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
@@ -17,9 +17,14 @@
 
 package org.apache.kafka.copycat.runtime;
 
+import org.apache.kafka.copycat.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.copycat.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.copycat.util.Callback;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * <p>
@@ -49,21 +54,39 @@ public interface Herder {
     void stop();
 
     /**
-     * Submit a connector job to the cluster. This works from any node by forwarding the request to
-     * the leader herder if necessary.
+     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
+     * from the current configuration.
      *
-     * @param connectorProps user-specified properties for this job
-     * @param callback       callback to invoke when the request completes
+     * @param callback callback to invoke with the collection of connector names
+     * @throws org.apache.kafka.copycat.runtime.distributed.NotLeaderException if this node can not resolve the request
+     *         (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
+     *         also not the leader
+     * @throws org.apache.kafka.copycat.errors.CopycatException if this node is the leader, but still cannot resolve the
+     *         request (e.g., it is not in sync with other workers' config state)
      */
-    void addConnector(Map<String, String> connectorProps, Callback<String> callback);
+    void connectors(Callback<Collection<String>> callback);
 
     /**
-     * Delete a connector job by name.
-     *
-     * @param name     name of the connector job to shutdown and delete
-     * @param callback callback to invoke when the request completes
+     * Get the definition and status of a connector.
+     */
+    void connectorInfo(String connName, Callback<ConnectorInfo> callback);
+
+    /**
+     * Get the configuration for a connector.
+     * @param connName name of the connector
+     * @param callback callback to invoke with the configuration
+     */
+    void connectorConfig(String connName, Callback<Map<String, String>> callback);
+
+    /**
+     * Set the configuration for a connector. This supports creation, update, and deletion.
+     * @param connName name of the connector
+     * @param config the connector's configuration, or null if deleting the connector
+     * @param allowReplace if true, allow overwriting previous configs; if false, throw AlreadyExistsException if a connector
+     *                     with the same name already exists
+     * @param callback callback to invoke when the configuration has been written
      */
-    void deleteConnector(String name, Callback<Void> callback);
+    void putConnectorConfig(String connName, Map<String, String> config, boolean allowReplace, Callback<Created<ConnectorInfo>> callback);
 
     /**
      * Requests reconfiguration of the task. This should only be triggered by
@@ -73,4 +96,53 @@ public interface Herder {
      */
     void requestTaskReconfiguration(String connName);
 
+    /**
+     * Get the configurations for the current set of tasks of a connector.
+     * @param connName name of the connector to look up task configurations for
+     * @param callback callback to invoke upon completion
+     */
+    void taskConfigs(String connName, Callback<List<TaskInfo>> callback);
+
+    /**
+     * Set the configurations for the tasks of a connector. This should always include all tasks in the connector; if
+     * there are existing configurations and fewer are provided, this will reduce the number of tasks, and if more are
+     * provided it will increase the number of tasks.
+     * @param connName connector to update
+     * @param configs list of configurations
+     * @param callback callback to invoke upon completion
+     */
+    void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
+
+
+    class Created<T> {
+        private final boolean created;
+        private final T result;
+
+        public Created(boolean created, T result) {
+            this.created = created;
+            this.result = result;
+        }
+
+        public boolean created() {
+            return created;
+        }
+
+        public T result() {
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            Created<?> created1 = (Created<?>) o;
+            return Objects.equals(created, created1.created) &&
+                    Objects.equals(result, created1.result);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(created, result);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
index 953cfa5..20a79ca 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
index b37e49f..de9f533 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.connector.Connector;
 import org.apache.kafka.copycat.connector.ConnectorContext;
 import org.apache.kafka.copycat.connector.Task;
@@ -35,6 +34,7 @@ import org.apache.kafka.copycat.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -150,6 +150,10 @@ public class Worker {
         log.info("Worker stopped");
     }
 
+    public WorkerConfig config() {
+        return config;
+    }
+
     /**
      * Add a new connector.
      * @param connConfig connector configuration
@@ -196,24 +200,21 @@ public class Worker {
         }
     }
 
-    public Map<ConnectorTaskId, Map<String, String>> reconfigureConnectorTasks(String connName, int maxTasks, List<String> sinkTopics) {
+    public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
         log.trace("Reconfiguring connector tasks for {}", connName);
 
         Connector connector = connectors.get(connName);
         if (connector == null)
             throw new CopycatException("Connector " + connName + " not found in this worker.");
 
-        Map<ConnectorTaskId, Map<String, String>> result = new HashMap<>();
+        List<Map<String, String>> result = new ArrayList<>();
         String taskClassName = connector.taskClass().getName();
-        int index = 0;
         for (Properties taskProps : connector.taskConfigs(maxTasks)) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
-            index++;
             Map<String, String> taskConfig = Utils.propsToStringMap(taskProps);
             taskConfig.put(TaskConfig.TASK_CLASS_CONFIG, taskClassName);
             if (sinkTopics != null)
                 taskConfig.put(SinkTask.TOPICS_CONFIG, Utils.join(sinkTopics, ","));
-            result.put(taskId, taskConfig);
+            result.add(taskConfig);
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
new file mode 100644
index 0000000..74aadb9
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerConfig.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Properties;
+
+/**
+ * Common base class providing configuration for Copycat workers, whether standalone or distributed.
+ */
+@InterfaceStability.Unstable
+public class WorkerConfig extends AbstractConfig {
+
+    public static final String CLUSTER_CONFIG = "cluster";
+    private static final String CLUSTER_CONFIG_DOC =
+            "ID for this cluster, which is used to provide a namespace so multiple Copycat clusters "
+                    + "or instances may co-exist while sharing a single Kafka cluster.";
+    public static final String CLUSTER_DEFAULT = "copycat";
+
+    public static final String BOOTSTRAP_SERVERS_CONFIG = "bootstrap.servers";
+    public static final String BOOTSTRAP_SERVERS_DOC
+            = "A list of host/port pairs to use for establishing the initial connection to the Kafka "
+            + "cluster. The client will make use of all servers irrespective of which servers are "
+            + "specified here for bootstrapping&mdash;this list only impacts the initial hosts used "
+            + "to discover the full set of servers. This list should be in the form "
+            + "<code>host1:port1,host2:port2,...</code>. Since these servers are just used for the "
+            + "initial connection to discover the full cluster membership (which may change "
+            + "dynamically), this list need not contain the full set of servers (you may want more "
+            + "than one, though, in case a server is down).";
+    public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
+
+    public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
+    public static final String KEY_CONVERTER_CLASS_DOC =
+            "Converter class for key Copycat data that implements the <code>Converter</code> interface.";
+
+    public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter";
+    public static final String VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for value Copycat data that implements the <code>Converter</code> interface.";
+
+    public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
+    public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
+            "Converter class for internal key Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
+
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter";
+    public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC =
+            "Converter class for offset value Copycat data that implements the <code>Converter</code> interface. Used for converting data like offsets and configs.";
+
+    public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG
+            = "task.shutdown.graceful.timeout.ms";
+    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC =
+            "Amount of time to wait for tasks to shutdown gracefully. This is the total amount of time,"
+                    + " not per task. All task have shutdown triggered, then they are waited on sequentially.";
+    private static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT = "5000";
+
+    public static final String OFFSET_COMMIT_INTERVAL_MS_CONFIG = "offset.flush.interval.ms";
+    private static final String OFFSET_COMMIT_INTERVAL_MS_DOC
+            = "Interval at which to try committing offsets for tasks.";
+    public static final long OFFSET_COMMIT_INTERVAL_MS_DEFAULT = 60000L;
+
+    public static final String OFFSET_COMMIT_TIMEOUT_MS_CONFIG = "offset.flush.timeout.ms";
+    private static final String OFFSET_COMMIT_TIMEOUT_MS_DOC
+            = "Maximum number of milliseconds to wait for records to flush and partition offset data to be"
+            + " committed to offset storage before cancelling the process and restoring the offset "
+            + "data to be committed in a future attempt.";
+    public static final long OFFSET_COMMIT_TIMEOUT_MS_DEFAULT = 5000L;
+
+    public static final String REST_HOST_NAME_CONFIG = "rest.host.name";
+    private static final String REST_HOST_NAME_DOC
+            = "Hostname for the REST API. If this is set, it will only bind to this interface.";
+
+    public static final String REST_PORT_CONFIG = "rest.port";
+    private static final String REST_PORT_DOC
+            = "Port for the REST API to listen on.";
+    public static final int REST_PORT_DEFAULT = 8083;
+
+    public static final String REST_ADVERTISED_HOST_NAME_CONFIG = "rest.advertised.host.name";
+    private static final String REST_ADVERTISED_HOST_NAME_DOC
+            = "If this is set, this is the hostname that will be given out to other workers to connect to.";
+
+    public static final String REST_ADVERTISED_PORT_CONFIG = "rest.advertised.port";
+    private static final String REST_ADVERTISED_PORT_DOC
+            = "If this is set, this is the port that will be given out to other workers to connect to.";
+
+    /**
+     * Get a basic ConfigDef for a WorkerConfig. This includes all the common settings. Subclasses can use this to
+     * bootstrap their own ConfigDef.
+     * @return a ConfigDef with all the common options specified
+     */
+    protected static ConfigDef baseConfigDef() {
+        return new ConfigDef()
+                .define(CLUSTER_CONFIG, Type.STRING, CLUSTER_DEFAULT, Importance.HIGH, CLUSTER_CONFIG_DOC)
+                .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT,
+                        Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
+                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, VALUE_CONVERTER_CLASS_DOC)
+                .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC)
+                .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC)
+                .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG,
+                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW,
+                        TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC)
+                .define(OFFSET_COMMIT_INTERVAL_MS_CONFIG, Type.LONG, OFFSET_COMMIT_INTERVAL_MS_DEFAULT,
+                        Importance.LOW, OFFSET_COMMIT_INTERVAL_MS_DOC)
+                .define(OFFSET_COMMIT_TIMEOUT_MS_CONFIG, Type.LONG, OFFSET_COMMIT_TIMEOUT_MS_DEFAULT,
+                        Importance.LOW, OFFSET_COMMIT_TIMEOUT_MS_DOC)
+                .define(REST_HOST_NAME_CONFIG, Type.STRING, Importance.LOW, REST_HOST_NAME_DOC, false)
+                .define(REST_PORT_CONFIG, Type.INT, REST_PORT_DEFAULT, Importance.LOW, REST_PORT_DOC)
+                .define(REST_ADVERTISED_HOST_NAME_CONFIG, Type.STRING, Importance.LOW, REST_ADVERTISED_HOST_NAME_DOC, false)
+                .define(REST_ADVERTISED_PORT_CONFIG, Type.INT, Importance.LOW, REST_ADVERTISED_PORT_DOC, false);
+    }
+
+    public WorkerConfig(ConfigDef definition, Properties props) {
+        super(definition, props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
index 3c5cd13..70b99d0 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.data.SchemaAndValue;
 import org.apache.kafka.copycat.errors.CopycatException;
 import org.apache.kafka.copycat.errors.IllegalWorkerStateException;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
index 41e38f0..0e28c97 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -18,7 +18,6 @@
 package org.apache.kafka.copycat.runtime;
 
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.util.ShutdownableThread;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
index ee0a532..ea9e6b5 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -22,7 +22,6 @@ import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.copycat.cli.WorkerConfig;
 import org.apache.kafka.copycat.source.SourceRecord;
 import org.apache.kafka.copycat.source.SourceTask;
 import org.apache.kafka.copycat.source.SourceTaskContext;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
index a46141e..f204750 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java
@@ -19,8 +19,9 @@ package org.apache.kafka.copycat.runtime.distributed;
 
 import org.apache.kafka.copycat.util.ConnectorTaskId;
 
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -99,15 +100,15 @@ public class ClusterConfigState {
      * @param connectorName the name of the connector to look up task configs for
      * @return the current set of connector task IDs
      */
-    public Set<ConnectorTaskId> tasks(String connectorName) {
+    public List<ConnectorTaskId> tasks(String connectorName) {
         if (inconsistentConnectors.contains(connectorName))
-            return Collections.emptySet();
+            return Collections.emptyList();
 
         Integer numTasks = connectorTaskCounts.get(connectorName);
         if (numTasks == null)
-            return Collections.emptySet();
+            return Collections.emptyList();
 
-        Set<ConnectorTaskId> taskIds = new HashSet<>();
+        List<ConnectorTaskId> taskIds = new ArrayList<>();
         for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) {
             ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex);
             taskIds.add(taskId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
index a450b1d..e924f77 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/CopycatProtocol.java
@@ -38,9 +38,11 @@ import java.util.Map;
  */
 public class CopycatProtocol {
     public static final String VERSION_KEY_NAME = "version";
+    public static final String URL_KEY_NAME = "url";
     public static final String CONFIG_OFFSET_KEY_NAME = "config-offset";
     public static final String CONNECTOR_KEY_NAME = "connector";
     public static final String LEADER_KEY_NAME = "leader";
+    public static final String LEADER_URL_KEY_NAME = "leader-url";
     public static final String ERROR_KEY_NAME = "error";
     public static final String TASKS_KEY_NAME = "tasks";
     public static final String ASSIGNMENT_KEY_NAME = "assignment";
@@ -53,7 +55,9 @@ public class CopycatProtocol {
             .set(VERSION_KEY_NAME, COPYCAT_PROTOCOL_V0);
 
     public static final Schema CONFIG_STATE_V0 = new Schema(
+            new Field(URL_KEY_NAME, Type.STRING),
             new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64));
+
     // Assignments for each worker are a set of connectors and tasks. These are categorized by connector ID. A sentinel
     // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. that the assignment includes
     // responsibility for running the Connector instance in addition to any tasks it generates).
@@ -63,12 +67,14 @@ public class CopycatProtocol {
     public static final Schema ASSIGNMENT_V0 = new Schema(
             new Field(ERROR_KEY_NAME, Type.INT16),
             new Field(LEADER_KEY_NAME, Type.STRING),
+            new Field(LEADER_URL_KEY_NAME, Type.STRING),
             new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
             new Field(ASSIGNMENT_KEY_NAME, new ArrayOf(CONNECTOR_ASSIGNMENT_V0)));
 
-    public static ByteBuffer serializeMetadata(ConfigState configState) {
+    public static ByteBuffer serializeMetadata(WorkerState workerState) {
         Struct struct = new Struct(CONFIG_STATE_V0);
-        struct.set(CONFIG_OFFSET_KEY_NAME, configState.offset());
+        struct.set(URL_KEY_NAME, workerState.url());
+        struct.set(CONFIG_OFFSET_KEY_NAME, workerState.offset());
         ByteBuffer buffer = ByteBuffer.allocate(COPYCAT_PROTOCOL_HEADER_V0.sizeOf() + CONFIG_STATE_V0.sizeOf(struct));
         COPYCAT_PROTOCOL_HEADER_V0.writeTo(buffer);
         CONFIG_STATE_V0.write(buffer, struct);
@@ -76,19 +82,21 @@ public class CopycatProtocol {
         return buffer;
     }
 
-    public static ConfigState deserializeMetadata(ByteBuffer buffer) {
+    public static WorkerState deserializeMetadata(ByteBuffer buffer) {
         Struct header = (Struct) COPYCAT_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
         checkVersionCompatibility(version);
         Struct struct = (Struct) CONFIG_STATE_V0.read(buffer);
         long configOffset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
-        return new ConfigState(configOffset);
+        String url = struct.getString(URL_KEY_NAME);
+        return new WorkerState(url, configOffset);
     }
 
     public static ByteBuffer serializeAssignment(Assignment assignment) {
         Struct struct = new Struct(ASSIGNMENT_V0);
         struct.set(ERROR_KEY_NAME, assignment.error());
         struct.set(LEADER_KEY_NAME, assignment.leader());
+        struct.set(LEADER_URL_KEY_NAME, assignment.leaderUrl());
         struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset());
         List<Struct> taskAssignments = new ArrayList<>();
         for (Map.Entry<String, List<Integer>> connectorEntry : assignment.asMap().entrySet()) {
@@ -114,6 +122,7 @@ public class CopycatProtocol {
         Struct struct = (Struct) ASSIGNMENT_V0.read(buffer);
         short error = struct.getShort(ERROR_KEY_NAME);
         String leader = struct.getString(LEADER_KEY_NAME);
+        String leaderUrl = struct.getString(LEADER_URL_KEY_NAME);
         long offset = struct.getLong(CONFIG_OFFSET_KEY_NAME);
         List<String> connectorIds = new ArrayList<>();
         List<ConnectorTaskId> taskIds = new ArrayList<>();
@@ -128,24 +137,31 @@ public class CopycatProtocol {
                     taskIds.add(new ConnectorTaskId(connector, taskId));
             }
         }
-        return new Assignment(error, leader, offset, connectorIds, taskIds);
+        return new Assignment(error, leader, leaderUrl, offset, connectorIds, taskIds);
     }
 
-    public static class ConfigState {
+    public static class WorkerState {
+        private final String url;
         private final long offset;
 
-        public ConfigState(long offset) {
+        public WorkerState(String url, long offset) {
+            this.url = url;
             this.offset = offset;
         }
 
+        public String url() {
+            return url;
+        }
+
         public long offset() {
             return offset;
         }
 
         @Override
         public String toString() {
-            return "ConfigState{" +
-                    "offset=" + offset +
+            return "WorkerState{" +
+                    "url='" + url + '\'' +
+                    ", offset=" + offset +
                     '}';
         }
     }
@@ -158,6 +174,7 @@ public class CopycatProtocol {
 
         private final short error;
         private final String leader;
+        private final String leaderUrl;
         private final long offset;
         private final List<String> connectorIds;
         private final List<ConnectorTaskId> taskIds;
@@ -167,10 +184,11 @@ public class CopycatProtocol {
          * @param connectorIds list of connectors that the worker should instantiate and run
          * @param taskIds list of task IDs that the worker should instantiate and run
          */
-        public Assignment(short error, String leader, long configOffset,
+        public Assignment(short error, String leader, String leaderUrl, long configOffset,
                           List<String> connectorIds, List<ConnectorTaskId> taskIds) {
             this.error = error;
             this.leader = leader;
+            this.leaderUrl = leaderUrl;
             this.offset = configOffset;
             this.taskIds = taskIds;
             this.connectorIds = connectorIds;
@@ -184,6 +202,10 @@ public class CopycatProtocol {
             return leader;
         }
 
+        public String leaderUrl() {
+            return leaderUrl;
+        }
+
         public boolean failed() {
             return error != NO_ERROR;
         }
@@ -205,6 +227,7 @@ public class CopycatProtocol {
             return "Assignment{" +
                     "error=" + error +
                     ", leader='" + leader + '\'' +
+                    ", leaderUrl='" + leaderUrl + '\'' +
                     ", offset=" + offset +
                     ", connectorIds=" + connectorIds +
                     ", taskIds=" + taskIds +

http://git-wip-us.apache.org/repos/asf/kafka/blob/c001b204/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
new file mode 100644
index 0000000..7fe6691
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedConfig.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.copycat.runtime.WorkerConfig;
+
+import java.util.Properties;
+
+import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+
+public class DistributedConfig extends WorkerConfig { // Worker configuration: the inherited baseConfigDef() extended with the settings defined in the static initializer below.
+    private static final ConfigDef CONFIG; // assigned exactly once, in the static initializer
+
+    /*
+     * NOTE: DO NOT CHANGE EITHER CONFIG STRINGS OR THEIR JAVA VARIABLE NAMES AS
+     * THESE ARE PART OF THE PUBLIC API AND CHANGE WILL BREAK USER CODE.
+     */
+
+    /**
+     * <code>group.id</code>
+     */
+    public static final String GROUP_ID_CONFIG = "group.id";
+    private static final String GROUP_ID_DOC = "A unique string that identifies the Copycat cluster group this worker belongs to.";
+
+    /**
+     * <code>session.timeout.ms</code>
+     */
+    public static final String SESSION_TIMEOUT_MS_CONFIG = "session.timeout.ms";
+    private static final String SESSION_TIMEOUT_MS_DOC = "The timeout used to detect failures when using Kafka's group management facilities.";
+
+    /**
+     * <code>heartbeat.interval.ms</code>
+     */
+    public static final String HEARTBEAT_INTERVAL_MS_CONFIG = "heartbeat.interval.ms";
+    private static final String HEARTBEAT_INTERVAL_MS_DOC = "The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the worker's session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than <code>session.timeout.ms</code>, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.";
+
+    /**
+     * <code>worker.sync.timeout.ms</code>
+     */
+    public static final String WORKER_SYNC_TIMEOUT_MS_CONFIG = "worker.sync.timeout.ms";
+    private static final String WORKER_SYNC_TIMEOUT_MS_DOC = "When the worker is out of sync with other workers and needs" +
+            " to resynchronize configurations, wait up to this amount of time before giving up, leaving the group, and" +
+            " waiting a backoff period before rejoining.";
+
+    /**
+     * <code>worker.unsync.backoff.ms</code>
+     */
+    public static final String WORKER_UNSYNC_BACKOFF_MS_CONFIG = "worker.unsync.backoff.ms";
+    private static final String WORKER_UNSYNC_BACKOFF_MS_DOC = "When the worker is out of sync with other workers and " + // NOTE(review): this trailing space plus the leading space on the next literal yields "and  fails" (double space)
+            " fails to catch up within worker.sync.timeout.ms, leave the Copycat cluster for this long before rejoining.";
+    public static final int WORKER_UNSYNC_BACKOFF_MS_DEFAULT = 5 * 60 * 1000; // 5 minutes, expressed in ms
+
+    static {
+        CONFIG = baseConfigDef()
+                .define(GROUP_ID_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, GROUP_ID_DOC)
+                .define(SESSION_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        30000, // 30 seconds
+                        ConfigDef.Importance.HIGH,
+                        SESSION_TIMEOUT_MS_DOC)
+                .define(HEARTBEAT_INTERVAL_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000, // 3 seconds
+                        ConfigDef.Importance.HIGH,
+                        HEARTBEAT_INTERVAL_MS_DOC)
+                .define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
+                        ConfigDef.Type.LONG,
+                        5 * 60 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METADATA_MAX_AGE_DOC)
+                .define(CommonClientConfigs.CLIENT_ID_CONFIG,
+                        ConfigDef.Type.STRING,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.CLIENT_ID_DOC)
+                .define(CommonClientConfigs.SEND_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        128 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.SEND_BUFFER_DOC)
+                .define(CommonClientConfigs.RECEIVE_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        32 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.RECEIVE_BUFFER_DOC)
+                .define(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        100L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
+                .define(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        30000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG,
+                        ConfigDef.Type.INT,
+                        2,
+                        atLeast(1),
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG,
+                        ConfigDef.Type.LIST,
+                        "",
+                        ConfigDef.Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL, ConfigDef.Importance.MEDIUM, CommonClientConfigs.SECURITY_PROTOCOL_DOC)
+                .define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC)
+                .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC)
+                .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC, false)
+                .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC, false)
+                .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_KEYSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEYSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEY_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_KEY_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTSTORE_TYPE, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_TRUSTSTORE_TYPE_DOC)
+                .define(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_LOCATION_DOC, false)
+                .define(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, SslConfigs.SSL_TRUSTSTORE_PASSWORD_DOC, false)
+                .define(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_KEYMANGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_KEYMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_TRUSTMANAGER_ALGORITHM, ConfigDef.Importance.LOW, SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_DOC)
+                .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, ConfigDef.Type.STRING, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_SERVICE_NAME_DOC, false)
+                .define(SaslConfigs.SASL_KERBEROS_KINIT_CMD, ConfigDef.Type.STRING, SaslConfigs.DEFAULT_KERBEROS_KINIT_CMD, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_KINIT_CMD_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_WINDOW_FACTOR, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Type.DOUBLE, SaslConfigs.DEFAULT_KERBEROS_TICKET_RENEW_JITTER, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Type.LONG, SaslConfigs.DEFAULT_KERBEROS_MIN_TIME_BEFORE_RELOGIN, ConfigDef.Importance.LOW, SaslConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_DOC)
+                .define(SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Type.LIST, SaslConfigs.DEFAULT_SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES, ConfigDef.Importance.MEDIUM, SaslConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_DOC)
+                .define(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        40 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+                        /* default is set to be a bit lower than the server default (10 min), to avoid both client and server closing connection at same time */
+                .define(CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        9 * 60 * 1000,
+                        ConfigDef.Importance.MEDIUM,
+                        CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
+                .define(WORKER_SYNC_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        3000, // 3 seconds
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_SYNC_TIMEOUT_MS_DOC)
+                .define(WORKER_UNSYNC_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        WORKER_UNSYNC_BACKOFF_MS_DEFAULT,
+                        ConfigDef.Importance.MEDIUM,
+                        WORKER_UNSYNC_BACKOFF_MS_DOC);
+    }
+
+    public DistributedConfig(Properties props) { // hands the statically built CONFIG definition and the supplied props to the WorkerConfig constructor
+        super(CONFIG, props);
+    }
+
+}