Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 07:47:49 UTC

[2/3] kafka git commit: KAFKA-3093: Add Connect status tracking API

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 4b60131..9bd191e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -20,6 +20,10 @@ package org.apache.kafka.connect.storage;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
@@ -219,15 +223,14 @@ public class KafkaConfigStorage {
 
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
         configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }
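
A quick note on the hunk above (and the matching KafkaOffsetBackingStore hunk
below): the explicit acks=all setting disappears here because KafkaBasedLog
now forces it internally, while retries are raised so transient broker errors
do not silently drop writes. As a minimal sketch (not part of the patch; the
class name is hypothetical), the resulting producer configuration looks like:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ConfigLogProducerPropsSketch {
        public static Map<String, Object> producerProps(Map<String, Object> workerConfigs) {
            Map<String, Object> props = new HashMap<>(workerConfigs);
            // Type-safe class references replace the earlier string literals.
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            // Retry indefinitely; acks=all and a single in-flight request are
            // forced by KafkaBasedLog.createProducer(), so ordering is preserved.
            props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
            return props;
        }
    }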

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 65bd9d0..dfb8c51 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.Callback;
@@ -66,15 +68,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
         offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
new file mode 100644
index 0000000..948a325
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * StatusBackingStore implementation which uses a compacted topic for storage
+ * of connector and task status information. When a state change is observed,
+ * the new state is written to the compacted topic. The new state will not be
+ * visible until it has been read back from the topic.
+ *
+ * In spite of their names, the putSafe() methods cannot guarantee the safety
+ * of the write (since Kafka itself cannot provide such guarantees currently),
+ * but they can avoid specific unsafe conditions. In particular, putSafe()
+ * allows the write under the following conditions:
+ *
+ * 1) It is (probably) safe to overwrite the state if there is no previous
+ *    value.
+ * 2) It is (probably) safe to overwrite the state if the previous value was
+ *    set by a worker with the same workerId.
+ * 3) It is (probably) safe to overwrite the previous state if the current
+ *    generation is higher than the previous one.
+ *
+ * Basically, all these conditions do is reduce the window for conflicts. They
+ * obviously cannot take into account in-flight requests.
+ *
+ */
+public class KafkaStatusBackingStore implements StatusBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
+
+    public static final String STATUS_TOPIC_CONFIG = "status.storage.topic";
+
+    private static final String TASK_STATUS_PREFIX = "status-task-";
+    private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
+
+    public static final String STATE_KEY_NAME = "state";
+    public static final String TRACE_KEY_NAME = "trace";
+    public static final String WORKER_ID_KEY_NAME = "worker_id";
+    public static final String GENERATION_KEY_NAME = "generation";
+
+    private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
+            .field(STATE_KEY_NAME, Schema.STRING_SCHEMA)
+            .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
+            .field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA)
+            .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA)
+            .build();
+
+    private final Time time;
+    private final Converter converter;
+    private final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
+    private final Map<String, CacheEntry<ConnectorStatus>> connectors;
+
+    private String topic;
+    private KafkaBasedLog<String, byte[]> kafkaLog;
+    private int generation;
+
+    public KafkaStatusBackingStore(Time time, Converter converter) {
+        this.time = time;
+        this.converter = converter;
+        this.tasks = new Table<>();
+        this.connectors = new HashMap<>();
+    }
+
+    // visible for testing
+    KafkaStatusBackingStore(Time time, Converter converter, String topic, KafkaBasedLog<String, byte[]> kafkaLog) {
+        this(time, converter);
+        this.kafkaLog = kafkaLog;
+        this.topic = topic;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (configs.get(STATUS_TOPIC_CONFIG) == null)
+            throw new ConnectException("Must specify topic for connector status.");
+        this.topic = (String) configs.get(STATUS_TOPIC_CONFIG);
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+                read(record);
+            }
+        };
+        this.kafkaLog = new KafkaBasedLog<>(topic, producerProps, consumerProps, readCallback, time);
+    }
+
+    @Override
+    public void start() {
+        kafkaLog.start();
+
+        // read to the end on startup to ensure that API requests see the most recent states
+        kafkaLog.readToEnd();
+    }
+
+    @Override
+    public void stop() {
+        kafkaLog.stop();
+    }
+
+    @Override
+    public void put(final ConnectorStatus status) {
+        sendConnectorStatus(status, false);
+    }
+
+    @Override
+    public void putSafe(final ConnectorStatus status) {
+        sendConnectorStatus(status, true);
+    }
+
+    @Override
+    public void put(final TaskStatus status) {
+        sendTaskStatus(status, false);
+    }
+
+    @Override
+    public void putSafe(final TaskStatus status) {
+        sendTaskStatus(status, true);
+    }
+
+    @Override
+    public void flush() {
+        kafkaLog.flush();
+    }
+
+    private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
+        String connector = status.id();
+        CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+        String key = CONNECTOR_STATUS_PREFIX + connector;
+        send(key, status, entry, safeWrite);
+    }
+
+    private void sendTaskStatus(final TaskStatus status, boolean safeWrite) {
+        ConnectorTaskId taskId = status.id();
+        CacheEntry<TaskStatus> entry = getOrAdd(taskId);
+        String key = TASK_STATUS_PREFIX + taskId.connector() + "-" + taskId.task();
+        send(key, status, entry, safeWrite);
+    }
+
+    private <V extends AbstractStatus> void send(final String key,
+                                                 final V status,
+                                                 final CacheEntry<V> entry,
+                                                 final boolean safeWrite) {
+        final int sequence;
+        synchronized (this) {
+            this.generation = status.generation();
+            if (safeWrite && !entry.canWrite(status))
+                return;
+            sequence = entry.increment();
+        }
+
+        final byte[] value = status.state() == ConnectorStatus.State.DESTROYED ? null : serialize(status);
+
+        kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception != null) {
+                    if (exception instanceof RetriableException) {
+                        // lock the enclosing store, not this callback: generation and
+                        // the cache entry are guarded by the store's monitor
+                        synchronized (KafkaStatusBackingStore.this) {
+                            if (entry.isDeleted()
+                                    || status.generation() != generation
+                                    || (safeWrite && !entry.canWrite(status, sequence)))
+                                return;
+                        }
+                        kafkaLog.send(key, value, this);
+                    } else {
+                        log.error("Failed to write status update", exception);
+                    }
+                }
+            }
+        });
+    }
+
+    private synchronized CacheEntry<ConnectorStatus> getOrAdd(String connector) {
+        CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+        if (entry == null) {
+            entry = new CacheEntry<>();
+            connectors.put(connector, entry);
+        }
+        return entry;
+    }
+
+    private synchronized void remove(String connector) {
+        CacheEntry<ConnectorStatus> removed = connectors.remove(connector);
+        if (removed != null)
+            removed.delete();
+
+        Map<Integer, CacheEntry<TaskStatus>> tasks = this.tasks.remove(connector);
+        if (tasks != null) {
+            for (CacheEntry<TaskStatus> taskEntry : tasks.values())
+                taskEntry.delete();
+        }
+    }
+
+    private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId task) {
+        CacheEntry<TaskStatus> entry = tasks.get(task.connector(), task.task());
+        if (entry == null) {
+            entry = new CacheEntry<>();
+            tasks.put(task.connector(), task.task(), entry);
+        }
+        return entry;
+    }
+
+    private synchronized void remove(ConnectorTaskId id) {
+        CacheEntry<TaskStatus> removed = tasks.remove(id.connector(), id.task());
+        if (removed != null)
+            removed.delete();
+    }
+
+    @Override
+    public synchronized TaskStatus get(ConnectorTaskId id) {
+        CacheEntry<TaskStatus> entry = tasks.get(id.connector(), id.task());
+        return entry == null ? null : entry.get();
+    }
+
+    @Override
+    public synchronized ConnectorStatus get(String connector) {
+        CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+        return entry == null ? null : entry.get();
+    }
+
+    @Override
+    public synchronized Collection<TaskStatus> getAll(String connector) {
+        List<TaskStatus> res = new ArrayList<>();
+        for (CacheEntry<TaskStatus> statusEntry : tasks.row(connector).values()) {
+            TaskStatus status = statusEntry.get();
+            if (status != null)
+                res.add(status);
+        }
+        return res;
+    }
+
+    @Override
+    public synchronized Set<String> connectors() {
+        return new HashSet<>(connectors.keySet());
+    }
+
+    private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
+        try {
+            SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+            if (!(schemaAndValue.value() instanceof Map)) {
+                log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+                return null;
+            }
+
+            Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+            TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+            String trace = (String) statusMap.get(TRACE_KEY_NAME);
+            String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+            int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+            return new ConnectorStatus(connector, state, trace, workerUrl, generation);
+        } catch (Exception e) {
+            log.error("Failed to deserialize connector status", e);
+            return null;
+        }
+    }
+
+    private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
+        try {
+            SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+            if (!(schemaAndValue.value() instanceof Map)) {
+                log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+                return null;
+            }
+            Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+            TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+            String trace = (String) statusMap.get(TRACE_KEY_NAME);
+            String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+            int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+            return new TaskStatus(taskId, state, workerUrl, generation, trace);
+        } catch (Exception e) {
+            log.error("Failed to deserialize task status", e);
+            return null;
+        }
+    }
+
+    private byte[] serialize(AbstractStatus status) {
+        Struct struct = new Struct(STATUS_SCHEMA_V0);
+        struct.put(STATE_KEY_NAME, status.state().name());
+        if (status.trace() != null)
+            struct.put(TRACE_KEY_NAME, status.trace());
+        struct.put(WORKER_ID_KEY_NAME, status.workerId());
+        struct.put(GENERATION_KEY_NAME, status.generation());
+        return converter.fromConnectData(topic, STATUS_SCHEMA_V0, struct);
+    }
+
+    private String parseConnectorStatusKey(String key) {
+        return key.substring(CONNECTOR_STATUS_PREFIX.length());
+    }
+
+    private ConnectorTaskId parseConnectorTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 4) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            log.warn("Invalid task status key {}", key);
+            return null;
+        }
+    }
+
+    private void readConnectorStatus(String key, byte[] value) {
+        String connector = parseConnectorStatusKey(key);
+        if (connector == null || connector.isEmpty()) {
+            log.warn("Discarding record with invalid connector status key {}", key);
+            return;
+        }
+
+        if (value == null) {
+            log.trace("Removing status for connector {}", connector);
+            remove(connector);
+            return;
+        }
+
+        ConnectorStatus status = parseConnectorStatus(connector, value);
+        if (status == null)
+            return;
+
+        synchronized (KafkaStatusBackingStore.this) {
+            log.trace("Received connector {} status update {}", connector, status);
+            CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+            entry.put(status);
+        }
+    }
+
+    private void readTaskStatus(String key, byte[] value) {
+        ConnectorTaskId id = parseConnectorTaskId(key);
+        if (id == null) {
+            log.warn("Discarding record with invalid task status key {}", key);
+            return;
+        }
+
+        if (value == null) {
+            log.trace("Removing task status for {}", id);
+            remove(id);
+            return;
+        }
+
+        TaskStatus status = parseTaskStatus(id, value);
+        if (status == null) {
+            log.warn("Failed to parse task status with key {}", key);
+            return;
+        }
+
+        synchronized (KafkaStatusBackingStore.this) {
+            log.trace("Received task {} status update {}", id, status);
+            CacheEntry<TaskStatus> entry = getOrAdd(id);
+            entry.put(status);
+        }
+    }
+
+    // visible for testing
+    void read(ConsumerRecord<String, byte[]> record) {
+        String key = record.key();
+        if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
+            readConnectorStatus(key, record.value());
+        } else if (key.startsWith(TASK_STATUS_PREFIX)) {
+            readTaskStatus(key, record.value());
+        } else {
+            log.warn("Discarding record with invalid key {}", key);
+        }
+    }
+
+    private static class CacheEntry<T extends AbstractStatus> {
+        private T value = null;
+        private int sequence = 0;
+        private boolean deleted = false;
+
+        public int increment() {
+            return ++sequence;
+        }
+
+        public void put(T value) {
+            this.value = value;
+        }
+
+        public T get() {
+            return value;
+        }
+
+        public void delete() {
+            this.deleted = true;
+        }
+
+        public boolean isDeleted() {
+            return deleted;
+        }
+
+        public boolean canWrite(T status) {
+            // Matches the documented conditions: no previous value, same worker,
+            // or a generation at least as new as the previous one.
+            return value == null
+                    || value.workerId().equals(status.workerId())
+                    || value.generation() <= status.generation();
+        }
+
+        public boolean canWrite(T status, int sequence) {
+            return canWrite(status) && this.sequence == sequence;
+        }
+
+    }
+
+}
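
A hedged usage sketch of the new status store (not part of the patch; the
topic name, bootstrap address, and class name are illustrative assumptions).
Note the JSON converter must run with schemas disabled, since the parse
methods above expect toConnectData() to yield a plain Map:

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.common.utils.SystemTime;
    import org.apache.kafka.connect.json.JsonConverter;
    import org.apache.kafka.connect.runtime.AbstractStatus;
    import org.apache.kafka.connect.runtime.ConnectorStatus;
    import org.apache.kafka.connect.storage.KafkaStatusBackingStore;

    public class StatusStoreSketch {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            converter.configure(Collections.singletonMap("schemas.enable", "false"), false);

            Map<String, Object> configs = new HashMap<>();
            configs.put("bootstrap.servers", "localhost:9092"); // assumption
            configs.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "connect-status"); // assumption

            KafkaStatusBackingStore store = new KafkaStatusBackingStore(new SystemTime(), converter);
            store.configure(configs);
            store.start(); // reads the compacted topic to the end before returning

            store.put(new ConnectorStatus("my-connector", AbstractStatus.State.RUNNING,
                    "worker-1:8083", 0));
            store.flush();
            // The update is only visible once it has been read back from the topic.
            ConnectorStatus current = store.get("my-connector");
            System.out.println(current == null ? "not yet visible" : current.state());
            store.stop();
        }
    }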

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
new file mode 100644
index 0000000..96b235b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.Table;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class MemoryStatusBackingStore implements StatusBackingStore {
+    private final Table<String, Integer, TaskStatus> tasks;
+    private final Map<String, ConnectorStatus> connectors;
+
+    public MemoryStatusBackingStore() {
+        this.tasks = new Table<>();
+        this.connectors = new HashMap<>();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public synchronized void put(ConnectorStatus status) {
+        if (status.state() == ConnectorStatus.State.DESTROYED)
+            connectors.remove(status.id());
+        else
+            connectors.put(status.id(), status);
+    }
+
+    @Override
+    public synchronized void putSafe(ConnectorStatus status) {
+        put(status);
+    }
+
+    @Override
+    public synchronized void put(TaskStatus status) {
+        if (status.state() == TaskStatus.State.DESTROYED)
+            tasks.remove(status.id().connector(), status.id().task());
+        else
+            tasks.put(status.id().connector(), status.id().task(), status);
+    }
+
+    @Override
+    public synchronized void putSafe(TaskStatus status) {
+        put(status);
+    }
+
+    @Override
+    public synchronized TaskStatus get(ConnectorTaskId id) {
+        return tasks.get(id.connector(), id.task());
+    }
+
+    @Override
+    public synchronized ConnectorStatus get(String connector) {
+        return connectors.get(connector);
+    }
+
+    @Override
+    public synchronized Collection<TaskStatus> getAll(String connector) {
+        return new HashSet<>(tasks.row(connector).values());
+    }
+
+    @Override
+    public synchronized Set<String> connectors() {
+        return new HashSet<>(connectors.keySet());
+    }
+
+    @Override
+    public void flush() {
+
+    }
+
+}
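
The in-memory variant backs standalone mode, where status does not need to
survive restarts. A small sketch of its semantics (not part of the patch;
class name hypothetical), including the DESTROYED-means-delete behavior:

    import org.apache.kafka.connect.runtime.AbstractStatus;
    import org.apache.kafka.connect.runtime.TaskStatus;
    import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    public class MemoryStoreSketch {
        public static void main(String[] args) {
            MemoryStatusBackingStore store = new MemoryStatusBackingStore();
            ConnectorTaskId id = new ConnectorTaskId("my-connector", 0);

            store.put(new TaskStatus(id, AbstractStatus.State.RUNNING, "worker-1:8083", 0));
            System.out.println(store.get(id).state()); // RUNNING

            // DESTROYED statuses are treated as deletions rather than stored values.
            store.put(new TaskStatus(id, AbstractStatus.State.DESTROYED, "worker-1:8083", 0));
            System.out.println(store.get(id)); // null
        }
    }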

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
new file mode 100644
index 0000000..6464f89
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface StatusBackingStore extends Configurable {
+
+    /**
+     * Start dependent services (if needed)
+     */
+    void start();
+
+    /**
+     * Stop dependent services (if needed)
+     */
+    void stop();
+
+    /**
+     * Set the state of the connector to the given value.
+     * @param status the status of the connector
+     */
+    void put(ConnectorStatus status);
+
+    /**
+     * Safely set the state of the connector to the given value. What is
+     * considered "safe" depends on the implementation, but basically it
+     * means that the store can provide higher assurance that another worker
+     * hasn't concurrently written any conflicting data.
+     * @param status the status of the connector
+     */
+    void putSafe(ConnectorStatus status);
+
+    /**
+     * Set the state of the task to the given value.
+     * @param status the status of the task
+     */
+    void put(TaskStatus status);
+
+    /**
+     * Safely set the state of the task to the given value. What is
+     * considered "safe" depends on the implementation, but basically it
+     * means that the store can provide higher assurance that another worker
+     * hasn't concurrently written any conflicting data.
+     * @param status the status of the task
+     */
+    void putSafe(TaskStatus status);
+
+    /**
+     * Get the current state of the task.
+     * @param id the id of the task
+     * @return the state or null if there is none
+     */
+    TaskStatus get(ConnectorTaskId id);
+
+    /**
+     * Get the current state of the connector.
+     * @param connector the connector name
+     * @return the state or null if there is none
+     */
+    ConnectorStatus get(String connector);
+
+    /**
+     * Get the states of all tasks for the given connector.
+     * @param connector the connector name
+     * @return a collection of the current task states
+     */
+    Collection<TaskStatus> getAll(String connector);
+
+    /**
+     * Get all cached connectors.
+     * @return the set of connector names
+     */
+    Set<String> connectors();
+
+    /**
+     * Flush any pending writes
+     */
+    void flush();
+}
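
To make the put()/putSafe() contract concrete, here is a hedged sketch (not
part of the patch; the method names and rebalance scenario are illustrative)
of how a caller might choose between the two:

    import org.apache.kafka.connect.runtime.AbstractStatus;
    import org.apache.kafka.connect.runtime.TaskStatus;
    import org.apache.kafka.connect.storage.StatusBackingStore;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    public class StatusWriteSketch {
        // A local state change is authoritative for the worker that owns the
        // task, so an unconditional write is appropriate.
        static void onTaskStartup(StatusBackingStore store, ConnectorTaskId id,
                                  String workerId, int generation) {
            store.put(new TaskStatus(id, AbstractStatus.State.RUNNING, workerId, generation));
        }

        // After a rebalance the task may already be running elsewhere; putSafe()
        // declines to clobber a status written by another worker at a newer
        // generation, per the conditions documented on KafkaStatusBackingStore.
        static void markUnassignedAfterRebalance(StatusBackingStore store, ConnectorTaskId id,
                                                 String workerId, int generation) {
            store.putSafe(new TaskStatus(id, AbstractStatus.State.UNASSIGNED, workerId, generation));
        }
    }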

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index c82645c..5ab60cd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -99,8 +99,11 @@ public class KafkaBasedLog<K, V> {
      * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
      * @param time Time interface
      */
-    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
-                         Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+    public KafkaBasedLog(String topic,
+                         Map<String, Object> producerConfigs,
+                         Map<String, Object> consumerConfigs,
+                         Callback<ConsumerRecord<K, V>> consumedCallback,
+                         Time time) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
@@ -140,9 +143,9 @@ public class KafkaBasedLog<K, V> {
         thread = new WorkThread();
         thread.start();
 
-        log.info("Finished reading KafakBasedLog for topic " + topic);
+        log.info("Finished reading KafkaBasedLog for topic " + topic);
 
-        log.info("Started KafakBasedLog for topic " + topic);
+        log.info("Started KafkaBasedLog for topic " + topic);
     }
 
     public void stop() {
@@ -198,6 +201,13 @@ public class KafkaBasedLog<K, V> {
     }
 
     /**
+     * Flush the underlying producer to ensure that all pending writes have been sent.
+     */
+    public void flush() {
+        producer.flush();
+    }
+
+    /**
      * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
      * @return the future associated with the operation
      */
@@ -219,12 +229,18 @@ public class KafkaBasedLog<K, V> {
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
         producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        // Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
+        producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
         return new KafkaProducer<>(producerConfigs);
     }
 
     private Consumer<K, V> createConsumer() {
         // Always force reset to the beginning of the log since this class wants to consume all available log data
         consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        // Turn off autocommit since we always want to consume the full log
+        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         return new KafkaConsumer<>(consumerConfigs);
     }
 

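The single in-flight request matters once callers enable retries: with more
than one batch in flight, a failed-and-retried batch could land after a later
batch and reorder the log. As a construction sketch (not part of the patch;
the bootstrap address and class name are assumptions):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.utils.SystemTime;
    import org.apache.kafka.connect.util.Callback;
    import org.apache.kafka.connect.util.KafkaBasedLog;

    public class LogSketch {
        public static KafkaBasedLog<String, byte[]> build(String bootstrap, String topic) {
            Map<String, Object> producerProps = new HashMap<>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());

            Callback<ConsumerRecord<String, byte[]>> onRecord = new Callback<ConsumerRecord<String, byte[]>>() {
                @Override
                public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
                    // invoked for every record while tailing the log
                }
            };

            // createProducer()/createConsumer() add acks=all, one in-flight request,
            // auto.offset.reset=earliest, and disabled autocommit, per the hunk above.
            return new KafkaBasedLog<>(topic, producerProps, consumerProps, onRecord, new SystemTime());
        }
    }
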
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
new file mode 100644
index 0000000..f36d3e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Table<R, C, V> {
+
+    private Map<R, Map<C, V>> table = new HashMap<>();
+
+    public V put(R row, C column, V value) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null) {
+            columns = new HashMap<>();
+            table.put(row, columns);
+        }
+        return columns.put(column, value);
+    }
+
+    public V get(R row, C column) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null)
+            return null;
+        return columns.get(column);
+    }
+
+    public Map<C, V> remove(R row) {
+        return table.remove(row);
+    }
+
+    public V remove(R row, C column) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null)
+            return null;
+
+        V value = columns.remove(column);
+        if (columns.isEmpty())
+            table.remove(row);
+        return value;
+    }
+
+    public Map<C, V> row(R row) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null)
+            return Collections.emptyMap();
+        return Collections.unmodifiableMap(columns);
+    }
+
+}
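
Table is a minimal two-key map; the status store uses the connector name as
the row and the task number as the column. A quick sketch (not part of the
patch; class name hypothetical):

    import org.apache.kafka.connect.util.Table;

    public class TableSketch {
        public static void main(String[] args) {
            Table<String, Integer, String> statuses = new Table<>();
            statuses.put("connector-a", 0, "RUNNING");
            statuses.put("connector-a", 1, "FAILED");

            System.out.println(statuses.get("connector-a", 1)); // FAILED
            System.out.println(statuses.row("connector-a"));    // {0=RUNNING, 1=FAILED} (unmodifiable)

            // Removing the last cell of a row drops the row itself.
            statuses.remove("connector-a", 0);
            statuses.remove("connector-a", 1);
            System.out.println(statuses.row("connector-a"));    // {}
        }
    }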

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
new file mode 100644
index 0000000..f17023c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AbstractHerderTest extends EasyMockSupport {
+
+    @Test
+    public void connectorStatus() {
+        String workerId = "workerId";
+        String connector = "connector";
+        int generation = 5;
+        ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+
+        StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(StatusBackingStore.class, String.class)
+                .withArgs(store, workerId)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(herder.generation()).andStubReturn(generation);
+
+        EasyMock.expect(store.get(connector))
+                .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+
+        EasyMock.expect(store.getAll(connector))
+                .andReturn(Collections.singletonList(
+                        new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
+
+        replayAll();
+
+
+        ConnectorStateInfo state = herder.connectorStatus(connector);
+
+        assertEquals(connector, state.name());
+        assertEquals("RUNNING", state.connector().state());
+        assertEquals(1, state.tasks().size());
+        assertEquals(workerId, state.connector().workerId());
+
+        ConnectorStateInfo.TaskState taskState = state.tasks().get(0);
+        assertEquals(0, taskState.id());
+        assertEquals("UNASSIGNED", taskState.state());
+        assertEquals(workerId, taskState.workerId());
+
+        verifyAll();
+    }
+
+    @Test
+    public void taskStatus() {
+        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+        String workerId = "workerId";
+
+        StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(StatusBackingStore.class, String.class)
+                .withArgs(store, workerId)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(herder.generation()).andStubReturn(5);
+
+        final Capture<TaskStatus> statusCapture = EasyMock.newCapture();
+        store.putSafe(EasyMock.capture(statusCapture));
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(store.get(taskId)).andAnswer(new IAnswer<TaskStatus>() {
+            @Override
+            public TaskStatus answer() throws Throwable {
+                return statusCapture.getValue();
+            }
+        });
+
+        replayAll();
+
+        herder.onFailure(taskId, new RuntimeException());
+
+        ConnectorStateInfo.TaskState taskState = herder.taskStatus(taskId);
+        assertEquals(workerId, taskState.workerId());
+        assertEquals("FAILED", taskState.state());
+        assertEquals(0, taskState.id());
+        assertNotNull(taskState.trace());
+
+        verifyAll();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 978e3a1..6721609 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -98,6 +98,8 @@ public class WorkerSinkTaskTest {
     @Mock
     private Converter valueConverter;
     @Mock
+    private TaskStatus.Listener statusListener;
+    @Mock
     private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
 
@@ -116,7 +118,7 @@ public class WorkerSinkTaskTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
 
         recordsReturned = 0;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index e202209..77f1ed0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -97,11 +97,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
     @Mock private Converter keyConverter;
-    @Mock
-    private Converter valueConverter;
+    @Mock private Converter valueConverter;
     private WorkerSinkTask workerTask;
     @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+    @Mock private TaskStatus.Listener statusListener;
 
     private long recordsReturned;
 
@@ -120,7 +120,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
 
         recordsReturned = 0;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index c6eb0c5..8e9eb72 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
@@ -90,6 +91,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Mock private OffsetStorageWriter offsetWriter;
     private WorkerSourceTask workerTask;
     @Mock private Future<RecordMetadata> sendFuture;
+    @MockStrict private TaskStatus.Listener statusListener;
 
     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
 
@@ -113,7 +115,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, keyConverter, valueConverter, producer,
                 offsetReader, offsetWriter, config, new SystemTime());
     }
 
@@ -125,6 +127,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
         EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the offset writer
@@ -133,6 +137,42 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        executor.submit(workerTask);
+        awaitPolls(pollLatch);
+        workerTask.stop();
+        assertEquals(true, workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailureInPoll() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(1);
+        RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andThrow(exception);
+
+        statusListener.onFailure(taskId, exception);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -153,6 +193,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
         EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
@@ -164,6 +206,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -185,6 +230,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
         EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
@@ -194,6 +241,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -269,6 +319,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index f5213a6..20e3fe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.partialMockBuilder;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -33,22 +36,32 @@ public class WorkerTaskTest {
 
     @Test
     public void standardStartup() {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class)
-                .withArgs(new ConnectorTaskId("foo", 0))
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+                .withArgs(taskId, statusListener)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
-        EasyMock.expectLastCall();
+        expectLastCall();
 
         workerTask.execute();
-        EasyMock.expectLastCall();
+        expectLastCall();
+
+        statusListener.onStartup(taskId);
+        expectLastCall();
 
         workerTask.close();
-        EasyMock.expectLastCall();
+        expectLastCall();
+
+        statusListener.onShutdown(taskId);
+        expectLastCall();
 
         replay(workerTask);
 
@@ -62,9 +75,13 @@ public class WorkerTaskTest {
 
     @Test
     public void stopBeforeStarting() {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class)
-                .withArgs(new ConnectorTaskId("foo", 0))
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+                .withArgs(taskId, statusListener)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -88,5 +105,62 @@ public class WorkerTaskTest {
         verify(workerTask);
     }
 
+    @Test
+    public void cancelBeforeStopping() throws Exception {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
+        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+                .withArgs(taskId, statusListener)
+                .addMockedMethod("initialize")
+                .addMockedMethod("execute")
+                .addMockedMethod("close")
+                .createStrictMock();
+
+        final CountDownLatch stopped = new CountDownLatch(1);
+        final Thread thread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    stopped.await();
+                } catch (Exception e) {
+                    // ignored: the test releases the latch explicitly before joining
+                }
+            }
+        };
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        workerTask.execute();
+        expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                thread.start();
+                return null;
+            }
+        });
+
+        statusListener.onStartup(taskId);
+        expectLastCall();
+
+        workerTask.close();
+        expectLastCall();
+
+        // there should be no call to onShutdown()
+
+        replay(workerTask);
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.run();
+
+        workerTask.stop();
+        workerTask.cancel();
+        stopped.countDown();
+        thread.join();
+
+        verify(workerTask);
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f33347a..0ca405e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
@@ -60,10 +60,13 @@ public class WorkerTest extends ThreadedTest {
 
     private static final String CONNECTOR_ID = "test-connector";
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+    private static final String WORKER_ID = "localhost:8083";
 
     private WorkerConfig config;
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+    private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
+    private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
 
     @Before
     public void setup() {
@@ -80,17 +83,14 @@ public class WorkerTest extends ThreadedTest {
     }
 
     @Test
-    public void testAddRemoveConnector() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+    public void testStartAndStopConnector() throws Exception {
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -105,24 +105,29 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Remove
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
+        expectStopStorage();
+
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.addConnector(config, ctx);
+            worker.startConnector(config, ctx, connectorStatusListener);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -137,16 +142,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByAlias() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -161,22 +163,26 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Remove
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
-        PowerMock.replayAll();
+        expectStopStorage();
 
+        PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -189,16 +195,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByShortAlias() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -213,21 +216,26 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Remove
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
+        expectStopStorage();
+
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -241,14 +249,11 @@ public class WorkerTest extends ThreadedTest {
 
     @Test(expected = ConnectException.class)
     public void testStopInvalidConnector() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         worker.stopConnector(CONNECTOR_ID);
@@ -256,16 +261,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testReconfigureConnectorTasks() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -280,6 +282,9 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Reconfigure
         EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
         Map<String, String> taskProps = new HashMap<>();
@@ -290,20 +295,22 @@ public class WorkerTest extends ThreadedTest {
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
+        expectStopStorage();
+
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.addConnector(config, ctx);
+            worker.startConnector(config, ctx, connectorStatusListener);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -327,23 +334,21 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+        expectStartStorage();
 
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
         PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+                EasyMock.eq(task),
+                EasyMock.anyObject(TaskStatus.Listener.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -356,6 +361,8 @@ public class WorkerTest extends ThreadedTest {
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.initialize(origProps);
         EasyMock.expectLastCall();
+        workerTask.run();
+        EasyMock.expectLastCall();
 
         // Remove
         workerTask.stop();
@@ -363,17 +370,16 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
+        expectStopStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.addTask(taskId, new TaskConfig(origProps));
-        assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
-        worker.stopTask(taskId);
+        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+        assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
+        worker.stopAndAwaitTask(TASK_ID);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
@@ -383,36 +389,33 @@ public class WorkerTest extends ThreadedTest {
 
     @Test(expected = ConnectException.class)
     public void testStopInvalidTask() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        worker.stopTask(TASK_ID);
+        worker.stopAndAwaitTask(TASK_ID);
     }
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
         PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
         
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+                EasyMock.eq(task),
+                EasyMock.anyObject(TaskStatus.Listener.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -425,27 +428,41 @@ public class WorkerTest extends ThreadedTest {
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.initialize(origProps);
         EasyMock.expectLastCall();
+        workerTask.run();
+        EasyMock.expectLastCall();
 
         // Remove on Worker.stop()
         workerTask.stop();
         EasyMock.expectLastCall();
+
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
+        expectStopStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(TASK_ID, new TaskConfig(origProps));
+        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
         worker.stop();
 
         PowerMock.verifyAll();
     }
 
+    private void expectStartStorage() {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+    }
+
+    private void expectStopStorage() {
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+    }
+
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     private static class WorkerTestConnector extends Connector {

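These tests also switch from PowerMock.mockStatic(Worker.class) to PowerMock.mockStaticPartial(Worker.class, "instantiateConnector") and the like, which mocks only the named static method and leaves every other static on the class with its real behavior. A small self-contained sketch of the difference, assuming a standard PowerMock/EasyMock/JUnit 4 setup; the Util class is invented for illustration.

import static org.junit.Assert.assertEquals;

import org.easymock.EasyMock;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@RunWith(PowerMockRunner.class)
@PrepareForTest(StaticPartialExample.Util.class)
public class StaticPartialExample {

    // Invented for illustration only.
    public static class Util {
        public static String create() { return "real"; }
        public static String describe() { return "real description"; }
    }

    @Test
    public void onlyNamedStaticIsMocked() {
        // Only create() is mocked; describe() keeps its real implementation,
        // which is why the tests above no longer need to stub every static
        // on Worker.
        PowerMock.mockStaticPartial(Util.class, "create");
        EasyMock.expect(Util.create()).andReturn("mocked");
        PowerMock.replayAll();

        assertEquals("mocked", Util.create());
        assertEquals("real description", Util.describe());

        PowerMock.verifyAll();
    }
}
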
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 76f9bc0..d439e96 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -31,6 +31,8 @@ import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
@@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue;
 public class DistributedHerderTest {
     private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
     static {
+        HERDER_CONFIG.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "status-topic");
         HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
         HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
@@ -122,7 +125,10 @@ public class DistributedHerderTest {
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
             Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
 
+    private static final String WORKER_ID = "localhost:8083";
+
     @Mock private KafkaConfigStorage configStorage;
+    @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerGroupMember member;
     private MockTime time;
     private DistributedHerder herder;
@@ -139,11 +145,12 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
         time = new MockTime();
 
-        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
-                new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time);
+        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
+                new DistributedConfig(HERDER_CONFIG), WORKER_ID, worker, statusBackingStore, configStorage, member, MEMBER_URL, time);
         connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
         taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
         rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
+        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
     }
 
     @Test
@@ -152,10 +159,11 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -168,17 +176,55 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testRebalance() {
+        // Join group and get assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
+
+        // and the new assignment started
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testHaltCleansUpWorker() {
         EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
         EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
-        worker.stopTask(TASK1);
+        worker.stopTasks(Collections.singleton(TASK1));
+        PowerMock.expectLastCall();
+        worker.awaitStopTasks(Collections.singleton(TASK1));
         PowerMock.expectLastCall();
         member.stop();
         PowerMock.expectLastCall();
         configStorage.stop();
         PowerMock.expectLastCall();
+        statusBackingStore.stop();
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
@@ -242,7 +288,8 @@ public class DistributedHerderTest {
         // Start with one connector
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
@@ -286,7 +333,8 @@ public class DistributedHerderTest {
         // Performs rebalance and gets new assignment
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -312,7 +360,8 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -325,7 +374,8 @@ public class DistributedHerderTest {
         EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -360,8 +410,9 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         // Performs rebalance and gets new assignment
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
-                ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0));
-        worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject());
+                ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
+                Arrays.asList(TASK0));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -381,7 +432,8 @@ public class DistributedHerderTest {
         // Join group and as leader fail to do assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
-                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
         // Reading to end of log times out
         TestFuture<Void> readToEndFuture = new TestFuture<>();
         readToEndFuture.resolveOnGet(new TimeoutException());
@@ -393,10 +445,11 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
 
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -457,7 +510,8 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
@@ -485,7 +539,8 @@ public class DistributedHerderTest {
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
         Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
-        worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
@@ -534,13 +589,19 @@ public class DistributedHerderTest {
     }
 
 
-    private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+    private void expectRebalance(final long offset,
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks) {
         expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);
     }
 
     // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
-    private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks,
-                                 final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+    private void expectRebalance(final Collection<String> revokedConnectors,
+                                 final List<ConnectorTaskId> revokedTasks,
+                                 final short error,
+                                 final long offset,
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks) {
         member.ensureActive();
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
@@ -549,10 +610,30 @@ public class DistributedHerderTest {
                     rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
                 ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
                         error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
-                rebalanceListener.onAssigned(assignment);
+                rebalanceListener.onAssigned(assignment, 0);
                 return null;
             }
         });
+
+        if (revokedConnectors != null) {
+            for (String connector : revokedConnectors) {
+                worker.stopConnector(connector);
+                PowerMock.expectLastCall();
+            }
+        }
+
+        if (revokedTasks != null && !revokedTasks.isEmpty()) {
+            worker.stopTasks(revokedTasks);
+            PowerMock.expectLastCall();
+            worker.awaitStopTasks(revokedTasks);
+            PowerMock.expectLastCall();
+        }
+
+        if (revokedConnectors != null) {
+            statusBackingStore.flush();
+            PowerMock.expectLastCall();
+        }
+
         member.wakeup();
         PowerMock.expectLastCall();
     }
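The new expectations in expectRebalance spell out the revocation sequence the herder is expected to follow: revoked connectors are stopped individually, revoked tasks are stopped as a batch (worker.stopTasks(...) triggers shutdown on all of them before worker.awaitStopTasks(...) blocks, so tasks wind down in parallel and the total wait is roughly the slowest task rather than the sum), and the status backing store is flushed before rejoining the group. A rough sketch of that two-phase stop pattern; TaskHandle and both of its method names are invented for illustration, not the runtime API.

import java.util.Collection;

public class TwoPhaseStopSketch {

    // Hypothetical handle for a running task; invented for illustration.
    interface TaskHandle {
        void triggerStop();                // asynchronous: request shutdown
        boolean awaitStop(long timeoutMs); // blocking: wait for completion
    }

    // Phase 1: signal every task first, so all of them begin shutting down...
    static void stopTasks(Collection<TaskHandle> tasks) {
        for (TaskHandle task : tasks)
            task.triggerStop();
    }

    // Phase 2: ...then wait. The total wait is bounded by the slowest task's
    // shutdown rather than the sum over all tasks.
    static void awaitStopTasks(Collection<TaskHandle> tasks, long timeoutMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        for (TaskHandle task : tasks) {
            long remaining = Math.max(0, deadline - System.currentTimeMillis());
            task.awaitStop(remaining);
        }
    }
}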