You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/08/26 21:03:52 UTC

[2/2] kafka git commit: KAFKA-4042: Contain connector & task start/stop failures within the Worker

KAFKA-4042: Contain connector & task start/stop failures within the Worker

Invoke the statusListener.onFailure() callback on start failures so that the statusBackingStore is updated. This involved a fix to the putSafe() functionality, which previously prevented any update that was not preceded by a (non-safe) put() from completing — a case that arises here when a connector or task transitions directly to FAILED.

Worker start methods can still throw if the same connector name or task ID is already registered with the worker, as this condition should not happen.

Author: Shikhar Bhushan <sh...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1778 from shikhar/distherder-stayup-take4


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71f7e7c3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71f7e7c3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71f7e7c3

Branch: refs/heads/trunk
Commit: 71f7e7c3a29e8f7339430837065126256907bd2a
Parents: 1902394
Author: Shikhar Bhushan <sh...@confluent.io>
Authored: Fri Aug 26 14:00:16 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Fri Aug 26 14:00:42 2016 -0700

----------------------------------------------------------------------
 .gitignore                                      |   1 +
 .../kafka/connect/cli/ConnectDistributed.java   |   4 +-
 .../kafka/connect/cli/ConnectStandalone.java    |   4 +-
 .../kafka/connect/runtime/AbstractHerder.java   |   2 +-
 .../kafka/connect/runtime/ConnectorFactory.java | 102 ++++++
 .../apache/kafka/connect/runtime/Worker.java    | 319 ++++++++-----------
 .../runtime/distributed/DistributedHerder.java  |  88 ++---
 .../runtime/standalone/StandaloneHerder.java    |  64 ++--
 .../storage/KafkaStatusBackingStore.java        |  22 +-
 .../kafka/connect/runtime/WorkerTest.java       | 119 ++++---
 .../distributed/DistributedHerderTest.java      | 165 +++++-----
 .../standalone/StandaloneHerderTest.java        | 119 ++-----
 .../storage/KafkaStatusBackingStoreTest.java    |  28 ++
 tests/kafkatest/services/connect.py             |   5 +-
 .../tests/connect/connect_distributed_test.py   |  42 ++-
 15 files changed, 548 insertions(+), 536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index b54fcf3..e491071 100644
--- a/.gitignore
+++ b/.gitignore
@@ -38,6 +38,7 @@ results
 tests/results
 .ducktape
 tests/.ducktape
+tests/venv
 .cache
 
 docs/generated/

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index e7a0c36..7a09ac3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
@@ -62,6 +63,7 @@ public class ConnectDistributed {
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
         Time time = new SystemTime();
+        ConnectorFactory connectorFactory = new ConnectorFactory();
         DistributedConfig config = new DistributedConfig(workerProps);
 
         RestServer rest = new RestServer(config);
@@ -71,7 +73,7 @@ public class ConnectDistributed {
         KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
         offsetBackingStore.configure(config);
 
-        Worker worker = new Worker(workerId, time, config, offsetBackingStore);
+        Worker worker = new Worker(workerId, time, connectorFactory, config, offsetBackingStore);
 
         StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
         statusBackingStore.configure(config);

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index b75783c..65a71af 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.ConnectorFactory;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -67,13 +68,14 @@ public class ConnectStandalone {
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
         Time time = new SystemTime();
+        ConnectorFactory connectorFactory = new ConnectorFactory();
         StandaloneConfig config = new StandaloneConfig(workerProps);
 
         RestServer rest = new RestServer(config);
         URI advertisedUrl = rest.advertisedUrl();
         String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
 
-        Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
+        Worker worker = new Worker(workerId, time, connectorFactory, config, new FileOffsetBackingStore());
 
         Herder herder = new StandaloneHerder(worker);
         final Connect connect = new Connect(herder, rest);

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 44da042..b7a0e67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -361,7 +361,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         if (tempConnectors.containsKey(connType)) {
             return tempConnectors.get(connType);
         } else {
-            Connector connector = worker.getConnector(connType);
+            Connector connector = worker.getConnectorFactory().newConnector(connType);
             tempConnectors.put(connType, connector);
             return connector;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
new file mode 100644
index 0000000..b432071
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorFactory.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+public class ConnectorFactory {
+
+    public Connector newConnector(String connectorClassOrAlias) {
+        return instantiate(getConnectorClass(connectorClassOrAlias));
+    }
+
+    public Task newTask(Class<? extends Task> taskClass) {
+        return instantiate(taskClass);
+    }
+
+    private static <T> T instantiate(Class<? extends T> cls) {
+        try {
+            return Utils.newInstance(cls);
+        } catch (Throwable t) {
+            throw new ConnectException("Instantiation error", t);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Class<? extends Connector> getConnectorClass(String connectorClassOrAlias) {
+        // Avoid the classpath scan if the full class name was provided
+        try {
+            Class<?> clazz = Class.forName(connectorClassOrAlias);
+            if (!Connector.class.isAssignableFrom(clazz))
+                throw new ConnectException("Class " + connectorClassOrAlias + " does not implement Connector");
+            return (Class<? extends Connector>) clazz;
+        } catch (ClassNotFoundException e) {
+            // Fall through to scan for the alias
+        }
+
+        // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configuration
+        Reflections reflections = new Reflections(new ConfigurationBuilder()
+                .setUrls(ClasspathHelper.forJavaClassPath()));
+
+        Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
+
+        List<Class<? extends Connector>> results = new ArrayList<>();
+
+        for (Class<? extends Connector> connector: connectors) {
+            // Configuration included the class name but not package
+            if (connector.getSimpleName().equals(connectorClassOrAlias))
+                results.add(connector);
+
+            // Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
+            if (connector.getSimpleName().equals(connectorClassOrAlias + "Connector"))
+                results.add(connector);
+        }
+
+        if (results.isEmpty())
+            throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorClassOrAlias +
+                    ", available connectors are: " + connectorNames(connectors));
+        if (results.size() > 1) {
+            throw new ConnectException("More than one connector matches alias " +  connectorClassOrAlias +
+                    ". Please use full package and class name instead. Classes found: " + connectorNames(results));
+        }
+
+        // We just validated that we have exactly one result, so this is safe
+        return results.get(0);
+    }
+
+    private static String connectorNames(Collection<Class<? extends Connector>> connectors) {
+        StringBuilder names = new StringBuilder();
+        for (Class<?> c : connectors)
+            names.append(c.getName()).append(", ");
+        return names.substring(0, names.toString().length() - 2);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index d39806a..1265f9e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -34,15 +33,14 @@ import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -65,6 +63,7 @@ public class Worker {
     private final ExecutorService executor;
     private final Time time;
     private final String workerId;
+    private final ConnectorFactory connectorFactory;
     private final WorkerConfig config;
     private final Converter defaultKeyConverter;
     private final Converter defaultValueConverter;
@@ -77,13 +76,11 @@ public class Worker {
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
-    public Worker(String workerId,
-                  Time time,
-                  WorkerConfig config,
-                  OffsetBackingStore offsetBackingStore) {
+    public Worker(String workerId, Time time, ConnectorFactory connectorFactory, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
         this.executor = Executors.newCachedThreadPool();
         this.workerId = workerId;
         this.time = time;
+        this.connectorFactory = connectorFactory;
         this.config = config;
         this.defaultKeyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
         this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
@@ -127,18 +124,15 @@ public class Worker {
         long started = time.milliseconds();
         long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
 
-        for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
-            WorkerConnector workerConnector = entry.getValue();
-            log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
-                    "Worker is stopped.", entry.getKey());
-            workerConnector.shutdown();
+        if (!connectors.isEmpty()) {
+            log.warn("Shutting down connectors {} uncleanly; herder should have shut down connectors before the Worker is stopped", connectors.keySet());
+            stopConnectors();
         }
 
-        Collection<ConnectorTaskId> taskIds = tasks.keySet();
-        log.warn("Shutting down tasks {} uncleanly; herder should have shut down "
-                + "tasks before the Worker is stopped.", taskIds);
-        stopTasks(taskIds);
-        awaitStopTasks(taskIds);
+        if (!tasks.isEmpty()) {
+            log.warn("Shutting down tasks {} uncleanly; herder should have shut down tasks before the Worker is stopped", tasks.keySet());
+            stopAndAwaitTasks();
+        }
 
         long timeoutMs = limit - time.milliseconds();
         sourceTaskOffsetCommitter.close(timeoutMs);
@@ -148,34 +142,36 @@ public class Worker {
         log.info("Worker stopped");
     }
 
-    /**
-     * Add a new connector.
-     * @param connConfig connector configuration
-     * @param ctx context for the connector
-     * @param statusListener listener for notifications of connector status changes
-     * @param initialState the initial target state that the connector should be initialized to
-     */
-    public void startConnector(ConnectorConfig connConfig,
-                               ConnectorContext ctx,
-                               ConnectorStatus.Listener statusListener,
-                               TargetState initialState) {
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
-
-        log.info("Creating connector {} of type {}", connName, connClass.getName());
-
+    public boolean startConnector(
+            String connName,
+            Map<String, String> connProps,
+            ConnectorContext ctx,
+            ConnectorStatus.Listener statusListener,
+            TargetState initialState
+    ) {
         if (connectors.containsKey(connName))
             throw new ConnectException("Connector with name " + connName + " already exists");
 
-        final Connector connector = instantiateConnector(connClass);
-        WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
-
-        log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
-        workerConnector.initialize(connConfig);
-        workerConnector.transitionTo(initialState);
+        final WorkerConnector workerConnector;
+        try {
+            final ConnectorConfig connConfig = new ConnectorConfig(connProps);
+            final String connClass = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+            log.info("Creating connector {} of type {}", connName, connClass);
+            final Connector connector = connectorFactory.newConnector(connClass);
+            workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
+            log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connector.getClass());
+            workerConnector.initialize(connConfig);
+            workerConnector.transitionTo(initialState);
+        } catch (Throwable t) {
+            log.error("Failed to start connector {}", connName, t);
+            statusListener.onFailure(connName, t);
+            return false;
+        }
 
         connectors.put(connName, workerConnector);
+
         log.info("Finished creating connector {}", connName);
+        return true;
     }
 
     /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
@@ -184,73 +180,6 @@ public class Worker {
         return workerConnector.isSinkConnector();
     }
 
-    public Connector getConnector(String connType) {
-        Class<? extends Connector> connectorClass = getConnectorClass(connType);
-        return instantiateConnector(connectorClass);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Class<? extends Connector> getConnectorClass(String connectorAlias) {
-        // Avoid the classpath scan if the full class name was provided
-        try {
-            Class<?> clazz = Class.forName(connectorAlias);
-            if (!Connector.class.isAssignableFrom(clazz))
-                throw new ConnectException("Class " + connectorAlias + " does not implement Connector");
-            return (Class<? extends Connector>) clazz;
-        } catch (ClassNotFoundException e) {
-            // Fall through to scan for the alias
-        }
-
-        // Iterate over our entire classpath to find all the connectors and hopefully one of them matches the alias from the connector configration
-        Reflections reflections = new Reflections(new ConfigurationBuilder()
-                .setUrls(ClasspathHelper.forJavaClassPath()));
-
-        Set<Class<? extends Connector>> connectors = reflections.getSubTypesOf(Connector.class);
-
-        List<Class<? extends Connector>> results = new ArrayList<>();
-
-        for (Class<? extends Connector> connector: connectors) {
-            // Configuration included the class name but not package
-            if (connector.getSimpleName().equals(connectorAlias))
-                results.add(connector);
-
-            // Configuration included a short version of the name (i.e. FileStreamSink instead of FileStreamSinkConnector)
-            if (connector.getSimpleName().equals(connectorAlias + "Connector"))
-                results.add(connector);
-        }
-
-        if (results.isEmpty())
-            throw new ConnectException("Failed to find any class that implements Connector and which name matches " + connectorAlias + " available connectors are: " + connectorNames(connectors));
-        if (results.size() > 1) {
-            throw new ConnectException("More than one connector matches alias " +  connectorAlias + ". Please use full package + class name instead. Classes found: " + connectorNames(results));
-        }
-
-        // We just validated that we have exactly one result, so this is safe
-        return results.get(0);
-    }
-
-    private String connectorNames(Collection<Class<? extends Connector>> connectors) {
-        StringBuilder names = new StringBuilder();
-        for (Class<?> c : connectors)
-            names.append(c.getName()).append(", ");
-
-        return names.substring(0, names.toString().length() - 2);
-    }
-
-    public boolean ownsTask(ConnectorTaskId taskId) {
-        return tasks.containsKey(taskId);
-    }
-
-    private static Connector instantiateConnector(Class<? extends Connector> connClass) {
-        try {
-            return Utils.newInstance(connClass);
-        } catch (Throwable t) {
-            // Catches normal exceptions due to instantiation errors as well as any runtime errors that
-            // may be caused by user code
-            throw new ConnectException("Failed to create connector instance", t);
-        }
-    }
-
     public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
         log.trace("Reconfiguring connector tasks for {}", connName);
 
@@ -271,17 +200,34 @@ public class Worker {
         return result;
     }
 
-    public void stopConnector(String connName) {
+    public void stopConnectors() {
+        stopConnectors(new HashSet<>(connectors.keySet()));
+    }
+
+    public Collection<String> stopConnectors(Collection<String> connectors) {
+        final List<String> stopped = new ArrayList<>(connectors.size());
+        for (String connector: connectors) {
+            if (stopConnector(connector)) {
+                stopped.add(connector);
+            }
+        }
+        return stopped;
+    }
+
+    public boolean stopConnector(String connName) {
         log.info("Stopping connector {}", connName);
 
         WorkerConnector connector = connectors.get(connName);
-        if (connector == null)
-            throw new ConnectException("Connector " + connName + " not found in this worker.");
+        if (connector == null) {
+            log.warn("Ignoring stop request for unowned connector {}", connName);
+            return false;
+        }
 
         connector.shutdown();
         connectors.remove(connName);
 
         log.info("Stopped connector {}", connName);
+        return true;
     }
 
     /**
@@ -293,60 +239,55 @@ public class Worker {
 
     public boolean isRunning(String connName) {
         WorkerConnector connector = connectors.get(connName);
-        if (connector == null)
-            throw new ConnectException("Connector " + connName + " not found in this worker.");
-        return connector.isRunning();
+        return connector != null && connector.isRunning();
     }
 
-    /**
-     * Add a new task.
-     * @param id Globally unique ID for this task.
-     * @param taskConfig the parsed task configuration
-     * @param connConfig the parsed connector configuration
-     * @param statusListener listener for notifications of task status changes
-     * @param initialState the initial target state that the task should be initialized to
-     */
-    public void startTask(ConnectorTaskId id,
-                          TaskConfig taskConfig,
-                          ConnectorConfig connConfig,
-                          TaskStatus.Listener statusListener,
-                          TargetState initialState) {
+    public boolean startTask(
+            ConnectorTaskId id,
+            Map<String, String> connProps,
+            Map<String, String> taskProps,
+            TaskStatus.Listener statusListener,
+            TargetState initialState
+    ) {
         log.info("Creating task {}", id);
 
-        if (tasks.containsKey(id)) {
-            String msg = "Task already exists in this worker; the herder should not have requested "
-                    + "that this : " + id;
-            log.error(msg);
-            throw new ConnectException(msg);
+        if (tasks.containsKey(id))
+            throw new ConnectException("Task already exists in this worker: " + id);
+
+        final WorkerTask workerTask;
+        try {
+            final ConnectorConfig connConfig = new ConnectorConfig(connProps);
+            final TaskConfig taskConfig = new TaskConfig(taskProps);
+
+            final Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
+            final Task task = connectorFactory.newTask(taskClass);
+            log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+
+            Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+            if (keyConverter != null)
+                keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
+            else
+                keyConverter = defaultKeyConverter;
+            Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+            if (valueConverter != null)
+                valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
+            else
+                valueConverter = defaultValueConverter;
+
+            workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
+            workerTask.initialize(taskConfig);
+        } catch (Throwable t) {
+            log.error("Failed to start task {}", id, t);
+            statusListener.onFailure(id, t);
+            return false;
         }
 
-        Class<? extends Task> taskClass = taskConfig.getClass(TaskConfig.TASK_CLASS_CONFIG).asSubclass(Task.class);
-        final Task task = instantiateTask(taskClass);
-        log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
-
-        Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
-        if (keyConverter != null)
-            keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
-        else
-            keyConverter = defaultKeyConverter;
-        Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
-        if (valueConverter != null)
-            valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
-        else
-            valueConverter = defaultValueConverter;
-
-        final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
-
-        // Start the task before adding modifying any state, any exceptions are caught higher up the
-        // call chain and there's no cleanup to do here
-        workerTask.initialize(taskConfig);
         executor.submit(workerTask);
-
-        if (task instanceof SourceTask) {
-            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
-            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+        if (workerTask instanceof WorkerSourceTask) {
+            sourceTaskOffsetCommitter.schedule(id, (WorkerSourceTask) workerTask);
         }
         tasks.put(id, workerTask);
+        return true;
     }
 
     private WorkerTask buildWorkerTask(ConnectorTaskId id,
@@ -373,25 +314,42 @@ public class Worker {
         }
     }
 
-    private static Task instantiateTask(Class<? extends Task> taskClass) {
-        try {
-            return Utils.newInstance(taskClass);
-        } catch (KafkaException e) {
-            throw new ConnectException("Task class not found", e);
+    public boolean stopAndAwaitTask(ConnectorTaskId id) {
+        return !stopAndAwaitTasks(Collections.singleton(id)).isEmpty();
+    }
+
+    public void stopAndAwaitTasks() {
+        stopAndAwaitTasks(new HashSet<>(tasks.keySet()));
+    }
+
+    public Collection<ConnectorTaskId> stopAndAwaitTasks(Collection<ConnectorTaskId> ids) {
+        final List<ConnectorTaskId> stoppable = new ArrayList<>(ids.size());
+        for (ConnectorTaskId taskId : ids) {
+            final WorkerTask task = tasks.get(taskId);
+            if (task == null) {
+                log.warn("Ignoring stop request for unowned task {}", taskId);
+                continue;
+            }
+            stopTask(task);
+            stoppable.add(taskId);
         }
+        awaitStopTasks(stoppable);
+        return stoppable;
     }
 
-    public void stopTasks(Collection<ConnectorTaskId> ids) {
-        for (ConnectorTaskId id : ids)
-            stopTask(getTask(id));
+    private void stopTask(WorkerTask task) {
+        log.info("Stopping task {}", task.id());
+        if (task instanceof WorkerSourceTask)
+            sourceTaskOffsetCommitter.remove(task.id());
+        task.stop();
     }
 
-    public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
+    private void awaitStopTasks(Collection<ConnectorTaskId> ids) {
         long now = time.milliseconds();
         long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
         for (ConnectorTaskId id : ids) {
             long remaining = Math.max(0, deadline - time.milliseconds());
-            awaitStopTask(getTask(id), remaining);
+            awaitStopTask(tasks.get(id), remaining);
         }
     }
 
@@ -403,20 +361,6 @@ public class Worker {
         tasks.remove(task.id());
     }
 
-    private void stopTask(WorkerTask task) {
-        log.info("Stopping task {}", task.id());
-        if (task instanceof WorkerSourceTask)
-            sourceTaskOffsetCommitter.remove(task.id());
-        task.stop();
-    }
-
-    public void stopAndAwaitTask(ConnectorTaskId id) {
-        WorkerTask task = getTask(id);
-        stopTask(task);
-        awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
-        log.info("Task {} completed shutdown.", task.id());
-    }
-
     /**
      * Get the IDs of the tasks currently running in this worker.
      */
@@ -424,15 +368,6 @@ public class Worker {
         return tasks.keySet();
     }
 
-    private WorkerTask getTask(ConnectorTaskId id) {
-        WorkerTask task = tasks.get(id);
-        if (task == null) {
-            log.error("Task not found: " + id);
-            throw new ConnectException("Task not found: " + id);
-        }
-        return task;
-    }
-
     public Converter getInternalKeyConverter() {
         return internalKeyConverter;
     }
@@ -441,12 +376,12 @@ public class Worker {
         return internalValueConverter;
     }
 
-    public String workerId() {
-        return workerId;
+    public ConnectorFactory getConnectorFactory() {
+        return connectorFactory;
     }
 
-    public boolean ownsConnector(String connName) {
-        return this.connectors.containsKey(connName);
+    public String workerId() {
+        return workerId;
     }
 
     public void setTargetState(String connName, TargetState state) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 6232187..afcd283 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -17,7 +17,6 @@
 
 package org.apache.kafka.connect.runtime.distributed;
 
-import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
@@ -31,7 +30,6 @@ import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -322,7 +320,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
             // additionally, if the worker is running the connector itself, then we need to
             // request reconfiguration to ensure that config changes while paused take effect
-            if (worker.ownsConnector(connector) && targetState == TargetState.STARTED)
+            if (targetState == TargetState.STARTED)
                 reconfigureConnectorTasksWithRetry(connector);
         }
     }
@@ -331,18 +329,9 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     public void halt() {
         synchronized (this) {
             // Clean up any connectors and tasks that are still running.
-            log.info("Stopping connectors and tasks that are still assigned to this worker.");
-            for (String connName : new HashSet<>(worker.connectorNames())) {
-                try {
-                    worker.stopConnector(connName);
-                } catch (Throwable t) {
-                    log.error("Failed to shut down connector " + connName, t);
-                }
-            }
-
-            Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
-            worker.stopTasks(tasks);
-            worker.awaitStopTasks(tasks);
+            log.info("Stopping connectors and tasks that are still assigned to the worker");
+            worker.stopConnectors();
+            worker.stopAndAwaitTasks();
 
             member.stop();
 
@@ -573,11 +562,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     return null;
                 }
 
-                if (worker.ownsConnector(connName)) {
+                if (assignment.connectors().contains(connName)) {
                     try {
                         worker.stopConnector(connName);
-                        startConnector(connName);
-                        callback.onCompletion(null, null);
+                        if (startConnector(connName))
+                            callback.onCompletion(null, null);
+                        else
+                            callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
                     } catch (Throwable t) {
                         callback.onCompletion(t, null);
                     }
@@ -609,11 +600,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     return null;
                 }
 
-                if (worker.ownsTask(id)) {
+                if (assignment.tasks().contains(id)) {
                     try {
                         worker.stopAndAwaitTask(id);
-                        startTask(id);
-                        callback.onCompletion(null, null);
+                        if (startTask(id))
+                            callback.onCompletion(null, null);
+                        else
+                            callback.onCompletion(new ConnectException("Failed to start task: " + id), null);
                     } catch (Throwable t) {
                         callback.onCompletion(t, null);
                     }
@@ -751,48 +744,41 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // Start assigned connectors and tasks
         log.info("Starting connectors and tasks using config offset {}", assignment.offset());
         for (String connectorName : assignment.connectors()) {
-            try {
-                startConnector(connectorName);
-            } catch (ConfigException e) {
-                log.error("Couldn't instantiate connector " + connectorName + " because it has an invalid connector " +
-                        "configuration. This connector will not execute until reconfigured.", e);
-            }
+            startConnector(connectorName);
         }
         for (ConnectorTaskId taskId : assignment.tasks()) {
-            try {
-                startTask(taskId);
-            } catch (ConfigException e) {
-                log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
-                        "configuration. This task will not execute until reconfigured.", e);
-            }
+            startTask(taskId);
         }
         log.info("Finished starting connectors and tasks");
     }
 
-    private void startTask(ConnectorTaskId taskId) {
+    private boolean startTask(ConnectorTaskId taskId) {
         log.info("Starting task {}", taskId);
-        TargetState initialState = configState.targetState(taskId.connector());
-        TaskConfig taskConfig = new TaskConfig(configState.taskConfig(taskId));
-        ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
-        worker.startTask(taskId, taskConfig, connConfig, this, initialState);
+        return worker.startTask(
+                taskId,
+                configState.connectorConfig(taskId.connector()),
+                configState.taskConfig(taskId),
+                this,
+                configState.targetState(taskId.connector())
+        );
     }
 
     // Helper for starting a connector with the given name, which will extract & parse the config, generate connector
     // context and add to the worker. This needs to be called from within the main worker thread for this herder.
-    private void startConnector(String connectorName) {
+    private boolean startConnector(String connectorName) {
         log.info("Starting connector {}", connectorName);
-        Map<String, String> configs = configState.connectorConfig(connectorName);
-        ConnectorConfig connConfig = new ConnectorConfig(configs);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
-        ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
-        TargetState initialState = configState.targetState(connectorName);
-        worker.startConnector(connConfig, ctx, this, initialState);
+        final Map<String, String> configProps = configState.connectorConfig(connectorName);
+        final ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connectorName);
+        final TargetState initialState = configState.targetState(connectorName);
+        boolean started = worker.startConnector(connectorName, configProps, ctx, this, initialState);
 
         // Immediately request configuration since this could be a brand new connector. However, also only update those
         // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
         // just restoring an existing connector.
-        if (initialState == TargetState.STARTED)
-            reconfigureConnectorTasksWithRetry(connName);
+        if (started && initialState == TargetState.STARTED)
+            reconfigureConnectorTasksWithRetry(connectorName);
+
+        return started;
     }
 
     private void reconfigureConnectorTasksWithRetry(final String connName) {
@@ -1053,17 +1039,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                 // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
                 // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
                 // unnecessary repeated connections to the source/sink system.
-                for (String connectorName : connectors)
-                    worker.stopConnector(connectorName);
+                worker.stopConnectors(connectors);
 
                 // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
                 // stopping them then state could continue to be reused when the task remains on this worker. For example,
                 // this would avoid having to close a connection and then reopen it when the task is assigned back to this
                 // worker again.
-                if (!tasks.isEmpty()) {
-                    worker.stopTasks(tasks); // trigger stop() for all tasks
-                    worker.awaitStopTasks(tasks); // await stopping tasks
-                }
+                worker.stopAndAwaitTasks(tasks);
 
                 // Ensure that all status updates have been pushed to the storage system before rebalancing.
                 // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index cac8d18..2015f27 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.SinkConnectorConfig;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
@@ -81,11 +80,7 @@ public class StandaloneHerder extends AbstractHerder {
         // the tasks.
         for (String connName : configState.connectors()) {
             removeConnectorTasks(connName);
-            try {
-                worker.stopConnector(connName);
-            } catch (ConnectException e) {
-                log.error("Error shutting down connector {}: ", connName, e);
-            }
+            worker.stopConnector(connName);
         }
         stopServices();
         log.info("Herder stopped");
@@ -161,7 +156,10 @@ public class StandaloneHerder extends AbstractHerder {
                 created = true;
             }
             if (config != null) {
-                startConnector(config);
+                if (!startConnector(config)) {
+                    callback.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
+                    return;
+                }
                 updateConnectorTasks(connName);
             }
             if (config != null)
@@ -209,18 +207,14 @@ public class StandaloneHerder extends AbstractHerder {
         Map<String, String> taskConfigProps = configState.taskConfig(taskId);
         if (taskConfigProps == null)
             cb.onCompletion(new NotFoundException("Task " + taskId + " not found", null), null);
-        TaskConfig taskConfig = new TaskConfig(taskConfigProps);
-        ConnectorConfig connConfig = new ConnectorConfig(configState.connectorConfig(taskId.connector()));
+        Map<String, String> connConfigProps = configState.connectorConfig(taskId.connector());
 
         TargetState targetState = configState.targetState(taskId.connector());
-        try {
-            worker.stopAndAwaitTask(taskId);
-            worker.startTask(taskId, taskConfig, connConfig, this, targetState);
+        worker.stopAndAwaitTask(taskId);
+        if (worker.startTask(taskId, connConfigProps, taskConfigProps, this, targetState))
             cb.onCompletion(null, null);
-        } catch (Exception e) {
-            log.error("Failed to restart task {}", taskId, e);
-            cb.onCompletion(e, null);
-        }
+        else
+            cb.onCompletion(new ConnectException("Failed to start task: " + taskId), null);
     }
 
     @Override
@@ -229,28 +223,18 @@ public class StandaloneHerder extends AbstractHerder {
             cb.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null);
 
         Map<String, String> config = configState.connectorConfig(connName);
-        try {
-            worker.stopConnector(connName);
-            startConnector(config);
+        worker.stopConnector(connName);
+        if (startConnector(config))
             cb.onCompletion(null, null);
-        } catch (Exception e) {
-            log.error("Failed to restart connector {}", connName, e);
-            cb.onCompletion(e, null);
-        }
+        else
+            cb.onCompletion(new ConnectException("Failed to start connector: " + connName), null);
     }
 
-    /**
-     * Start a connector in the worker and record its state.
-     * @param connectorProps new connector configuration
-     * @return the connector name
-     */
-    private String startConnector(Map<String, String> connectorProps) {
-        ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
-        String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+    private boolean startConnector(Map<String, String> connectorProps) {
+        String connName = connectorProps.get(ConnectorConfig.NAME_CONFIG);
         configBackingStore.putConnectorConfig(connName, connectorProps);
         TargetState targetState = configState.targetState(connName);
-        worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this, targetState);
-        return connName;
+        return worker.startConnector(connName, connectorProps, new HerderConnectorContext(this, connName), this, targetState);
     }
 
     private List<Map<String, String>> recomputeTaskConfigs(String connName) {
@@ -273,27 +257,17 @@ public class StandaloneHerder extends AbstractHerder {
 
     private void createConnectorTasks(String connName, TargetState initialState) {
         Map<String, String> connConfigs = configState.connectorConfig(connName);
-        ConnectorConfig connConfig = new ConnectorConfig(connConfigs);
 
         for (ConnectorTaskId taskId : configState.tasks(connName)) {
             Map<String, String> taskConfigMap = configState.taskConfig(taskId);
-            TaskConfig taskConfig = new TaskConfig(taskConfigMap);
-            try {
-                worker.startTask(taskId, taskConfig, connConfig, this, initialState);
-            } catch (Throwable e) {
-                log.error("Failed to add task {}: ", taskId, e);
-                // Swallow this so we can continue updating the rest of the tasks
-                // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
-                // that died after starting successfully.
-            }
+            worker.startTask(taskId, connConfigs, taskConfigMap, this, initialState);
         }
     }
 
     private void removeConnectorTasks(String connName) {
         Collection<ConnectorTaskId> tasks = configState.tasks(connName);
         if (!tasks.isEmpty()) {
-            worker.stopTasks(tasks);
-            worker.awaitStopTasks(tasks);
+            worker.stopAndAwaitTasks(tasks);
             configBackingStore.removeTaskConfigs(connName);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
index d24645e..c377ff6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -64,11 +64,11 @@ import java.util.Set;
  * but it can avoid specific unsafe conditions. In particular, we putSafe()
  * allows writes in the following conditions:
  *
- * 3) It is (probably) safe to overwrite the state if there is no previous
+ * 1) It is (probably) safe to overwrite the state if there is no previous
  *    value.
- * 1) It is (probably) safe to overwrite the state if the previous value was
+ * 2) It is (probably) safe to overwrite the state if the previous value was
  *    set by a worker with the same workerId.
- * 2) It is (probably) safe to overwrite the previous state if the current
+ * 3) It is (probably) safe to overwrite the previous state if the current
  *    generation is higher than the previous .
  *
  * Basically all these conditions do is reduce the window for conflicts. They
@@ -201,7 +201,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
         final int sequence;
         synchronized (this) {
             this.generation = status.generation();
-            if (safeWrite && !entry.canWrite(status))
+            if (safeWrite && !entry.canWriteSafely(status))
                 return;
             sequence = entry.increment();
         }
@@ -216,7 +216,7 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
                         synchronized (KafkaStatusBackingStore.this) {
                             if (entry.isDeleted()
                                     || status.generation() != generation
-                                    || (safeWrite && !entry.canWrite(status, sequence)))
+                                    || (safeWrite && !entry.canWriteSafely(status, sequence)))
                                 return;
                         }
                         kafkaLog.send(key, value, this);
@@ -448,14 +448,14 @@ public class KafkaStatusBackingStore implements StatusBackingStore {
             return deleted;
         }
 
-        public boolean canWrite(T status) {
-            return value != null &&
-                    (value.workerId().equals(status.workerId())
-                    || value.generation() <= status.generation());
+        public boolean canWriteSafely(T status) {
+            return value == null
+                    || value.workerId().equals(status.workerId())
+                    || value.generation() <= status.generation();
         }
 
-        public boolean canWrite(T status, int sequence) {
-            return canWrite(status) && this.sequence == sequence;
+        public boolean canWriteSafely(T status, int sequence) {
+            return canWriteSafely(status) && this.sequence == sequence;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/71f7e7c3/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f9839f5..97e29be 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -56,10 +56,11 @@ import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
-@PrepareForTest(Worker.class)
+@PrepareForTest({Worker.class})
 @PowerMockIgnore("javax.management.*")
 public class WorkerTest extends ThreadedTest {
 
@@ -69,6 +70,7 @@ public class WorkerTest extends ThreadedTest {
 
     private WorkerConfig config;
     private Worker worker;
+    private ConnectorFactory connectorFactory = PowerMock.createMock(ConnectorFactory.class);
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
     private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
     private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
@@ -96,8 +98,7 @@ public class WorkerTest extends ThreadedTest {
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
+        EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -125,15 +126,14 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
 
-        ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
+        worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
+            worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -147,6 +147,29 @@ public class WorkerTest extends ThreadedTest {
     }
 
     @Test
+    public void testStartConnectorFailure() throws Exception {
+        expectStartStorage();
+
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker.start();
+
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
+        props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
+        props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
+        props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "java.util.HashMap"); // Bad connector class name
+
+        connectorStatusListener.onFailure(EasyMock.eq(CONNECTOR_ID), EasyMock.<Throwable>anyObject());
+        EasyMock.expectLastCall();
+
+        assertFalse(worker.startConnector(CONNECTOR_ID, props, PowerMock.createMock(ConnectorContext.class), connectorStatusListener, TargetState.STARTED));
+
+        assertEquals(Collections.emptySet(), worker.connectorNames());
+
+        assertFalse(worker.stopConnector(CONNECTOR_ID));
+    }
+
+    @Test
     public void testAddConnectorByAlias() throws Exception {
         expectStartStorage();
 
@@ -154,8 +177,7 @@ public class WorkerTest extends ThreadedTest {
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
+        EasyMock.expect(connectorFactory.newConnector("WorkerTestConnector")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -183,12 +205,11 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
 
-        ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
+        worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -207,8 +228,7 @@ public class WorkerTest extends ThreadedTest {
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
+        EasyMock.expect(connectorFactory.newConnector("WorkerTest")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -236,12 +256,11 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
 
-        ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
+        worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -252,14 +271,13 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
-
-    @Test(expected = ConnectException.class)
+    @Test
     public void testStopInvalidConnector() {
         expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
 
         worker.stopConnector(CONNECTOR_ID);
@@ -273,8 +291,7 @@ public class WorkerTest extends ThreadedTest {
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
-        PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
+        EasyMock.expect(connectorFactory.newConnector(WorkerTestConnector.class.getName())).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
         Map<String, String> props = new HashMap<>();
@@ -308,15 +325,14 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
 
-        ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
+        worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
+            worker.startConnector(CONNECTOR_ID, props, ctx, connectorStatusListener, TargetState.STARTED);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -347,8 +363,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
+        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
         PowerMock.expectNew(
@@ -381,10 +396,10 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertEquals(Collections.emptySet(), worker.taskIds());
@@ -394,16 +409,36 @@ public class WorkerTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
-    @Test(expected = ConnectException.class)
+    @Test
+    public void testStartTaskFailure() throws Exception {
+        expectStartStorage();
+
+        // Record the failure callback before replay so the expectation is active
+        // when startTask() runs, and verify it afterwards (matches sibling tests).
+        taskStatusListener.onFailure(EasyMock.eq(TASK_ID), EasyMock.<Throwable>anyObject());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
+        worker.start();
+
+        Map<String, String> origProps = new HashMap<>();
+        origProps.put(TaskConfig.TASK_CLASS_CONFIG, "missing.From.This.Workers.Classpath");
+
+        assertFalse(worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED));
+
+        assertEquals(Collections.emptySet(), worker.taskIds());
+
+        assertFalse(worker.stopAndAwaitTask(TASK_ID));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testStopInvalidTask() {
         expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
 
-        worker.stopAndAwaitTask(TASK_ID);
+        assertFalse(worker.stopAndAwaitTask(TASK_ID));
     }
 
     @Test
@@ -415,8 +450,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
+        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
         
         PowerMock.expectNew(
@@ -451,9 +485,9 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
-        worker.startTask(TASK_ID, new TaskConfig(origProps), anyConnectorConfig(), taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, anyConnectorConfigMap(), origProps, taskStatusListener, TargetState.STARTED);
         worker.stop();
 
         PowerMock.verifyAll();
@@ -467,8 +501,7 @@ public class WorkerTest extends ThreadedTest {
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
-        PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
+        EasyMock.expect(connectorFactory.newTask(TestSourceTask.class)).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
@@ -504,7 +537,7 @@ public class WorkerTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), connectorFactory, config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
         Map<String, String> connProps = anyConnectorConfigMap();
@@ -512,7 +545,7 @@ public class WorkerTest extends ThreadedTest {
         connProps.put("key.converter.extra.config", "foo");
         connProps.put(ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG, TestConverter.class.getName());
         connProps.put("value.converter.extra.config", "bar");
-        worker.startTask(TASK_ID, new TaskConfig(origProps), new ConnectorConfig(connProps), taskStatusListener, TargetState.STARTED);
+        worker.startTask(TASK_ID, connProps, origProps, taskStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertEquals(Collections.emptySet(), worker.taskIds());
@@ -546,10 +579,6 @@ public class WorkerTest extends ThreadedTest {
         return props;
     }
 
-    private ConnectorConfig anyConnectorConfig() {
-        return new ConnectorConfig(anyConnectorConfigMap());
-    }
-
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     public static class WorkerTestConnector extends Connector {