You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/08/19 03:56:36 UTC

kafka git commit: KAFKA-3845: KIP-75: Add per-connector converters

Repository: kafka
Updated Branches:
  refs/heads/trunk e3ede8434 -> 05ed54bf2


KAFKA-3845: KIP-75: Add per-connector converters

Author: Ewen Cheslack-Postava <me...@ewencp.org>

Reviewers: Shikhar Bhushan, Gwen Shapira

Closes #1721 from ewencp/kafka-3845-per-connector-converters


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/05ed54bf
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/05ed54bf
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/05ed54bf

Branch: refs/heads/trunk
Commit: 05ed54bf2b80691d413dbfa05065eb3afe33972f
Parents: e3ede84
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Thu Aug 18 20:56:31 2016 -0700
Committer: Gwen Shapira <cs...@gmail.com>
Committed: Thu Aug 18 20:56:31 2016 -0700

----------------------------------------------------------------------
 build.gradle                                    |   2 +-
 .../kafka/connect/runtime/ConnectorConfig.java  |  12 +-
 .../apache/kafka/connect/runtime/Worker.java    |  31 +++--
 .../runtime/distributed/DistributedHerder.java  |   6 +-
 .../runtime/standalone/StandaloneConfig.java    |   2 +-
 .../runtime/standalone/StandaloneHerder.java    |  15 ++-
 .../kafka/connect/runtime/WorkerTest.java       | 113 +++++++++++++++++-
 .../distributed/DistributedHerderTest.java      |  20 ++--
 .../standalone/StandaloneHerderTest.java        | 117 ++++++++++---------
 docs/connect.html                               |  16 ++-
 .../tests/connect/connect_rest_test.py          |   4 +-
 tests/kafkatest/tests/connect/connect_test.py   |  13 ++-
 .../templates/connect-file-sink.properties      |  10 +-
 .../templates/connect-file-source.properties    |  10 +-
 14 files changed, 272 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 6edcfdd..3214108 100644
--- a/build.gradle
+++ b/build.gradle
@@ -874,8 +874,8 @@ project(':connect:runtime') {
     testCompile libs.junit
     testCompile libs.powermock
     testCompile libs.powermockEasymock
+    testCompile project(":connect:json")
 
-    testRuntime project(":connect:json")
     testRuntime libs.slf4jlog4j
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 9569b4b..30869a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -53,6 +53,14 @@ public class ConnectorConfig extends AbstractConfig {
                     " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
     private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";
 
+    public static final String KEY_CONVERTER_CLASS_CONFIG = WorkerConfig.KEY_CONVERTER_CLASS_CONFIG;
+    public static final String KEY_CONVERTER_CLASS_DOC = WorkerConfig.KEY_CONVERTER_CLASS_DOC;
+    public static final String KEY_CONVERTER_CLASS_DISPLAY = "Key converter class";
+
+    public static final String VALUE_CONVERTER_CLASS_CONFIG = WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG;
+    public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
+    public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
+
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
     public static final int TASKS_MAX_DEFAULT = 1;
@@ -64,7 +72,9 @@ public class ConnectorConfig extends AbstractConfig {
         return new ConfigDef()
             .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
             .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
-            .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT,  atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY);
+            .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
+            .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
+            .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY);
     }
 
     public ConnectorConfig() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index a0664ad..d39806a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -66,8 +66,8 @@ public class Worker {
     private final Time time;
     private final String workerId;
     private final WorkerConfig config;
-    private final Converter keyConverter;
-    private final Converter valueConverter;
+    private final Converter defaultKeyConverter;
+    private final Converter defaultValueConverter;
     private final Converter internalKeyConverter;
     private final Converter internalValueConverter;
     private final OffsetBackingStore offsetBackingStore;
@@ -85,10 +85,10 @@ public class Worker {
         this.workerId = workerId;
         this.time = time;
         this.config = config;
-        this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
-        this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true);
-        this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
-        this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false);
+        this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
+        this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
         this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class);
         this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true);
         this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
@@ -302,11 +302,13 @@ public class Worker {
      * Add a new task.
      * @param id Globally unique ID for this task.
      * @param taskConfig the parsed task configuration
+     * @param connConfig the parsed connector configuration
      * @param statusListener listener for notifications of task status changes
      * @param initialState the initial target state that the task should be initialized to
      */
     public void startTask(ConnectorTaskId id,
                           TaskConfig taskConfig,
+                          ConnectorConfig connConfig,
                           TaskStatus.Listener statusListener,
                           TargetState initialState) {
         log.info("Creating task {}", id);
@@ -322,7 +324,18 @@ public class Worker {
         final Task task = instantiateTask(taskClass);
         log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
-        final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState);
+        Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+        if (keyConverter != null)
+            keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
+        else
+            keyConverter = defaultKeyConverter;
+        Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+        if (valueConverter != null)
+            valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
+        else
+            valueConverter = defaultValueConverter;
+
+        final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
 
         // Start the task before adding modifying any state, any exceptions are caught higher up the
         // call chain and there's no cleanup to do here
@@ -339,7 +352,9 @@ public class Worker {
     private WorkerTask buildWorkerTask(ConnectorTaskId id,
                                        Task task,
                                        TaskStatus.Listener statusListener,
-                                       TargetState initialState) {
+                                       TargetState initialState,
+                                       Converter keyConverter,
+                                       Converter valueConverter) {
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index afabbeb..6232187 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -772,9 +772,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private void startTask(ConnectorTaskId taskId) {
         log.info("Starting task {}", taskId);
         TargetState initialState = configState.targetState(taskId.connector());
-        Map<String, String> configs = configState.taskConfig(taskId);
-        TaskConfig taskConfig = new TaskConfig(configs);
-        worker.startTask(taskId, taskConfig, this, initialState);
+        TaskConfig taskConfig = new TaskConfig(configState.taskConfig(taskId));
+        ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
+        worker.startTask(taskId, taskConfig, connConfig, this, initialState);
     }
 
     // Helper for starting a connector with the given name, which will extract & parse the config, generate connector

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
index 8014b3a..5637e05 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java
@@ -29,7 +29,7 @@ public class StandaloneConfig extends WorkerConfig {
      * <code>offset.storage.file.filename</code>
      */
     public static final String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
-    private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "file to store offset data in";
+    private static final String OFFSET_STORAGE_FILE_FILENAME_DOC = "File to store offset data in";
 
     static {
         CONFIG = baseConfigDef()

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 8dbda74..cac8d18 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -206,14 +206,16 @@ public class StandaloneHerder extends AbstractHerder {
         if (!configState.contains(taskId.connector()))
             cb.onCompletion(new NotFoundException("Connector " + taskId.connector() + " not found", null), null);
 
-        Map<String, String> taskConfig = configState.taskConfig(taskId);
-        if (taskConfig == null)
+        Map<String, String> taskConfigProps = configState.taskConfig(taskId);
+        if (taskConfigProps == null)
             cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
+        TaskConfig taskConfig = new TaskConfig(taskConfigProps);
+        ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
 
         TargetState targetState = configState.targetState(taskId.connector());
         try {
             worker.stopAndAwaitTask(taskId);
-            worker.startTask(taskId, new TaskConfig(taskConfig), this, targetState);
+            worker.startTask(taskId, taskConfig, connConfig, this, targetState);
             cb.onCompletion(null, null);
         } catch (Exception e) {
             log.error("Failed to restart task {}", taskId, e);
@@ -270,11 +272,14 @@ public class StandaloneHerder extends AbstractHerder {
     }
 
     private void createConnectorTasks(String connName, TargetState initialState) {
+        Map<String, String> connConfigs = configState.connectorConfig(connName);
+        ConnectorConfig connConfig = new ConnectorConfig(connConfigs);
+
         for (ConnectorTaskId taskId : configState.tasks(connName)) {
             Map<String, String> taskConfigMap = configState.taskConfig(taskId);
-            TaskConfig config = new TaskConfig(taskConfigMap);
+            TaskConfig taskConfig = new TaskConfig(taskConfigMap);
             try {
-                worker.startTask(taskId, config, this, initialState);
+                worker.startTask(taskId, taskConfig, connConfig, this, initialState);
             } catch (Throwable e) {
                 log.error("Failed to add task {}: ", taskId, e);
                 // Swallow this so we can continue updating the rest of the tasks

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index ec4f025..f9839f5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -23,7 +23,10 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -35,6 +38,7 @@ import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
+import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
@@ -352,8 +356,8 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(task),
                 EasyMock.anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
-                EasyMock.anyObject(Converter.class),
-                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(JsonConverter.class),
+                EasyMock.anyObject(JsonConverter.class),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
@@ -380,7 +384,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertEquals(Collections.emptySet(), worker.taskIds());
@@ -420,8 +424,8 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(task),
                 EasyMock.anyObject(TaskStatus.Listener.class),
                 EasyMock.eq(TargetState.STARTED),
-                EasyMock.anyObject(Converter.class),
-                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(JsonConverter.class),
+                EasyMock.anyObject(JsonConverter.class),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
@@ -449,12 +453,79 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
         worker.stop();
 
         PowerMock.verifyAll();
     }
 
+    @Test
+    public void testConverterOverrides() throws Exception {
+        expectStartStorage();
+
+        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
+        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
+
+        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
+        EasyMock.expect(task.version()).andReturn("1.0");
+
+        Capture<TestConverter> keyConverter = EasyMock.newCapture();
+        Capture<TestConverter> valueConverter = EasyMock.newCapture();
+
+        PowerMock.expectNew(
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+                EasyMock.eq(task),
+                EasyMock.anyObject(TaskStatus.Listener.class),
+                EasyMock.eq(TargetState.STARTED),
+                EasyMock.capture(keyConverter),
+                EasyMock.capture(valueConverter),
+                EasyMock.anyObject(KafkaProducer.class),
+                EasyMock.anyObject(OffsetStorageReader.class),
+                EasyMock.anyObject(OffsetStorageWriter.class),
+                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(Time.class))
+                .andReturn(workerTask);
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+        workerTask.initialize(new TaskConfig(origProps));
+        EasyMock.expectLastCall();
+        workerTask.run();
+        EasyMock.expectLastCall();
+
+        // Remove
+        workerTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
+        EasyMock.expectLastCall();
+
+        expectStopStorage();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker.start();
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        Map<String, String> connProps = anyConnectorConfigMap();
+        connProps.put(ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        connProps.put("key.converter.extra.config", "foo");
+        connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
+        connProps.put("value.converter.extra.config", "bar");
+        worker.startTask(TASK_ID, new TaskConfig(origProps), new ConnectorConfig(connProps), taskStatusListener, TargetState.STARTED);
+        assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
+        worker.stopAndAwaitTask(TASK_ID);
+        assertEquals(Collections.emptySet(), worker.taskIds());
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        // Validate extra configs got passed through to overridden converters
+        assertEquals("foo", keyConverter.getValue().configs.get("extra.config"));
+        assertEquals("bar", valueConverter.getValue().configs.get("extra.config"));
+
+        PowerMock.verifyAll();
+    }
+
     private void expectStartStorage() {
         offsetBackingStore.configure(EasyMock.anyObject(WorkerConfig.class));
         EasyMock.expectLastCall();
@@ -467,6 +538,17 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expectLastCall();
     }
 
+    private Map<String, String> anyConnectorConfigMap() {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        return props;
+    }
+
+    private ConnectorConfig anyConnectorConfig() {
+        return new ConnectorConfig(anyConnectorConfigMap());
+    }
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class WorkerTestConnector extends Connector {
@@ -527,4 +609,23 @@ public class WorkerTest extends ThreadedTest {
         public void stop() {
         }
     }
+
+    public static class TestConverter implements Converter {
+        public Map<String, ?> configs;
+
+        @Override
+        public void configure(Map<String, ?> configs, boolean isKey) {
+            this.configs = configs;
+        }
+
+        @Override
+        public byte[] fromConnectData(String topic, Schema schema, Object value) {
+            return new byte[0];
+        }
+
+        @Override
+        public SchemaAndValue toConnectData(String topic, byte[] value) {
+            return null;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 747db1a..8fc6dbd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -175,7 +175,7 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -198,7 +198,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -234,7 +234,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -530,7 +530,7 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
 
         // now handle the task restart
@@ -545,7 +545,7 @@ public class DistributedHerderTest {
 
         worker.stopAndAwaitTask(TASK0);
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
 
         PowerMock.replayAll();
@@ -841,7 +841,7 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -877,7 +877,7 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -918,7 +918,7 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Collections.<String>emptyList(), Collections.singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -970,7 +970,7 @@ public class DistributedHerderTest {
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
                 Arrays.asList(TASK0));
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -1008,7 +1008,7 @@ public class DistributedHerderTest {
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.<ConnectorConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         member.poll(EasyMock.anyInt());

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index e70b968..3772586 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -73,6 +73,10 @@ public class StandaloneHerderTest {
     private static final int DEFAULT_MAX_TASKS = 1;
     private static final String WORKER_ID = "localhost:8083";
 
+    private enum SourceSink {
+        SOURCE, SINK
+    };
+
     private StandaloneHerder herder;
 
     private Connector connector;
@@ -88,11 +92,11 @@ public class StandaloneHerderTest {
     @Test
     public void testCreateSourceConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -101,7 +105,7 @@ public class StandaloneHerderTest {
     public void testCreateConnectorAlreadyExists() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         // First addition should succeed
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         // Second should fail
         createCallback.onCompletion(EasyMock.<AlreadyExistsException>anyObject(), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
@@ -109,8 +113,8 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -118,11 +122,11 @@ public class StandaloneHerderTest {
     @Test
     public void testCreateSinkConnector() throws Exception {
         connector = PowerMock.createMock(BogusSinkConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSinkConnector.class, BogusSinkTask.class, true);
+        expectAdd(SourceSink.SINK);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSinkConnector.class, true), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SINK), false, createCallback);
 
         PowerMock.verifyAll();
     }
@@ -130,7 +134,7 @@ public class StandaloneHerderTest {
     @Test
     public void testDestroyConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
         statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
@@ -139,7 +143,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
         FutureCallback<Herder.Created<ConnectorInfo>> futureCb = new FutureCallback<>();
         herder.putConnectorConfig(CONNECTOR_NAME, null, true, futureCb);
         futureCb.get(1000L, TimeUnit.MILLISECONDS);
@@ -159,18 +163,18 @@ public class StandaloneHerderTest {
 
     @Test
     public void testRestartConnector() throws Exception {
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
 
-        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
+        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
                 EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -181,7 +185,7 @@ public class StandaloneHerderTest {
 
     @Test
     public void testRestartConnectorFailureOnStop() throws Exception {
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         RuntimeException e = new RuntimeException();
         worker.stopConnector(CONNECTOR_NAME);
@@ -191,7 +195,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -207,19 +211,19 @@ public class StandaloneHerderTest {
 
     @Test
     public void testRestartConnectorFailureOnStart() throws Exception {
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
 
         RuntimeException e = new RuntimeException();
-        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false))),
+        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(SourceSink.SOURCE))),
                 EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall().andThrow(e);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartConnector(CONNECTOR_NAME, cb);
@@ -236,18 +240,19 @@ public class StandaloneHerderTest {
     @Test
     public void testRestartTask() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
 
-        Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
-        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
+        ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE));
+        TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
+        worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED);
         EasyMock.expectLastCall();
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -259,7 +264,7 @@ public class StandaloneHerderTest {
     @Test
     public void testRestartTaskFailureOnStop() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         RuntimeException e = new RuntimeException();
         worker.stopAndAwaitTask(taskId);
@@ -269,7 +274,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -285,19 +290,20 @@ public class StandaloneHerderTest {
     @Test
     public void testRestartTaskFailureOnStart() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         worker.stopAndAwaitTask(taskId);
         EasyMock.expectLastCall();
 
         RuntimeException e = new RuntimeException();
-        Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
-        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
+        ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(SourceSink.SOURCE));
+        TaskConfig taskConfig = new TaskConfig(taskConfig(SourceSink.SOURCE));
+        worker.startTask(taskId, taskConfig, connConfig, herder, TargetState.STARTED);
         EasyMock.expectLastCall().andThrow(e);
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
 
         FutureCallback<Void> cb = new FutureCallback<>();
         herder.restartTask(taskId, cb);
@@ -314,7 +320,7 @@ public class StandaloneHerderTest {
     @Test
     public void testCreateAndStop() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
 
@@ -325,7 +331,7 @@ public class StandaloneHerderTest {
 
         PowerMock.replayAll();
 
-        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false), false, createCallback);
+        herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(SourceSink.SOURCE), false, createCallback);
         herder.stop();
 
         PowerMock.verifyAll();
@@ -333,7 +339,7 @@ public class StandaloneHerderTest {
 
     @Test
     public void testAccessors() throws Exception {
-        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
+        Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
 
         Callback<Collection<String>> listConnectorsCb = PowerMock.createMock(Callback.class);
         Callback<ConnectorInfo> connectorInfoCb = PowerMock.createMock(Callback.class);
@@ -353,7 +359,7 @@ public class StandaloneHerderTest {
 
         // Create connector
         connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
 
         // Validate accessors with 1 connector
         listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
@@ -364,7 +370,7 @@ public class StandaloneHerderTest {
         connectorConfigCb.onCompletion(null, connConfig);
         EasyMock.expectLastCall();
 
-        TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(BogusSourceTask.class, false));
+        TaskInfo taskInfo = new TaskInfo(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig(SourceSink.SOURCE));
         taskConfigsCb.onCompletion(null, Arrays.asList(taskInfo));
         EasyMock.expectLastCall();
 
@@ -388,7 +394,7 @@ public class StandaloneHerderTest {
 
     @Test
     public void testPutConnectorConfig() throws Exception {
-        Map<String, String> connConfig = connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class, false);
+        Map<String, String> connConfig = connectorConfig(SourceSink.SOURCE);
         Map<String, String> newConnConfig = new HashMap<>(connConfig);
         newConnConfig.put("foo", "bar");
 
@@ -397,7 +403,7 @@ public class StandaloneHerderTest {
 
         // Create
         connector = PowerMock.createMock(BogusSourceConnector.class);
-        expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+        expectAdd(SourceSink.SOURCE);
         // Should get first config
         connectorConfigCb.onCompletion(null, connConfig);
         EasyMock.expectLastCall();
@@ -411,7 +417,7 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
         // Generate same task config, which should result in no additional action to restart tasks
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, null))
-                .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
+                .andReturn(Collections.singletonList(taskConfig(SourceSink.SOURCE)));
         worker.isSinkConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall().andReturn(false);
         ConnectorInfo newConnInfo = new ConnectorInfo(CONNECTOR_NAME, newConnConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
@@ -446,17 +452,14 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
-    private void expectAdd(String name,
-                           Class<? extends Connector> connClass,
-                           Class<? extends Task> taskClass,
-                           boolean sink) throws Exception {
+    private void expectAdd(SourceSink sourceSink) throws Exception {
 
-        Map<String, String> connectorProps = connectorConfig(name, connClass, sink);
+        Map<String, String> connectorProps = connectorConfig(sourceSink);
 
         worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
                               EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
-        EasyMock.expect(worker.isRunning(name)).andReturn(true);
+        EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
 
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
@@ -464,16 +467,18 @@ public class StandaloneHerderTest {
 
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions
-        Map<String, String> generatedTaskProps = taskConfig(taskClass, sink);
+        ConnectorConfig connConfig = new ConnectorConfig(connectorConfig(sourceSink));
+        Map<String, String> generatedTaskProps = taskConfig(sourceSink);
+        TaskConfig taskConfig = new TaskConfig(generatedTaskProps);
 
-        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sink ? TOPICS_LIST : null))
+        EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, sourceSink == SourceSink.SINK ? TOPICS_LIST : null))
             .andReturn(Collections.singletonList(generatedTaskProps));
 
-        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskConfig, connConfig, herder, TargetState.STARTED);
         EasyMock.expectLastCall();
 
         worker.isSinkConnector(CONNECTOR_NAME);
-        PowerMock.expectLastCall().andReturn(sink);
+        PowerMock.expectLastCall().andReturn(sourceSink == SourceSink.SINK);
     }
 
     private void expectStop() {
@@ -490,22 +495,24 @@ public class StandaloneHerderTest {
         expectStop();
     }
 
-    private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass, boolean sink) {
-        HashMap<String, String> connectorProps = new HashMap<>();
-        connectorProps.put(ConnectorConfig.NAME_CONFIG, name);
-        connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
-        if (sink) {
-            connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
-        }
-        return connectorProps;
+    private static Map<String, String> connectorConfig(SourceSink sourceSink) {
+        Map<String, String> props = new HashMap<>();
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        Class<? extends Connector> connectorClass = sourceSink == SourceSink.SINK ? BogusSinkConnector.class : BogusSourceConnector.class;
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        if (sourceSink == SourceSink.SINK)
+            props.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        return props;
     }
 
-    private static Map<String, String> taskConfig(Class<? extends Task> taskClass, boolean sink) {
+    private static Map<String, String> taskConfig(SourceSink sourceSink) {
         HashMap<String, String> generatedTaskProps = new HashMap<>();
         // Connectors can add any settings, so these are arbitrary
         generatedTaskProps.put("foo", "bar");
+        Class<? extends Task> taskClass = sourceSink == SourceSink.SINK ? BogusSinkTask.class : BogusSourceTask.class;
         generatedTaskProps.put(TaskConfig.TASK_CLASS_CONFIG, taskClass.getName());
-        if (sink)
+        if (sourceSink == SourceSink.SINK)
             generatedTaskProps.put(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
         return generatedTaskProps;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/docs/connect.html
----------------------------------------------------------------------
diff --git a/docs/connect.html b/docs/connect.html
index e5a4ad2..de3b5aa 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -43,7 +43,17 @@ In standalone mode all work is performed in a single process. This configuration
 &gt; bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
 </pre>
 
-The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment.
+The first parameter is the configuration for the worker. This includes settings such as the Kafka connection parameters, serialization format, and how frequently to commit offsets. The provided example should work well with a local cluster running with the default configuration provided by <code>config/server.properties</code>. It will require tweaking to use with a different configuration or production deployment. All workers (both standalone and distributed) require a few configs:
+<ul>
+    <li><code>bootstrap.servers</code> - List of Kafka servers used to bootstrap connections to Kafka</li>
+    <li><code>key.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
+    <li><code>value.converter</code> - Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.</li>
+</ul>
+
+The important configuration options specific to standalone mode are:
+<ul>
+    <li><code>offset.storage.file.filename</code> - File to store offset data in</li>
+</ul>
 
 The remaining parameters are connector configuration files. You may include as many as you want, but all will execute within the same process (on different threads).
 
@@ -55,7 +65,7 @@ Distributed mode handles automatic balancing of work, allows you to scale up (or
 
 The difference is in the class which is started and the configuration parameters which change how the Kafka Connect process decides where to store configurations, how to assign work, and where to store offsets and task statues. In the distributed mode, Kafka Connect stores the offsets, configs and task statuses in Kafka topics. It is recommended to manually create the topics for offset, configs and statuses in order to achieve the desired the number of partitions and replication factors. If the topics are not yet created when starting Kafka Connect, the topics will be auto created with default number of partitions and replication factor, which may not be best suited for its usage.
 
-In particular, the following configuration parameters are critical to set before starting your cluster:
+In particular, the following configuration parameters, in addition to the common settings mentioned above, are critical to set before starting your cluster:
 <ul>
     <li><code>group.id</code> (default <code>connect-cluster</code>) - unique name for the cluster, used in forming the Connect cluster group; note that this <b>must not conflict</b> with consumer group IDs</li>
     <li><code>config.storage.topic</code> (default <code>connect-configs</code>) - topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated topic. You may need to manually create the topic to ensure single partition for the config topic as auto created topics may have multiple partitions.</li>
@@ -76,6 +86,8 @@ Most configurations are connector dependent, so they can't be outlined here. How
     <li><code>name</code> - Unique name for the connector. Attempting to register again with the same name will fail.</li>
     <li><code>connector.class</code> - The Java class for the connector</li>
     <li><code>tasks.max</code> - The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.</li>
+    <li><code>key.converter</code> - (optional) Override the default key converter set by the worker.</li>
+    <li><code>value.converter</code> - (optional) Override the default value converter set by the worker.</li>
 </ul>
 
 The <code>connector.class</code> config supports several formats: the full name or alias of the class for this connector. If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name or use FileStreamSink or FileStreamSinkConnector to make the configuration a bit shorter.

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/tests/kafkatest/tests/connect/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_rest_test.py b/tests/kafkatest/tests/connect/connect_rest_test.py
index 0b00499..70bc32c 100644
--- a/tests/kafkatest/tests/connect/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect/connect_rest_test.py
@@ -29,8 +29,8 @@ class ConnectRestApiTest(KafkaTest):
     FILE_SOURCE_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSourceConnector'
     FILE_SINK_CONNECTOR = 'org.apache.kafka.connect.file.FileStreamSinkConnector'
 
-    FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topic', 'file'}
-    FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'topics', 'file'}
+    FILE_SOURCE_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topic', 'file'}
+    FILE_SINK_CONFIGS = {'name', 'connector.class', 'tasks.max', 'key.converter', 'value.converter', 'topics', 'file'}
 
     INPUT_FILE = "/mnt/connect.input"
     INPUT_FILE2 = "/mnt/connect.input2"

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/tests/kafkatest/tests/connect/connect_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/connect_test.py b/tests/kafkatest/tests/connect/connect_test.py
index 9184390..93f5734 100644
--- a/tests/kafkatest/tests/connect/connect_test.py
+++ b/tests/kafkatest/tests/connect/connect_test.py
@@ -63,10 +63,17 @@ class ConnectStandaloneFileTest(Test):
     @parametrize(converter="org.apache.kafka.connect.storage.StringConverter", schemas=None)
     @matrix(security_protocol=[SecurityConfig.PLAINTEXT, SecurityConfig.SASL_SSL])
     def test_file_source_and_sink(self, converter="org.apache.kafka.connect.json.JsonConverter", schemas=True, security_protocol='PLAINTEXT'):
+        """
+        Validates basic end-to-end functionality of Connect standalone using the file source and sink converters. Includes
+        parameterizations to test different converters (which also test per-connector converter overrides), schema/schemaless
+        modes, and security support.
+        """
         assert converter != None, "converter type must be set"
-        # Template parameters
-        self.key_converter = converter
-        self.value_converter = converter
+        # Template parameters. Note that we don't set key/value.converter. These default to JsonConverter and we validate
+        # converter overrides via the connector configuration.
+        if converter != "org.apache.kafka.connect.json.JsonConverter":
+            self.override_key_converter = converter
+            self.override_value_converter = converter
         self.schemas = schemas
 
         self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
index 216dab5..bff002b 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-sink.properties
@@ -17,4 +17,12 @@ name=local-file-sink
 connector.class={{ FILE_SINK_CONNECTOR }}
 tasks.max=1
 file={{ OUTPUT_FILE }}
-topics={{ TOPIC }}
\ No newline at end of file
+topics={{ TOPIC }}
+
+# For testing per-connector converters
+{% if override_key_converter is defined %}
+key.converter={{ override_key_converter }}
+{% endif %}
+{% if override_key_converter is defined %}
+value.converter={{ override_value_converter }}
+{% endif %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/05ed54bf/tests/kafkatest/tests/connect/templates/connect-file-source.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect/templates/connect-file-source.properties b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
index bff9720..800d6a0 100644
--- a/tests/kafkatest/tests/connect/templates/connect-file-source.properties
+++ b/tests/kafkatest/tests/connect/templates/connect-file-source.properties
@@ -17,4 +17,12 @@ name=local-file-source
 connector.class={{ FILE_SOURCE_CONNECTOR }}
 tasks.max=1
 file={{ INPUT_FILE }}
-topic={{ TOPIC }}
\ No newline at end of file
+topic={{ TOPIC }}
+
+# For testing per-connector converters
+{% if override_key_converter is defined %}
+key.converter={{ override_key_converter }}
+{% endif %}
+{% if override_key_converter is defined %}
+value.converter={{ override_value_converter }}
+{% endif %}