Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 07:47:48 UTC

[1/3] kafka git commit: KAFKA-3093: Add Connect status tracking API

Repository: kafka
Updated Branches:
  refs/heads/trunk aeb9c2adc -> f7d019ed4


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3eab095..abb62b9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -426,7 +426,7 @@ public class WorkerCoordinatorTest {
         public int assignedCount = 0;
 
         @Override
-        public void onAssigned(ConnectProtocol.Assignment assignment) {
+        public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
             this.assignment = assignment;
             assignedCount++;
         }
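This hunk tracks the WorkerRebalanceListener change: onAssigned now also receives the rebalance generation, which the herder needs so status writes can be tagged with it. A test double built on the new signature might record the generation alongside the assignment; a sketch (the lastGeneration field is illustrative, not part of the patch):

    // illustrative extension of the mock listener above
    public ConnectProtocol.Assignment assignment;
    public int lastGeneration = -1;
    public int assignedCount = 0;

    @Override
    public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
        this.assignment = assignment;
        this.lastGeneration = generation;   // remember the generation for later assertions
        assignedCount++;
    }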

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 8ef1d24..07d0e3d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -33,6 +36,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
@@ -54,8 +58,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 @RunWith(PowerMockRunner.class)
@@ -64,22 +68,25 @@ public class StandaloneHerderTest {
     private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
     private static final String TOPICS_LIST_STR = "topic1,topic2";
     private static final int DEFAULT_MAX_TASKS = 1;
+    private static final String WORKER_ID = "localhost:8083";
 
     private StandaloneHerder herder;
-    @Mock protected Worker worker;
+
     private Connector connector;
+    @Mock protected Worker worker;
     @Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
+    @Mock protected StatusBackingStore statusBackingStore;
 
     @Before
     public void setup() {
-        worker = PowerMock.createMock(Worker.class);
-        herder = new StandaloneHerder(worker);
+        herder = new StandaloneHerder(WORKER_ID, worker, statusBackingStore);
     }
 
     @Test
     public void testCreateSourceConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
@@ -121,6 +128,10 @@ public class StandaloneHerderTest {
     public void testDestroyConnector() throws Exception {
         connector = PowerMock.createMock(BogusSourceConnector.class);
         expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
+        EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
+        statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+
         expectDestroy();
 
         PowerMock.replayAll();
@@ -232,7 +243,8 @@ public class StandaloneHerderTest {
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
         Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
-        worker.addConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         EasyMock.expectLastCall();
         // Generate same task config, which should result in no additional action to restart tasks
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
@@ -270,16 +282,19 @@ public class StandaloneHerderTest {
         PowerMock.verifyAll();
     }
 
-    private void expectAdd(String name, Class<? extends Connector> connClass, Class<? extends Task> taskClass,
+    private void expectAdd(String name,
+                           Class<? extends Connector> connClass,
+                           Class<? extends Task> taskClass,
                            boolean sink) throws Exception {
         Map<String, String> connectorProps = connectorConfig(name, connClass);
 
-        worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class));
-        PowerMock.expectLastCall();
+        worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
+                EasyMock.eq(herder));
+        EasyMock.expectLastCall();
 
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
-        PowerMock.expectLastCall();
+        EasyMock.expectLastCall();
 
         // And we should instantiate the tasks. For a sink task, we should see added properties for
         // the input topic partitions
@@ -287,12 +302,15 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
                 .andReturn(Collections.singletonList(generatedTaskProps));
 
-        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps));
-        PowerMock.expectLastCall();
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder);
+        EasyMock.expectLastCall();
     }
 
     private void expectStop() {
-        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+        ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
+        worker.stopTasks(Collections.singleton(task));
+        EasyMock.expectLastCall();
+        worker.awaitStopTasks(Collections.singleton(task));
         EasyMock.expectLastCall();
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
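The mock expectations above follow the Worker API rename: addConnector/addTask become startConnector/startTask, and both now take a status listener (here the herder itself, which implements the listener interfaces via AbstractHerder). Roughly the flow the expectations imply, as a sketch; the method body below is hypothetical, not the committed Worker code:

    // hypothetical shape of Worker.startConnector(config, ctx, statusListener)
    String connName = config.getString(ConnectorConfig.NAME_CONFIG);
    Connector connector = instantiateConnector(config);   // assumed helper
    connector.initialize(ctx);
    connector.start(config.originalsStrings());
    statusListener.onStartup(connName);                   // herder records RUNNING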

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
new file mode 100644
index 0000000..cdbab64
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.easymock.Capture;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.junit.Assert.assertEquals;
+
+public class KafkaStatusBackingStoreTest extends EasyMockSupport {
+
+    private static final String STATUS_TOPIC = "status-topic";
+    private static final String WORKER_ID = "localhost:8083";
+    private static final String CONNECTOR = "conn";
+    private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
+
+    @Test
+    public void putConnectorState() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        return null;
+                    }
+                });
+        replayAll();
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putConnectorStateRetriableFailure() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, new TimeoutException());
+                        return null;
+                    }
+                })
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        return null;
+                    }
+                });
+        replayAll();
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putConnectorStateNonRetriableFailure() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, new UnknownServerException());
+                        return null;
+                    }
+                });
+        replayAll();
+
+        // the error is logged and ignored
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putSafeConnectorIgnoresStaleStatus() {
+        byte[] value = new byte[0];
+        String otherWorkerId = "anotherhost:8083";
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        // the persisted status came from a different host and has a newer generation
+        Map<String, Object> statusMap = new HashMap<>();
+        statusMap.put("worker_id", otherWorkerId);
+        statusMap.put("state", "RUNNING");
+        statusMap.put("generation", 1L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, statusMap));
+
+        // we're verifying that there is no call to KafkaBasedLog.send
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+        store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, otherWorkerId, 1);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putSafeOverridesValueSetBySameWorker() {
+        final byte[] value = new byte[0];
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        // the persisted status came from the same host, but has a newer generation
+        Map<String, Object> firstStatusRead = new HashMap<>();
+        firstStatusRead.put("worker_id", WORKER_ID);
+        firstStatusRead.put("state", "RUNNING");
+        firstStatusRead.put("generation", 1L);
+
+        Map<String, Object> secondStatusRead = new HashMap<>();
+        secondStatusRead.put("worker_id", WORKER_ID);
+        secondStatusRead.put("state", "UNASSIGNED");
+        secondStatusRead.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, firstStatusRead))
+                .andReturn(new SchemaAndValue(null, secondStatusRead));
+
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        store.read(consumerRecord(1, "status-connector-conn", value));
+                        return null;
+                    }
+                });
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+        store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putConnectorStateShouldOverride() {
+        final byte[] value = new byte[0];
+        String otherWorkerId = "anotherhost:8083";
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        // the persisted status came from a different host and has a newer generation
+        Map<String, Object> firstStatusRead = new HashMap<>();
+        firstStatusRead.put("worker_id", otherWorkerId);
+        firstStatusRead.put("state", "RUNNING");
+        firstStatusRead.put("generation", 1L);
+
+        Map<String, Object> secondStatusRead = new HashMap<>();
+        secondStatusRead.put("worker_id", WORKER_ID);
+        secondStatusRead.put("state", "UNASSIGNED");
+        secondStatusRead.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, firstStatusRead))
+                .andReturn(new SchemaAndValue(null, secondStatusRead));
+
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        store.read(consumerRecord(1, "status-connector-conn", value));
+                        return null;
+                    }
+                });
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
+        store.put(status);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void readConnectorState() {
+        byte[] value = new byte[0];
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        Map<String, Object> statusMap = new HashMap<>();
+        statusMap.put("worker_id", WORKER_ID);
+        statusMap.put("state", "RUNNING");
+        statusMap.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, statusMap));
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-connector-conn", value));
+
+        ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+        assertEquals(status, store.get(CONNECTOR));
+
+        verifyAll();
+    }
+
+    @Test
+    public void putTaskState() {
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        byte[] value = new byte[0];
+        expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+                .andStubReturn(value);
+
+        final Capture<Callback> callbackCapture = newCapture();
+        kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), capture(callbackCapture));
+        expectLastCall()
+                .andAnswer(new IAnswer<Void>() {
+                    @Override
+                    public Void answer() throws Throwable {
+                        callbackCapture.getValue().onCompletion(null, null);
+                        return null;
+                    }
+                });
+        replayAll();
+
+        TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+        store.put(status);
+
+        // state is not visible until read back from the log
+        assertEquals(null, store.get(TASK));
+
+        verifyAll();
+    }
+
+    @Test
+    public void readTaskState() {
+        byte[] value = new byte[0];
+
+        KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+        Converter converter = mock(Converter.class);
+        KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+        Map<String, Object> statusMap = new HashMap<>();
+        statusMap.put("worker_id", WORKER_ID);
+        statusMap.put("state", "RUNNING");
+        statusMap.put("generation", 0L);
+
+        expect(converter.toConnectData(STATUS_TOPIC, value))
+                .andReturn(new SchemaAndValue(null, statusMap));
+
+        replayAll();
+
+        store.read(consumerRecord(0, "status-task-conn-0", value));
+
+        TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+        assertEquals(status, store.get(TASK));
+
+        verifyAll();
+    }
+
+    private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
+        return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
+                TimestampType.CREATE_TIME, key, value);
+    }
+
+}
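Together these tests pin down the store's write semantics: plain put always overwrites, putSafe drops updates that lose to what is already cached (unless the cached entry came from the same worker), retriable send errors are retried, and non-retriable ones are logged and dropped. A condensed sketch of those rules, not the committed implementation:

    // putSafe conflict check: is the incoming status stale relative to the cache?
    private boolean isStale(AbstractStatus<?> update, AbstractStatus<?> cached) {
        if (cached == null)
            return false;                                       // nothing cached, write proceeds
        if (cached.workerId().equals(update.workerId()))
            return false;                                       // same worker may always override
        return cached.generation() >= update.generation();      // otherwise newer generation wins
    }

    // producer callback handling, per the retriable/non-retriable failure tests
    public void onCompletion(RecordMetadata metadata, Exception error) {
        if (error instanceof RetriableException)
            resend();                                           // hypothetical retry hook
        else if (error != null)
            log.error("Failed to write status update to Kafka", error);
    }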

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
new file mode 100644
index 0000000..40aee37
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class MemoryStatusBackingStoreTest {
+
+    @Test
+    public void putAndGetConnectorStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        ConnectorStatus status = new ConnectorStatus("connector", ConnectorStatus.State.RUNNING, "localhost:8083", 0);
+        store.put(status);
+        assertEquals(status, store.get("connector"));
+    }
+
+    @Test
+    public void putAndGetTaskStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+        TaskStatus status = new TaskStatus(taskId, ConnectorStatus.State.RUNNING, "localhost:8083", 0);
+        store.put(status);
+        assertEquals(status, store.get(taskId));
+        assertEquals(Collections.singleton(status), store.getAll("connector"));
+    }
+
+    @Test
+    public void deleteConnectorStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        store.put(new ConnectorStatus("connector", ConnectorStatus.State.RUNNING, "localhost:8083", 0));
+        store.put(new ConnectorStatus("connector", ConnectorStatus.State.DESTROYED, "localhost:8083", 0));
+        assertNull(store.get("connector"));
+    }
+
+    @Test
+    public void deleteTaskStatus() {
+        MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+        store.put(new TaskStatus(taskId, ConnectorStatus.State.RUNNING, "localhost:8083", 0));
+        store.put(new TaskStatus(taskId, ConnectorStatus.State.DESTROYED, "localhost:8083", 0));
+        assertNull(store.get(taskId));
+    }
+
+}
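The in-memory store treats DESTROYED as a tombstone: writing it removes the cached entry, which is exactly what the two delete tests assert. A minimal sketch of that behavior (the connectors map is an assumed field):

    public void put(ConnectorStatus status) {
        if (status.state() == ConnectorStatus.State.DESTROYED)
            connectors.remove(status.id());        // tombstone: drop the entry
        else
            connectors.put(status.id(), status);   // otherwise cache the latest status
    }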

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
new file mode 100644
index 0000000..ee266b5
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TableTest {
+
+    @Test
+    public void basicOperations() {
+        Table<String, Integer, String> table = new Table<>();
+        table.put("foo", 5, "bar");
+        table.put("foo", 6, "baz");
+        assertEquals("bar", table.get("foo", 5));
+        assertEquals("baz", table.get("foo", 6));
+
+        Map<Integer, String> row = table.row("foo");
+        assertEquals("bar", row.get(5));
+        assertEquals("baz", row.get(6));
+
+        assertEquals("bar", table.remove("foo", 5));
+        assertNull(table.get("foo", 5));
+        assertEquals("baz", table.remove("foo", 6));
+        assertNull(table.get("foo", 6));
+        assertTrue(table.row("foo").isEmpty());
+    }
+
+}
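Table is a small two-key map: a value is addressed by a row key and a column key, and row() exposes one row as a Map. One plausible backing for the operations the test exercises, using nested HashMaps (a sketch; the committed Table.java may differ):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    public class Table<R, C, V> {
        private final Map<R, Map<C, V>> table = new HashMap<>();

        public V put(R row, C column, V value) {
            Map<C, V> columns = table.get(row);
            if (columns == null) {
                columns = new HashMap<>();
                table.put(row, columns);
            }
            return columns.put(column, value);
        }

        public V get(R row, C column) {
            Map<C, V> columns = table.get(row);
            return columns == null ? null : columns.get(column);
        }

        public V remove(R row, C column) {
            Map<C, V> columns = table.get(row);
            if (columns == null)
                return null;
            V value = columns.remove(column);
            if (columns.isEmpty())
                table.remove(row);   // keep empty rows out of the map
            return value;
        }

        public Map<C, V> row(R row) {
            Map<C, V> columns = table.get(row);
            return columns == null ? Collections.<C, V>emptyMap() : columns;
        }
    }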

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index a6e902f..76336e1 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -181,10 +181,12 @@ class ConnectStandaloneService(ConnectServiceBase):
 class ConnectDistributedService(ConnectServiceBase):
     """Runs Kafka Connect in distributed mode."""
 
-    def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets", configs_topic="connect-configs"):
+    def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
+                 configs_topic="connect-configs", status_topic="connect-status"):
         super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
         self.offsets_topic = offsets_topic
         self.configs_topic = configs_topic
+        self.status_topic = status_topic
 
     def start_cmd(self, node):
         cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
index 1f82e63..9aa16ab 100644
--- a/tests/kafkatest/tests/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect_distributed_test.py
@@ -33,6 +33,7 @@ class ConnectDistributedTest(KafkaTest):
     TOPIC = "test"
     OFFSETS_TOPIC = "connect-offsets"
     CONFIG_TOPIC = "connect-configs"
+    STATUS_TOPIC = "connect-status"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_rest_test.py b/tests/kafkatest/tests/connect_rest_test.py
index 8e713d4..69a8cb7 100644
--- a/tests/kafkatest/tests/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect_rest_test.py
@@ -30,6 +30,7 @@ class ConnectRestApiTest(KafkaTest):
     TOPIC = "test"
     OFFSETS_TOPIC = "connect-offsets"
     CONFIG_TOPIC = "connect-configs"
+    STATUS_TOPIC = "connect-status"
 
     # Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
     # across all nodes.
@@ -160,4 +161,5 @@ class ConnectRestApiTest(KafkaTest):
             return []
 
     def _config_dict_from_props(self, connector_props):
-        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
\ No newline at end of file
+        return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
index 8a9f6c7..7a7440a 100644
--- a/tests/kafkatest/tests/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/templates/connect-distributed.properties
@@ -33,6 +33,7 @@ internal.value.converter.schemas.enable=false
 
 offset.storage.topic={{ OFFSETS_TOPIC }}
 config.storage.topic={{ CONFIG_TOPIC }}
+status.storage.topic={{ STATUS_TOPIC }}
 
 # Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
 offset.flush.interval.ms=5000


[3/3] kafka git commit: KAFKA-3093: Add Connect status tracking API

Posted by ew...@apache.org.
KAFKA-3093: Add Connect status tracking API

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #920 from hachikuji/KAFKA-3093


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7d019ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7d019ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7d019ed

Branch: refs/heads/trunk
Commit: f7d019ed408fa988129be9af3689bfa4878bc627
Parents: aeb9c2a
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 23 22:47:31 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 23 22:47:31 2016 -0800

----------------------------------------------------------------------
 config/connect-distributed.properties           |   3 +-
 .../connect/connector/ConnectorContext.java     |   7 +
 .../kafka/connect/file/FileStreamSinkTask.java  |   2 +-
 .../kafka/connect/cli/ConnectDistributed.java   |  28 +-
 .../kafka/connect/cli/ConnectStandalone.java    |  13 +-
 .../kafka/connect/runtime/AbstractHerder.java   | 156 +++++++
 .../kafka/connect/runtime/AbstractStatus.java   | 100 ++++
 .../kafka/connect/runtime/ConnectorStatus.java  |  58 +++
 .../apache/kafka/connect/runtime/Herder.java    |  16 +-
 .../connect/runtime/HerderConnectorContext.java |  11 +-
 .../kafka/connect/runtime/TaskStatus.java       |  53 +++
 .../apache/kafka/connect/runtime/Worker.java    | 210 +++++----
 .../kafka/connect/runtime/WorkerSinkTask.java   |   4 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |  18 +-
 .../kafka/connect/runtime/WorkerTask.java       |  47 +-
 .../runtime/distributed/DistributedHerder.java  | 107 +++--
 .../runtime/distributed/WorkerCoordinator.java  |   2 +-
 .../runtime/distributed/WorkerGroupMember.java  |   9 +-
 .../distributed/WorkerRebalanceListener.java    |   2 +-
 .../kafka/connect/runtime/rest/RestServer.java  |   9 +-
 .../rest/entities/ConnectorStateInfo.java       | 108 +++++
 .../rest/resources/ConnectorsResource.java      |  21 +-
 .../runtime/standalone/StandaloneHerder.java    |  51 +-
 .../connect/storage/KafkaConfigStorage.java     |  15 +-
 .../storage/KafkaOffsetBackingStore.java        |  13 +-
 .../storage/KafkaStatusBackingStore.java        | 461 +++++++++++++++++++
 .../storage/MemoryStatusBackingStore.java       | 105 +++++
 .../connect/storage/StatusBackingStore.java     | 100 ++++
 .../kafka/connect/util/KafkaBasedLog.java       |  24 +-
 .../org/apache/kafka/connect/util/Table.java    |  65 +++
 .../connect/runtime/AbstractHerderTest.java     | 116 +++++
 .../connect/runtime/WorkerSinkTaskTest.java     |   4 +-
 .../runtime/WorkerSinkTaskThreadedTest.java     |   6 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |  54 ++-
 .../kafka/connect/runtime/WorkerTaskTest.java   |  88 +++-
 .../kafka/connect/runtime/WorkerTest.java       | 161 ++++---
 .../distributed/DistributedHerderTest.java      | 121 ++++-
 .../distributed/WorkerCoordinatorTest.java      |   2 +-
 .../standalone/StandaloneHerderTest.java        |  42 +-
 .../storage/KafkaStatusBackingStoreTest.java    | 373 +++++++++++++++
 .../storage/MemoryStatusBackingStoreTest.java   |  66 +++
 .../apache/kafka/connect/util/TableTest.java    |  48 ++
 tests/kafkatest/services/connect.py             |   4 +-
 .../kafkatest/tests/connect_distributed_test.py |   1 +
 tests/kafkatest/tests/connect_rest_test.py      |   4 +-
 .../templates/connect-distributed.properties    |   1 +
 46 files changed, 2611 insertions(+), 298 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index 9ec63db..46bd3bc 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -39,4 +39,5 @@ internal.value.converter.schemas.enable=false
 offset.storage.topic=connect-offsets
 # Flush much faster than normal, which is useful for testing/debugging
 offset.flush.interval.ms=10000
-config.storage.topic=connect-configs
\ No newline at end of file
+config.storage.topic=connect-configs
+status.storage.topic=connect-status
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
index 2a06484..c8a06e8 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
@@ -30,4 +30,11 @@ public interface ConnectorContext {
      * added/removed) and the running Tasks will need to be modified.
      */
     void requestTaskReconfiguration();
+
+    /**
+     * Raise an unrecoverable exception to the Connect framework. This will cause the status of the
+     * connector to transition to FAILED.
+     * @param e Exception to be raised.
+     */
+    void raiseError(Exception e);
 }
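raiseError gives connector implementations a way to report unrecoverable problems to the framework, which then marks the connector FAILED through the herder's status listener. For example, a connector that polls an external system might fail itself like this (the client and monitor method are illustrative, not from the patch):

    // inside a hypothetical Connector implementation
    private void checkExternalSystem() {
        try {
            client.ping();   // assumed external-system client
        } catch (Exception e) {
            // surfaces the failure; the framework transitions this connector to FAILED
            context.raiseError(new ConnectException("Lost connection to external system", e));
        }
    }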

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 83ba6d4..09d4ed8 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -84,7 +84,7 @@ public class FileStreamSinkTask extends SinkTask {
 
     @Override
     public void stop() {
-        if (outputStream != System.out)
+        if (outputStream != null && outputStream != System.out)
             outputStream.close();
     }
 

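The added null check covers the case where stop() runs before the task ever opened its output stream (for example, if start() failed early); previously that path would throw a NullPointerException instead of shutting down cleanly.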
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 5ad032e..bc5b75a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -18,6 +18,8 @@
 package org.apache.kafka.connect.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
 import org.apache.kafka.connect.runtime.Worker;
@@ -25,9 +27,12 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
 import java.util.Collections;
 import java.util.Map;
 
@@ -54,12 +59,29 @@ public class ConnectDistributed {
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
+        Time time = new SystemTime();
         DistributedConfig config = new DistributedConfig(workerProps);
-        Worker worker = new Worker(config, new KafkaOffsetBackingStore());
+
         RestServer rest = new RestServer(config);
-        DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl());
+        URI advertisedUrl = rest.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        offsetBackingStore.configure(config.originals());
+
+        Worker worker = new Worker(workerId, time, config, offsetBackingStore);
+
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+        statusBackingStore.configure(config.originals());
+
+        DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
         final Connect connect = new Connect(worker, herder, rest);
-        connect.start();
+        try {
+            connect.start();
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+        }
 
         // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
         connect.awaitStop();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index f89a72a..6c4335e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.connect.cli;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -33,6 +35,7 @@ import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -63,9 +66,15 @@ public class ConnectStandalone {
         Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
                 Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
 
+        Time time = new SystemTime();
         StandaloneConfig config = new StandaloneConfig(workerProps);
-        Worker worker = new Worker(config, new FileOffsetBackingStore());
+
         RestServer rest = new RestServer(config);
+        URI advertisedUrl = rest.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
+
         Herder herder = new StandaloneHerder(worker);
         final Connect connect = new Connect(worker, herder, rest);
         connect.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
new file mode 100644
index 0000000..ca85d87
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
+ * must invoke the lifecycle hooks appropriately.
+ *
+ * This class takes the following approach for sending status updates to the backing store:
+ *
+ * 1) When the connector or task is starting, we overwrite the previous state blindly. This ensures that
+ *    every rebalance will reset the state of tasks to the proper state. The intuition is that there should
+ *    be less chance of write conflicts when the worker has just received its assignment and is starting tasks.
+ *    In particular, this prevents us from depending on the generation absolutely. If the group disappears
+ *    and the generation is reset, then we'll overwrite the status information from the older (and larger)
+ *    generation with the updated one. The danger of this approach is that slow starting tasks may cause the
+ *    status to be overwritten after a rebalance has completed.
+ *
+ * 2) If the connector or task fails or is shutdown, we use {@link StatusBackingStore#putSafe(ConnectorStatus)},
+ *    which provides a little more protection if the worker is no longer in the group (in which case the
+ *    task may have already been started on another worker). Obviously this is still racy. If the task has just
+ *    started on another worker, we may not have the updated status cached yet. In this case, we'll overwrite
+ *    the value which will cause the state to be inconsistent (most likely until the next rebalance). Until
+ *    we have proper producer groups with fencing, there is not much else we can do.
+ */
+public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
+
+    protected final StatusBackingStore statusBackingStore;
+    private final String workerId;
+
+    public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) {
+        this.statusBackingStore = statusBackingStore;
+        this.workerId = workerId;
+    }
+
+    protected abstract int generation();
+
+    protected void startServices() {
+        this.statusBackingStore.start();
+    }
+
+    protected void stopServices() {
+        this.statusBackingStore.stop();
+    }
+
+    @Override
+    public void onStartup(String connector) {
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onShutdown(String connector) {
+        statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onFailure(String connector, Throwable cause) {
+        statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
+                trace(cause), workerId, generation()));
+    }
+
+    @Override
+    public void onStartup(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
+    }
+
+    @Override
+    public void onFailure(ConnectorTaskId id, Throwable cause) {
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause)));
+    }
+
+    @Override
+    public void onShutdown(ConnectorTaskId id) {
+        statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation()));
+    }
+
+    @Override
+    public void onDeletion(String connector) {
+        for (TaskStatus status : statusBackingStore.getAll(connector))
+            statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
+    }
+
+    @Override
+    public ConnectorStateInfo connectorStatus(String connName) {
+        ConnectorStatus connector = statusBackingStore.get(connName);
+        if (connector == null)
+            throw new NotFoundException("No status found for connector " + connName);
+
+        Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
+
+        ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
+                connector.state().toString(), connector.workerId(), connector.trace());
+        List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
+
+        for (TaskStatus status : tasks) {
+            taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
+                    status.state().toString(), status.workerId(), status.trace()));
+        }
+
+        Collections.sort(taskStates);
+
+        return new ConnectorStateInfo(connName, connectorState, taskStates);
+    }
+
+    @Override
+    public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
+        TaskStatus status = statusBackingStore.get(id);
+
+        if (status == null)
+            throw new NotFoundException("No status found for task " + id);
+
+        return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(),
+                status.workerId(), status.trace());
+    }
+
+    private String trace(Throwable t) {
+        ByteArrayOutputStream output = new ByteArrayOutputStream();
+        t.printStackTrace(new PrintStream(output));
+        try {
+            return output.toString("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            return null;
+        }
+    }
+
+}
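Concrete herders supply generation(). In distributed mode this is presumably the group generation delivered through onAssigned; standalone mode has no group membership, so a constant is enough, which matches the generation 0 the standalone test expects:

    // plausible standalone implementation: no rebalance protocol, fixed generation
    @Override
    protected int generation() {
        return 0;
    }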

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
new file mode 100644
index 0000000..4f31be1
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+public abstract class AbstractStatus<T> {
+
+    public enum State {
+        UNASSIGNED,
+        RUNNING,
+        FAILED,
+        DESTROYED,
+    }
+
+    private final T id;
+    private final State state;
+    private final String trace;
+    private final String workerId;
+    private final int generation;
+
+    public AbstractStatus(T id,
+                          State state,
+                          String workerId,
+                          int generation,
+                          String trace) {
+        this.id = id;
+        this.state = state;
+        this.workerId = workerId;
+        this.generation = generation;
+        this.trace = trace;
+    }
+
+    public T id() {
+        return id;
+    }
+
+    public State state() {
+        return state;
+    }
+
+    public String trace() {
+        return trace;
+    }
+
+    public String workerId() {
+        return workerId;
+    }
+
+    public int generation() {
+        return generation;
+    }
+
+    @Override
+    public String toString() {
+        return "Status{" +
+                "id=" + id +
+                ", state=" + state +
+                ", workerId='" + workerId + '\'' +
+                ", generation=" + generation +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        AbstractStatus<?> that = (AbstractStatus<?>) o;
+
+        if (generation != that.generation) return false;
+        if (id != null ? !id.equals(that.id) : that.id != null) return false;
+        if (state != that.state) return false;
+        if (trace != null ? !trace.equals(that.trace) : that.trace != null) return false;
+        return workerId != null ? workerId.equals(that.workerId) : that.workerId == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (state != null ? state.hashCode() : 0);
+        result = 31 * result + (trace != null ? trace.hashCode() : 0);
+        result = 31 * result + (workerId != null ? workerId.hashCode() : 0);
+        result = 31 * result + generation;
+        return result;
+    }
+}
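
For illustration, a minimal sketch of how a concrete status type hangs off this base class; the DummyStatus subclass below is hypothetical and only demonstrates the value-based equals()/toString() behavior:

    public class DummyStatusExample {
        // Hypothetical concrete status keyed by a plain String id.
        static class DummyStatus extends org.apache.kafka.connect.runtime.AbstractStatus<String> {
            DummyStatus(String id, State state, String workerId, int generation) {
                super(id, state, workerId, generation, null);
            }
        }

        public static void main(String[] args) {
            DummyStatus a = new DummyStatus("c-1", DummyStatus.State.RUNNING, "worker-1:8083", 5);
            DummyStatus b = new DummyStatus("c-1", DummyStatus.State.RUNNING, "worker-1:8083", 5);
            // equals()/hashCode() are value-based over id, state, trace, workerId and
            // generation, so independently built statuses with the same fields compare equal.
            System.out.println(a.equals(b)); // true
            // Prints roughly: Status{id=c-1, state=RUNNING, workerId='worker-1:8083', generation=5}
            System.out.println(a);
        }
    }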

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
new file mode 100644
index 0000000..d9a2eba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+public class ConnectorStatus extends AbstractStatus<String> {
+
+    public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation) {
+        super(connector, state, workerUrl, generation, msg);
+    }
+
+    public ConnectorStatus(String connector, State state, String workerUrl, int generation) {
+        super(connector, state, workerUrl, generation, null);
+    }
+
+    public interface Listener {
+
+        /**
+         * Invoked after the connector has successfully been shut down.
+         * @param connector The connector name
+         */
+        void onShutdown(String connector);
+
+        /**
+         * Invoked from the Connector using {@link org.apache.kafka.connect.connector.ConnectorContext#raiseError(Exception)}.
+         * Note that no shutdown event will follow after the connector has failed.
+         * @param connector The connector name
+         * @param cause Error raised from the connector.
+         */
+        void onFailure(String connector, Throwable cause);
+
+        /**
+         * Invoked after successful startup of the connector.
+         * @param connector The connector name
+         */
+        void onStartup(String connector);
+
+        /**
+         * Invoked when the connector is deleted through the REST API.
+         * @param connector The connector name
+         */
+        void onDeletion(String connector);
+
+    }
+}
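
A minimal sketch of a Listener implementation that just logs transitions (the LoggingConnectorListener name is illustrative; in the patch itself the herder acts as the listener, as the DistributedHerder changes below show):

    import org.apache.kafka.connect.runtime.ConnectorStatus;

    public class LoggingConnectorListener implements ConnectorStatus.Listener {
        @Override
        public void onStartup(String connector) {
            System.out.println(connector + " started");
        }

        @Override
        public void onFailure(String connector, Throwable cause) {
            System.out.println(connector + " failed: " + cause);
        }

        @Override
        public void onShutdown(String connector) {
            System.out.println(connector + " shut down");
        }

        @Override
        public void onDeletion(String connector) {
            System.out.println(connector + " deleted");
        }
    }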

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index fc0689c..95c7700 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -18,8 +18,10 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.Collection;
 import java.util.List;
@@ -61,7 +63,7 @@ public interface Herder {
      * @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node can not resolve the request
      *         (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
      *         also not the leader
-     * @throws ConnectException if this node is the leader, but still cannot resolve the
+     * @throws org.apache.kafka.connect.errors.ConnectException if this node is the leader, but still cannot resolve the
      *         request (e.g., it is not in sync with other worker's config state)
      */
     void connectors(Callback<Collection<String>> callback);
@@ -113,6 +115,18 @@ public interface Herder {
      */
     void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
 
+    /**
+     * Look up the current status of a connector.
+     * @param connName name of the connector
+     */
+    ConnectorStateInfo connectorStatus(String connName);
+
+    /**
+     * Look up the status of a task.
+     * @param id id of the task
+     */
+    ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id);
+
 
     class Created<T> {
         private final boolean created;
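
For illustration, a sketch of how a caller (e.g. a REST resource) might use the new connector-status accessor; 'herder' is assumed to be a running instance:

    import org.apache.kafka.connect.runtime.Herder;
    import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;

    public class ConnectorStatusExample {
        static void printStatus(Herder herder, String connName) {
            ConnectorStateInfo info = herder.connectorStatus(connName);
            System.out.println(info.name() + ": " + info.connector().state());
            for (ConnectorStateInfo.TaskState task : info.tasks())
                System.out.println("  task " + task.id() + ": " + task.state()
                        + " (" + task.workerId() + ")");
        }
    }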

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
index e3294ef..bd933f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
@@ -24,10 +24,10 @@ import org.apache.kafka.connect.connector.ConnectorContext;
  */
 public class HerderConnectorContext implements ConnectorContext {
 
-    private Herder herder;
-    private String connectorName;
+    private final AbstractHerder herder;
+    private final String connectorName;
 
-    public HerderConnectorContext(Herder herder, String connectorName) {
+    public HerderConnectorContext(AbstractHerder herder, String connectorName) {
         this.herder = herder;
         this.connectorName = connectorName;
     }
@@ -38,4 +38,9 @@ public class HerderConnectorContext implements ConnectorContext {
         // Distributed herder will forward the request to the leader if needed
         herder.requestTaskReconfiguration(connectorName);
     }
+
+    @Override
+    public void raiseError(Exception e) {
+        herder.onFailure(connectorName, e);
+    }
 }
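
A sketch of how a connector might surface an unrecoverable error through its injected ConnectorContext; with this change, raiseError() routes to the herder's onFailure(), which records the connector as FAILED. The ExternalMonitor class and checkHealth() method are hypothetical:

    import org.apache.kafka.connect.connector.ConnectorContext;

    class ExternalMonitor implements Runnable {
        private final ConnectorContext context;

        ExternalMonitor(ConnectorContext context) {
            this.context = context;
        }

        @Override
        public void run() {
            try {
                checkHealth();
            } catch (Exception e) {
                // Routed via HerderConnectorContext to the herder's onFailure(),
                // which records the connector as FAILED with this stack trace.
                context.raiseError(e);
            }
        }

        private void checkHealth() throws Exception { /* hypothetical check */ }
    }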

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
new file mode 100644
index 0000000..3542eb8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
+
+    public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace) {
+        super(id, state, workerUrl, generation, trace);
+    }
+
+    public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation) {
+        super(id, state, workerUrl, generation, null);
+    }
+
+    public interface Listener {
+
+        /**
+         * Invoked after successful startup of the task.
+         * @param id The id of the task
+         */
+        void onStartup(ConnectorTaskId id);
+
+        /**
+         * Invoked if the task raises an error. No shutdown event will follow.
+         * @param id The id of the task
+         * @param cause The error raised by the task.
+         */
+        void onFailure(ConnectorTaskId id, Throwable cause);
+
+        /**
+         * Invoked after successful shutdown of the task.
+         * @param id The id of the task
+         */
+        void onShutdown(ConnectorTaskId id);
+
+    }
+}
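
For illustration, building a task status record by hand (values are illustrative):

    import org.apache.kafka.connect.runtime.TaskStatus;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    public class TaskStatusRecordExample {
        public static void main(String[] args) {
            ConnectorTaskId id = new ConnectorTaskId("my-connector", 0);
            // No trace for a healthy task; the four-arg constructor passes null.
            TaskStatus status = new TaskStatus(id, TaskStatus.State.RUNNING, "worker-1:8083", 3);
            System.out.println(status); // Status{id=my-connector-0, state=RUNNING, ...}
        }
    }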

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0a4bb7f..8e74fec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -66,25 +65,25 @@ public class Worker {
 
     private final ExecutorService executor;
     private final Time time;
+    private final String workerId;
     private final WorkerConfig config;
+    private final Converter keyConverter;
+    private final Converter valueConverter;
+    private final Converter internalKeyConverter;
+    private final Converter internalValueConverter;
+    private final OffsetBackingStore offsetBackingStore;
 
-    private Converter keyConverter;
-    private Converter valueConverter;
-    private Converter internalKeyConverter;
-    private Converter internalValueConverter;
-    private OffsetBackingStore offsetBackingStore;
-    private HashMap<String, Connector> connectors = new HashMap<>();
+    private HashMap<String, WorkerConnector> connectors = new HashMap<>();
     private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
     private KafkaProducer<byte[], byte[]> producer;
     private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
 
-    public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
-        this(new SystemTime(), config, offsetBackingStore);
-    }
-
-    @SuppressWarnings("unchecked")
-    public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+    public Worker(String workerId,
+                  Time time,
+                  WorkerConfig config,
+                  OffsetBackingStore offsetBackingStore) {
         this.executor = Executors.newCachedThreadPool();
+        this.workerId = workerId;
         this.time = time;
         this.config = config;
         this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
@@ -132,34 +131,18 @@ public class Worker {
         long started = time.milliseconds();
         long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
 
-        for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
-            Connector conn = entry.getValue();
+        for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
+            WorkerConnector conn = entry.getValue();
             log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
                     "Worker is stopped.", conn);
-            try {
-                conn.stop();
-            } catch (ConnectException e) {
-                log.error("Error while shutting down connector " + conn, e);
-            }
+            conn.stop();
         }
 
-        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
-            WorkerTask task = entry.getValue();
-            log.warn("Shutting down task {} uncleanly; herder should have shut down "
-                    + "tasks before the Worker is stopped.", task);
-            try {
-                task.stop();
-            } catch (ConnectException e) {
-                log.error("Error while shutting down task " + task, e);
-            }
-        }
-
-        for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
-            WorkerTask task = entry.getValue();
-            log.debug("Waiting for task {} to finish shutting down", task);
-            if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
-                log.error("Graceful shutdown of task {} failed.", task);
-        }
+        Collection<ConnectorTaskId> taskIds = tasks.keySet();
+        log.warn("Shutting down tasks {} uncleanly; herder should have shut down "
+                + "tasks before the Worker is stopped.", taskIds);
+        stopTasks(taskIds);
+        awaitStopTasks(taskIds);
 
         long timeoutMs = limit - time.milliseconds();
         sourceTaskOffsetCommitter.close(timeoutMs);
@@ -169,16 +152,12 @@ public class Worker {
         log.info("Worker stopped");
     }
 
-    public WorkerConfig config() {
-        return config;
-    }
-
     /**
      * Add a new connector.
      * @param connConfig connector configuration
      * @param ctx context for the connector
      */
-    public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+    public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener lifecycleListener) {
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
 
@@ -188,22 +167,25 @@ public class Worker {
             throw new ConnectException("Connector with name " + connName + " already exists");
 
         final Connector connector = instantiateConnector(connClass);
+        WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, lifecycleListener);
+
         log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
-        connector.initialize(ctx);
+        workerConnector.initialize();
         try {
-            connector.start(connConfig.originalsStrings());
+            workerConnector.start(connConfig.originalsStrings());
         } catch (ConnectException e) {
             throw new ConnectException("Connector threw an exception while starting", e);
         }
 
-        connectors.put(connName, connector);
+        connectors.put(connName, workerConnector);
 
         log.info("Finished creating connector {}", connName);
     }
 
     /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
     public boolean isSinkConnector(String connName) {
-        return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass());
+        WorkerConnector workerConnector = connectors.get(connName);
+        return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
     }
 
     private Class<? extends Connector> getConnectorClass(String connectorAlias) {
@@ -267,10 +249,11 @@ public class Worker {
     public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
         log.trace("Reconfiguring connector tasks for {}", connName);
 
-        Connector connector = connectors.get(connName);
-        if (connector == null)
+        WorkerConnector workerConnector = connectors.get(connName);
+        if (workerConnector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
+        Connector connector = workerConnector.delegate;
         List<Map<String, String>> result = new ArrayList<>();
         String taskClassName = connector.taskClass().getName();
         for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
@@ -286,16 +269,11 @@ public class Worker {
     public void stopConnector(String connName) {
         log.info("Stopping connector {}", connName);
 
-        Connector connector = connectors.get(connName);
+        WorkerConnector connector = connectors.get(connName);
         if (connector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
-        try {
-            connector.stop();
-        } catch (ConnectException e) {
-            log.error("Error shutting down connector {}: ", connector, e);
-        }
-
+        connector.stop();
         connectors.remove(connName);
 
         log.info("Stopped connector {}", connName);
@@ -313,7 +291,7 @@ public class Worker {
      * @param id Globally unique ID for this task.
      * @param taskConfig the parsed task configuration
      */
-    public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+    public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener lifecycleListener) {
         log.info("Creating task {}", id);
 
         if (tasks.containsKey(id)) {
@@ -327,33 +305,35 @@ public class Worker {
         final Task task = instantiateTask(taskClass);
         log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
+        final WorkerTask workerTask = buildWorkerTask(id, task, lifecycleListener);
+
+        // Start the task before modifying any state; any exceptions are caught higher up the
+        // call chain and there's no cleanup to do here
+        workerTask.initialize(taskConfig.originalsStrings());
+        executor.submit(workerTask);
+
+        if (task instanceof SourceTask) {
+            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+        }
+        tasks.put(id, workerTask);
+    }
+
+    private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener lifecycleListener) {
         // Decide which type of worker task we need based on the type of task.
-        final WorkerTask workerTask;
         if (task instanceof SourceTask) {
-            SourceTask sourceTask = (SourceTask) task;
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
-            workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+            return new WorkerSourceTask(id, (SourceTask) task, lifecycleListener, keyConverter, valueConverter, producer,
                     offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+            return new WorkerSinkTask(id, (SinkTask) task, lifecycleListener, config, keyConverter, valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
         }
-
-        // Start the task before adding modifying any state, any exceptions are caught higher up the
-        // call chain and there's no cleanup to do here
-        workerTask.initialize(taskConfig.originalsStrings());
-        executor.submit(workerTask);
-
-        if (task instanceof SourceTask) {
-            WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
-            sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
-        }
-        tasks.put(id, workerTask);
     }
 
     private static Task instantiateTask(Class<? extends Task> taskClass) {
@@ -364,16 +344,39 @@ public class Worker {
         }
     }
 
-    public void stopTask(ConnectorTaskId id) {
-        log.info("Stopping task {}", id);
+    public void stopTasks(Collection<ConnectorTaskId> ids) {
+        for (ConnectorTaskId id : ids)
+            stopTask(getTask(id));
+    }
 
-        WorkerTask task = getTask(id);
+    public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
+        long now = time.milliseconds();
+        long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+        for (ConnectorTaskId id : ids) {
+            long remaining = Math.max(0, deadline - time.milliseconds());
+            awaitStopTask(getTask(id), remaining);
+        }
+    }
+
+    private void awaitStopTask(WorkerTask task, long timeout) {
+        if (!task.awaitStop(timeout)) {
+            log.error("Graceful stop of task {} failed.", task.id());
+            task.cancel();
+        }
+        tasks.remove(task.id());
+    }
+
+    private void stopTask(WorkerTask task) {
+        log.info("Stopping task {}", task.id());
         if (task instanceof WorkerSourceTask)
-            sourceTaskOffsetCommitter.remove(id);
+            sourceTaskOffsetCommitter.remove(task.id());
         task.stop();
-        if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
-            log.error("Graceful stop of task {} failed.", task);
-        tasks.remove(id);
+    }
+
+    public void stopAndAwaitTask(ConnectorTaskId id) {
+        WorkerTask task = getTask(id);
+        stopTask(task);
+        awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
     }
 
     /**
@@ -400,4 +403,55 @@ public class Worker {
         return internalValueConverter;
     }
 
+    public String workerId() {
+        return workerId;
+    }
+
+    private static class WorkerConnector  {
+        private final String connName;
+        private final ConnectorStatus.Listener lifecycleListener;
+        private final ConnectorContext ctx;
+        private final Connector delegate;
+
+        public WorkerConnector(String connName,
+                               Connector delegate,
+                               ConnectorContext ctx,
+                               ConnectorStatus.Listener lifecycleListener) {
+            this.connName = connName;
+            this.ctx = ctx;
+            this.delegate = delegate;
+            this.lifecycleListener = lifecycleListener;
+        }
+
+        public void initialize() {
+            delegate.initialize(ctx);
+        }
+
+        public void start(Map<String, String> props) {
+            try {
+                delegate.start(props);
+                lifecycleListener.onStartup(connName);
+            } catch (Throwable t) {
+                log.error("Error while starting connector {}", connName, t);
+                lifecycleListener.onFailure(connName, t);
+            }
+        }
+
+        public void stop() {
+            try {
+                delegate.stop();
+                lifecycleListener.onShutdown(connName);
+            } catch (Throwable t) {
+                log.error("Error while shutting down connector {}", connName, t);
+                lifecycleListener.onFailure(connName, t);
+            }
+        }
+
+        @Override
+        public String toString() {
+            return delegate.toString();
+        }
+
+    }
+
 }
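
The split into stopTasks()/awaitStopTasks() lets callers signal every task first and then wait for all of them under one shared graceful-timeout budget, rather than stopping and awaiting each task serially. A minimal sketch, assuming a running Worker:

    import java.util.Collection;
    import java.util.HashSet;
    import org.apache.kafka.connect.runtime.Worker;
    import org.apache.kafka.connect.util.ConnectorTaskId;

    public class TwoPhaseStopExample {
        static void stopAllTasks(Worker worker) {
            Collection<ConnectorTaskId> ids = new HashSet<>(worker.taskIds());
            worker.stopTasks(ids);      // phase 1: trigger stop() on every task
            worker.awaitStopTasks(ids); // phase 2: await all under one deadline
        }
    }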

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8c5bd9f..eb64355 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -76,11 +76,12 @@ class WorkerSinkTask extends WorkerTask {
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
+                          TaskStatus.Listener lifecycleListener,
                           WorkerConfig workerConfig,
                           Converter keyConverter,
                           Converter valueConverter,
                           Time time) {
-        super(id);
+        super(id, lifecycleListener);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -184,6 +185,7 @@ class WorkerSinkTask extends WorkerTask {
      * Initializes and starts the SinkTask.
      */
     protected void initializeAndStart() {
+        log.debug("Initializing task {} with config {}", id, taskConfig);
         String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
         if (topicsStr == null || topicsStr.isEmpty())
             throw new ConnectException("Sink tasks require a list of topics.");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 562e03e..8542f4c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -17,13 +17,13 @@
 
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
@@ -76,6 +76,7 @@ class WorkerSourceTask extends WorkerTask {
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
+                            TaskStatus.Listener lifecycleListener,
                             Converter keyConverter,
                             Converter valueConverter,
                             KafkaProducer<byte[], byte[]> producer,
@@ -83,7 +84,7 @@ class WorkerSourceTask extends WorkerTask {
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
                             Time time) {
-        super(id);
+        super(id, lifecycleListener);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -147,16 +148,13 @@ class WorkerSourceTask extends WorkerTask {
             }
         } catch (InterruptedException e) {
             // Ignore and allow to exit.
-        } catch (Throwable t) {
-            log.error("Task {} threw an uncaught and unrecoverable exception", id);
-            log.error("Task is being killed and will not recover until manually restarted:", t);
-            // It should still be safe to let this fall through and commit offsets since this exception would have
+        } finally {
+            // It should still be safe to commit offsets since any exception would have
             // simply resulted in not getting more records but all the existing records should be ok to flush
             // and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
             // to fail.
+            commitOffsets();
         }
-
-        commitOffsets();
     }
 
     /**
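
The restructuring above moves commitOffsets() into a finally block so offsets are flushed whether the task exits normally, is interrupted, or throws. A standalone sketch of the same pattern (method names are illustrative):

    public class CommitInFinallyExample {
        void executeOnce() {
            try {
                pollAndSend();   // may throw after some records were dispatched
            } catch (InterruptedException e) {
                // ignore and allow to exit
            } finally {
                commitOffsets(); // runs on success, failure, or interruption
            }
        }

        void pollAndSend() throws InterruptedException { /* hypothetical work */ }
        void commitOffsets() { /* hypothetical offset flush */ }
    }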

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ecaeb7b..cc69c0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -37,13 +37,21 @@ abstract class WorkerTask implements Runnable {
     protected final ConnectorTaskId id;
     private final AtomicBoolean stopping;
     private final AtomicBoolean running;
+    private final AtomicBoolean cancelled;
     private final CountDownLatch shutdownLatch;
+    private final TaskStatus.Listener lifecycleListener;
 
-    public WorkerTask(ConnectorTaskId id) {
+    public WorkerTask(ConnectorTaskId id, TaskStatus.Listener lifecycleListener) {
         this.id = id;
         this.stopping = new AtomicBoolean(false);
         this.running = new AtomicBoolean(false);
+        this.cancelled = new AtomicBoolean(false);
         this.shutdownLatch = new CountDownLatch(1);
+        this.lifecycleListener = lifecycleListener;
+    }
+
+    public ConnectorTaskId id() {
+        return id;
     }
 
     /**
@@ -61,9 +69,17 @@ abstract class WorkerTask implements Runnable {
     }
 
     /**
+     * Cancel this task. This won't actually stop it, but it will prevent the state from being
+     * updated when it eventually does shut down.
+     */
+    public void cancel() {
+        this.cancelled.set(true);
+    }
+
+    /**
      * Wait for this task to finish stopping.
      *
-     * @param timeoutMs
+     * @param timeoutMs time in milliseconds to await stop
      * @return true if successful, false if the timeout was reached
      */
     public boolean awaitStop(long timeoutMs) {
@@ -85,19 +101,23 @@ abstract class WorkerTask implements Runnable {
         return stopping.get();
     }
 
+    protected boolean isStopped() {
+        return !running.get();
+    }
+
     private void doClose() {
         try {
             close();
         } catch (Throwable t) {
-            log.error("Unhandled exception in task shutdown {}", id, t);
+            log.error("Task {} threw an uncaught and unrecoverable exception during shutdown", id, t);
+            throw t;
         } finally {
             running.set(false);
             shutdownLatch.countDown();
         }
     }
 
-    @Override
-    public void run() {
+    private void doRun() {
         if (!this.running.compareAndSet(false, true))
             throw new IllegalStateException("The task cannot be started while still running");
 
@@ -105,12 +125,27 @@ abstract class WorkerTask implements Runnable {
             if (stopping.get())
                 return;
 
+            lifecycleListener.onStartup(id);
             execute();
         } catch (Throwable t) {
-            log.error("Unhandled exception in task {}", id, t);
+            log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
+            log.error("Task is being killed and will not recover until manually restarted");
+            throw t;
         } finally {
             doClose();
         }
     }
 
+    @Override
+    public void run() {
+        try {
+            doRun();
+            if (!cancelled.get())
+                lifecycleListener.onShutdown(id);
+        } catch (Throwable t) {
+            if (!cancelled.get())
+                lifecycleListener.onFailure(id, t);
+        }
+    }
+
 }
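
A minimal sketch of the cancellation contract run() implements: once cancel() is set, the terminal onShutdown/onFailure callback is suppressed, so a task that lingers past the graceful timeout cannot overwrite status owned by its replacement (class and method names here are illustrative):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class CancelGuardExample {
        private final AtomicBoolean cancelled = new AtomicBoolean(false);

        public void cancel() {
            cancelled.set(true);
        }

        // Mirrors the guard in WorkerTask.run(): after cancellation, the final
        // status callback is skipped rather than clobbering a successor's state.
        void finish(boolean failed) {
            if (cancelled.get())
                return; // a newer generation owns this task's status now
            System.out.println(failed ? "onFailure" : "onShutdown");
        }
    }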

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8b0525b..83ed714 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -17,17 +17,16 @@
 
 package org.apache.kafka.connect.runtime.distributed;
 
-import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
@@ -35,6 +34,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -79,7 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  *     (and therefore, also for creating, destroying, and scaling up/down connectors).
  * </p>
  */
-public class DistributedHerder implements Herder, Runnable {
+public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
 
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
@@ -108,15 +108,29 @@ public class DistributedHerder implements Herder, Runnable {
     // needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
     private Set<String> connectorConfigUpdates = new HashSet<>();
     private boolean needsReconfigRebalance;
+    private volatile int generation;
 
     private final ExecutorService forwardRequestExecutor;
 
-    public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
-        this(config, worker, null, null, restUrl, new SystemTime());
+    public DistributedHerder(DistributedConfig config,
+                             Time time,
+                             Worker worker,
+                             StatusBackingStore statusBackingStore,
+                             String restUrl) {
+        this(config, worker.workerId(), worker, statusBackingStore, null, null, restUrl, time);
     }
 
-    // public for testing
-    public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) {
+    // visible for testing
+    DistributedHerder(DistributedConfig config,
+                      String workerId,
+                      Worker worker,
+                      StatusBackingStore statusBackingStore,
+                      KafkaConfigStorage configStorage,
+                      WorkerGroupMember member,
+                      String restUrl,
+                      Time time) {
+        super(statusBackingStore, workerId);
+
         this.worker = worker;
         if (configStorage != null) {
             // For testing. Assume configuration has already been performed
@@ -131,7 +145,7 @@ public class DistributedHerder implements Herder, Runnable {
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
 
-        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
+        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener(), time);
         stopping = new AtomicBoolean(false);
 
         rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
@@ -146,11 +160,25 @@ public class DistributedHerder implements Herder, Runnable {
         thread.start();
     }
 
+    @Override
+    protected void startServices() {
+        super.startServices();
+        configStorage.start();
+    }
+
+    @Override
+    protected void stopServices() {
+        super.stopServices();
+        if (configStorage != null)
+            configStorage.stop();
+    }
+
+    @Override
     public void run() {
         try {
             log.info("Herder starting");
 
-            configStorage.start();
+            startServices();
 
             log.info("Herder started");
 
@@ -282,13 +310,10 @@ public class DistributedHerder implements Herder, Runnable {
                     log.error("Failed to shut down connector " + connName, t);
                 }
             }
-            for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
-                try {
-                    worker.stopTask(taskId);
-                } catch (Throwable t) {
-                    log.error("Failed to shut down task " + taskId, t);
-                }
-            }
+
+            Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
+            worker.stopTasks(tasks);
+            worker.awaitStopTasks(tasks);
 
             member.stop();
 
@@ -299,8 +324,7 @@ public class DistributedHerder implements Herder, Runnable {
                 request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
             }
 
-            if (configStorage != null)
-                configStorage.stop();
+            stopServices();
         }
     }
 
@@ -388,7 +412,7 @@ public class DistributedHerder implements Herder, Runnable {
     }
 
     @Override
-    public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace,
+    public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
                                    final Callback<Created<ConnectorInfo>> callback) {
         final Map<String, String> connConfig;
         if (config == null) {
@@ -515,6 +539,10 @@ public class DistributedHerder implements Herder, Runnable {
         );
     }
 
+    @Override
+    public int generation() {
+        return generation;
+    }
 
     // Should only be called from work thread, so synchronization should not be needed
     private boolean isLeader() {
@@ -649,7 +677,7 @@ public class DistributedHerder implements Herder, Runnable {
                 log.info("Starting task {}", taskId);
                 Map<String, String> configs = configState.taskConfig(taskId);
                 TaskConfig taskConfig = new TaskConfig(configs);
-                worker.addTask(taskId, taskConfig);
+                worker.startTask(taskId, taskConfig, this);
             } catch (ConfigException e) {
                 log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
                         "configuration. This task will not execute until reconfigured.", e);
@@ -666,7 +694,7 @@ public class DistributedHerder implements Herder, Runnable {
         ConnectorConfig connConfig = new ConnectorConfig(configs);
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
-        worker.addConnector(connConfig, ctx);
+        worker.startConnector(connConfig, ctx, this);
 
         // Immediately request configuration since this could be a brand new connector. However, also only update those
         // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
@@ -816,7 +844,7 @@ public class DistributedHerder implements Herder, Runnable {
                     callback.onCompletion(error, null);
             }
         };
-    };
+    }
 
 
     // Config callbacks are triggered from the KafkaConfigStorage thread
@@ -853,11 +881,22 @@ public class DistributedHerder implements Herder, Runnable {
         };
     }
 
+    private void updateDeletedConnectorStatus() {
+        ClusterConfigState snapshot = configStorage.snapshot();
+        Set<String> connectors = snapshot.connectors();
+        for (String connector : statusBackingStore.connectors()) {
+            if (!connectors.contains(connector)) {
+                log.debug("Cleaning status information for connector {}", connector);
+                onDeletion(connector);
+            }
+        }
+    }
+
     // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
     private WorkerRebalanceListener rebalanceListener() {
         return new WorkerRebalanceListener() {
             @Override
-            public void onAssigned(ConnectProtocol.Assignment assignment) {
+            public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
                 // This callback just logs the info and saves it. The actual response is handled in the main loop, which
                 // ensures the group member's logic for rebalancing can complete, potentially long-running steps to
                 // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
@@ -866,8 +905,16 @@ public class DistributedHerder implements Herder, Runnable {
                 log.info("Joined group and got assignment: {}", assignment);
                 synchronized (DistributedHerder.this) {
                     DistributedHerder.this.assignment = assignment;
+                    DistributedHerder.this.generation = generation;
                     rebalanceResolved = false;
                 }
+
+                // Delete the statuses of all connectors removed prior to the start of this rebalance. This has to
+                // be done after the rebalance completes to avoid race conditions as the previous generation attempts
+                // to change the state to UNASSIGNED after tasks have been stopped.
+                if (isLeader())
+                    updateDeletedConnectorStatus();
+
                 // We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
                 // sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
                 // main thread.
@@ -890,18 +937,24 @@ public class DistributedHerder implements Herder, Runnable {
                     // unnecessary repeated connections to the source/sink system.
                     for (String connectorName : connectors)
                         worker.stopConnector(connectorName);
+
                     // TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
                     // stopping them then state could continue to be reused when the task remains on this worker. For example,
                     // this would avoid having to close a connection and then reopen it when the task is assigned back to this
                     // worker again.
-                    for (ConnectorTaskId taskId : tasks)
-                        worker.stopTask(taskId);
+                    if (!tasks.isEmpty()) {
+                        worker.stopTasks(tasks); // trigger stop() for all tasks
+                        worker.awaitStopTasks(tasks); // await stopping tasks
+                    }
 
+                    // Ensure that all status updates have been pushed to the storage system before rebalancing.
+                    // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
+                    // completes.
+                    statusBackingStore.flush();
                     log.info("Finished stopping tasks in preparation for rebalance");
                 } else {
                     log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
                 }
-
             }
         };
     }
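
The generation captured from the rebalance is what makes stale status writes detectable. One plausible fencing rule (the actual StatusBackingStore policy is not shown in this hunk):

    import org.apache.kafka.connect.runtime.TaskStatus;

    public class GenerationFenceExample {
        // Ignore writes tagged with an older group generation than what is stored.
        static boolean shouldAccept(TaskStatus incoming, TaskStatus current) {
            return current == null || incoming.generation() >= current.generation();
        }
    }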

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 79199a6..fa50fbf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -112,7 +112,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
         // up to date, try to rejoin again, leaving the group and backing off, etc.).
         rejoinRequested = false;
-        listener.onAssigned(assignmentSnapshot);
+        listener.onAssigned(assignmentSnapshot, generation);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4b24312..9f05040 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
 import org.slf4j.Logger;
@@ -67,9 +66,13 @@ public class WorkerGroupMember {
 
     private boolean stopped = false;
 
-    public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+    public WorkerGroupMember(DistributedConfig config,
+                             String restUrl,
+                             KafkaConfigStorage configStorage,
+                             WorkerRebalanceListener listener,
+                             Time time) {
         try {
-            this.time = new SystemTime();
+            this.time = time;
 
             String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
             clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
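
Injecting Time makes the group member testable against a mock clock. A sketch, assuming the other constructor arguments are built elsewhere (MockTime is the test clock from the clients test jar):

    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
    import org.apache.kafka.connect.runtime.distributed.WorkerGroupMember;
    import org.apache.kafka.connect.runtime.distributed.WorkerRebalanceListener;
    import org.apache.kafka.connect.storage.KafkaConfigStorage;

    public class GroupMemberTestSetup {
        static WorkerGroupMember buildForTest(DistributedConfig config,
                                              String restUrl,
                                              KafkaConfigStorage configStorage,
                                              WorkerRebalanceListener listener) {
            // MockTime stands in for the wall clock the constructor previously created.
            return new WorkerGroupMember(config, restUrl, configStorage, listener, new MockTime());
        }
    }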

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
index 40f55d2..bc833e9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
@@ -29,7 +29,7 @@ public interface WorkerRebalanceListener {
      * Invoked when a new assignment is created by joining the Connect worker group. This is invoked for both successful
      * and unsuccessful assignments.
      */
-    void onAssigned(ConnectProtocol.Assignment assignment);
+    void onAssigned(ConnectProtocol.Assignment assignment, int generation);
 
     /**
      * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index a544fb0..dbac58f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -50,6 +50,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
+import java.net.URI;
 import java.net.URL;
 import java.util.List;
 import java.util.Map;
@@ -65,7 +66,6 @@ public class RestServer {
     private static final ObjectMapper JSON_SERDE = new ObjectMapper();
 
     private final WorkerConfig config;
-    private Herder herder;
     private Server jettyServer;
 
     /**
@@ -90,8 +90,6 @@ public class RestServer {
     public void start(Herder herder) {
         log.info("Starting REST server");
 
-        this.herder = herder;
-
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
@@ -151,7 +149,7 @@ public class RestServer {
      * Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
      * server, unless overrides for advertised hostname and/or port are provided via configs.
      */
-    public String advertisedUrl() {
+    public URI advertisedUrl() {
         UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
         String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
         if (advertisedHostname != null && !advertisedHostname.isEmpty())
@@ -161,10 +159,9 @@ public class RestServer {
             builder.port(advertisedPort);
         else
             builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
-        return builder.build().toString();
+        return builder.build();
     }
 
-
     /**
      * @param url               HTTP connection will be established with this url.
      * @param method            HTTP method ("GET", "POST", "PUT", etc.)
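
With advertisedUrl() returning a URI, callers can pick apart host and port without re-parsing a string; for example, one plausible way to derive a worker id:

    import java.net.URI;
    import org.apache.kafka.connect.runtime.rest.RestServer;

    public class WorkerIdExample {
        static String workerId(RestServer restServer) {
            URI advertised = restServer.advertisedUrl();
            return advertised.getHost() + ":" + advertised.getPort();
        }
    }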

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
new file mode 100644
index 0000000..179c0db
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class ConnectorStateInfo {
+
+    private final String name;
+    private final ConnectorState connector;
+    private final List<TaskState> tasks;
+
+    @JsonCreator
+    public ConnectorStateInfo(@JsonProperty("name") String name,
+                              @JsonProperty("connector") ConnectorState connector,
+                              @JsonProperty("tasks") List<TaskState> tasks) {
+        this.name = name;
+        this.connector = connector;
+        this.tasks = tasks;
+    }
+
+    @JsonProperty
+    public String name() {
+        return name;
+    }
+
+    @JsonProperty
+    public ConnectorState connector() {
+        return connector;
+    }
+
+    @JsonProperty
+    public List<TaskState> tasks() {
+        return tasks;
+    }
+
+    public abstract static class AbstractState {
+        private final String state;
+        private final String trace;
+        private final String workerId;
+
+        public AbstractState(String state, String workerId, String trace) {
+            this.state = state;
+            this.workerId = workerId;
+            this.trace = trace;
+        }
+
+        @JsonProperty
+        public String state() {
+            return state;
+        }
+
+        @JsonProperty("worker_id")
+        public String workerId() {
+            return workerId;
+        }
+
+        @JsonProperty
+        @JsonInclude(JsonInclude.Include.NON_EMPTY)
+        public String trace() {
+            return trace;
+        }
+    }
+
+    public static class ConnectorState extends AbstractState {
+        public ConnectorState(String state, String worker, String msg) {
+            super(state, worker, msg);
+        }
+    }
+
+    public static class TaskState extends AbstractState implements Comparable<TaskState> {
+        private final int id;
+
+        public TaskState(int id, String state, String worker, String msg) {
+            super(state, worker, msg);
+            this.id = id;
+        }
+
+        @JsonProperty
+        public int id() {
+            return id;
+        }
+
+        @Override
+        public int compareTo(TaskState that) {
+            return Integer.compare(this.id, that.id);
+        }
+    }
+
+}

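For reference, a quick sketch (not part of the patch) of how this entity maps to JSON. The constructors and field names come from the class above; the connector name and worker id are illustrative, and java.util.Arrays is assumed to be imported:

    ConnectorStateInfo info = new ConnectorStateInfo(
            "local-file-source",
            new ConnectorStateInfo.ConnectorState("RUNNING", "worker-1:8083", null),
            Arrays.asList(new ConnectorStateInfo.TaskState(0, "RUNNING", "worker-1:8083", null)));
    // Serialized with Jackson this yields roughly (trace is omitted because
    // of the NON_EMPTY include rule):
    // {"name":"local-file-source",
    //  "connector":{"state":"RUNNING","worker_id":"worker-1:8083"},
    //  "tasks":[{"id":0,"state":"RUNNING","worker_id":"worker-1:8083"}]}
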
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index c95b723..d0d940b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -23,9 +23,11 @@ import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
 import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -98,8 +100,7 @@ public class ConnectorsResource {
     public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
         herder.connectorInfo(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
-        });
+        return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null);
     }
 
     @GET
@@ -107,8 +108,13 @@ public class ConnectorsResource {
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
         herder.connectorConfig(connector, cb);
-        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
-        });
+        return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null);
+    }
+
+    @GET
+    @Path("/{connector}/status")
+    public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
+        return herder.connectorStatus(connector);
     }
 
     @PUT
@@ -145,6 +151,13 @@ public class ConnectorsResource {
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
     }
 
+    @GET
+    @Path("/{connector}/tasks/{task}/status")
+    public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector,
+                                                      @PathParam("task") Integer task) throws Throwable {
+        return herder.taskStatus(new ConnectorTaskId(connector, task));
+    }
+
     @DELETE
     @Path("/{connector}")
     public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {

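Together with the entity above, these handlers add two read-only status endpoints. Assuming a worker on the default REST port (host, port, and connector name illustrative), they would be queried as:

    GET http://localhost:8083/connectors/local-file-source/status
    GET http://localhost:8083/connectors/local-file-source/tasks/0/status

The first returns a full ConnectorStateInfo document, the second a single ConnectorStateInfo.TaskState.
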
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 89847ab..707470f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -20,13 +20,15 @@ package org.apache.kafka.connect.runtime.standalone;
 import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
@@ -38,23 +40,33 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 /**
  * Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
  */
-public class StandaloneHerder implements Herder {
+public class StandaloneHerder extends AbstractHerder {
     private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
 
     private final Worker worker;
     private HashMap<String, ConnectorState> connectors = new HashMap<>();
 
     public StandaloneHerder(Worker worker) {
+        this(worker.workerId(), worker, new MemoryStatusBackingStore());
+    }
+
+    // visible for testing
+    StandaloneHerder(String workerId,
+                     Worker worker,
+                     StatusBackingStore statusBackingStore) {
+        super(statusBackingStore, workerId);
         this.worker = worker;
     }
 
     public synchronized void start() {
         log.info("Herder starting");
+        startServices();
         log.info("Herder started");
     }
 
@@ -78,6 +90,11 @@ public class StandaloneHerder implements Herder {
     }
 
     @Override
+    public int generation() {
+        return 0;
+    }
+
+    @Override
     public synchronized void connectors(Callback<Collection<String>> callback) {
         callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
     }
@@ -131,8 +148,10 @@ public class StandaloneHerder implements Herder {
                 if (config == null) // Deletion, kill tasks as well
                     removeConnectorTasks(connName);
                 worker.stopConnector(connName);
-                if (config == null)
+                if (config == null) {
                     connectors.remove(connName);
+                    onDeletion(connName);
+                }
             } else {
                 if (config == null) {
                     // Deletion, must already exist
@@ -194,7 +213,7 @@ public class StandaloneHerder implements Herder {
         ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         ConnectorState state = connectors.get(connName);
-        worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+        worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this);
         if (state == null) {
             connectors.put(connName, new ConnectorState(connectorProps, connConfig));
         } else {
@@ -219,7 +238,7 @@ public class StandaloneHerder implements Herder {
             ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
             TaskConfig config = new TaskConfig(taskConfigMap);
             try {
-                worker.addTask(taskId, config);
+                worker.startTask(taskId, config, this);
             } catch (Throwable e) {
                 log.error("Failed to add task {}: ", taskId, e);
                 // Swallow this so we can continue updating the rest of the tasks
@@ -230,19 +249,21 @@ public class StandaloneHerder implements Herder {
         }
     }
 
+    private Set<ConnectorTaskId> tasksFor(ConnectorState state) {
+        Set<ConnectorTaskId> tasks = new HashSet<>();
+        for (int i = 0; i < state.taskConfigs.size(); i++)
+            tasks.add(new ConnectorTaskId(state.name, i));
+        return tasks;
+    }
+
     private void removeConnectorTasks(String connName) {
         ConnectorState state = connectors.get(connName);
-        for (int i = 0; i < state.taskConfigs.size(); i++) {
-            ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
-            try {
-                worker.stopTask(taskId);
-            } catch (ConnectException e) {
-                log.error("Failed to stop task {}: ", taskId, e);
-                // Swallow this so we can continue stopping the rest of the tasks
-                // FIXME: Forcibly kill the task?
-            }
+        Set<ConnectorTaskId> tasks = tasksFor(state);
+        if (!tasks.isEmpty()) {
+            worker.stopTasks(tasks);
+            worker.awaitStopTasks(tasks);
+            state.taskConfigs = new ArrayList<>();
         }
-        state.taskConfigs = new ArrayList<>();
     }
 
     private void updateConnectorTasks(String connName) {

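With this change the standalone herder always has a status store behind it. A minimal wiring sketch, assuming an already-configured Worker instance named 'worker':

    StandaloneHerder herder = new StandaloneHerder(worker); // defaults to MemoryStatusBackingStore
    herder.start();  // startServices() also brings up the status backing store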

[2/3] kafka git commit: KAFKA-3093: Add Connect status tracking API

Posted by ew...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 4b60131..9bd191e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -20,6 +20,10 @@ package org.apache.kafka.connect.storage;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema;
@@ -219,15 +223,14 @@ public class KafkaConfigStorage {
 
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
         configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 65bd9d0..dfb8c51 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.util.Callback;
@@ -66,15 +68,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
 
         Map<String, Object> producerProps = new HashMap<>();
         producerProps.putAll(configs);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
-        producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
 
         Map<String, Object> consumerProps = new HashMap<>();
         consumerProps.putAll(configs);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
         offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
new file mode 100644
index 0000000..948a325
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * StatusBackingStore implementation which uses a compacted topic for storage
+ * of connector and task status information. When a state change is observed,
+ * the new state is written to the compacted topic. The new state will not be
+ * visible until it has been read back from the topic.
+ *
+ * In spite of their names, the putSafe() methods cannot guarantee the safety
+ * of the write (since Kafka itself cannot provide such guarantees currently),
+ * but they can avoid specific unsafe conditions. In particular, putSafe()
+ * allows writes in the following conditions:
+ *
+ * 1) It is (probably) safe to overwrite the state if there is no previous
+ *    value.
+ * 2) It is (probably) safe to overwrite the state if the previous value was
+ *    set by a worker with the same workerId.
+ * 3) It is (probably) safe to overwrite the previous state if the current
+ *    generation is higher than the previous one.
+ *
+ * Basically, these conditions just reduce the window for conflicts. They
+ * obviously cannot take into account in-flight requests.
+ *
+ */
+public class KafkaStatusBackingStore implements StatusBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
+
+    public static final String STATUS_TOPIC_CONFIG = "status.storage.topic";
+
+    private static final String TASK_STATUS_PREFIX = "status-task-";
+    private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
+
+    public static final String STATE_KEY_NAME = "state";
+    public static final String TRACE_KEY_NAME = "trace";
+    public static final String WORKER_ID_KEY_NAME = "worker_id";
+    public static final String GENERATION_KEY_NAME = "generation";
+
+    private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
+            .field(STATE_KEY_NAME, Schema.STRING_SCHEMA)
+            .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
+            .field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA)
+            .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA)
+            .build();
+
+    private final Time time;
+    private final Converter converter;
+    private final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
+    private final Map<String, CacheEntry<ConnectorStatus>> connectors;
+
+    private String topic;
+    private KafkaBasedLog<String, byte[]> kafkaLog;
+    private int generation;
+
+    public KafkaStatusBackingStore(Time time, Converter converter) {
+        this.time = time;
+        this.converter = converter;
+        this.tasks = new Table<>();
+        this.connectors = new HashMap<>();
+    }
+
+    // visible for testing
+    KafkaStatusBackingStore(Time time, Converter converter, String topic, KafkaBasedLog<String, byte[]> kafkaLog) {
+        this(time, converter);
+        this.kafkaLog = kafkaLog;
+        this.topic = topic;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (configs.get(STATUS_TOPIC_CONFIG) == null)
+            throw new ConnectException("Must specify topic for connector status.");
+        this.topic = (String) configs.get(STATUS_TOPIC_CONFIG);
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+                read(record);
+            }
+        };
+        this.kafkaLog = new KafkaBasedLog<>(topic, producerProps, consumerProps, readCallback, time);
+    }
+
+    @Override
+    public void start() {
+        kafkaLog.start();
+
+        // read to the end on startup to ensure that api requests see the most recent states
+        kafkaLog.readToEnd();
+    }
+
+    @Override
+    public void stop() {
+        kafkaLog.stop();
+    }
+
+    @Override
+    public void put(final ConnectorStatus status) {
+        sendConnectorStatus(status, false);
+    }
+
+    @Override
+    public void putSafe(final ConnectorStatus status) {
+        sendConnectorStatus(status, true);
+    }
+
+    @Override
+    public void put(final TaskStatus status) {
+        sendTaskStatus(status, false);
+    }
+
+    @Override
+    public void putSafe(final TaskStatus status) {
+        sendTaskStatus(status, true);
+    }
+
+    @Override
+    public void flush() {
+        kafkaLog.flush();
+    }
+
+    private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
+        String connector = status.id();
+        CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+        String key = CONNECTOR_STATUS_PREFIX + connector;
+        send(key, status, entry, safeWrite);
+    }
+
+    private void sendTaskStatus(final TaskStatus status, boolean safeWrite) {
+        ConnectorTaskId taskId = status.id();
+        CacheEntry<TaskStatus> entry = getOrAdd(taskId);
+        String key = TASK_STATUS_PREFIX + taskId.connector() + "-" + taskId.task();
+        send(key, status, entry, safeWrite);
+    }
+
+    private <V extends AbstractStatus> void send(final String key,
+                                                 final V status,
+                                                 final CacheEntry<V> entry,
+                                                 final boolean safeWrite) {
+        final int sequence;
+        synchronized (this) {
+            this.generation = status.generation();
+            if (safeWrite && !entry.canWrite(status))
+                return;
+            sequence = entry.increment();
+        }
+
+        final byte[] value = status.state() == ConnectorStatus.State.DESTROYED ? null : serialize(status);
+
+        kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception != null) {
+                    if (exception instanceof RetriableException) {
+                        synchronized (KafkaStatusBackingStore.this) {
+                            if (entry.isDeleted()
+                                    || status.generation() != generation
+                                    || (safeWrite && !entry.canWrite(status, sequence)))
+                                return;
+                        }
+                        kafkaLog.send(key, value, this);
+                    } else {
+                        log.error("Failed to write status update", exception);
+                    }
+                }
+            }
+        });
+    }
+
+    private synchronized CacheEntry<ConnectorStatus> getOrAdd(String connector) {
+        CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+        if (entry == null) {
+            entry = new CacheEntry<>();
+            connectors.put(connector, entry);
+        }
+        return entry;
+    }
+
+    private synchronized void remove(String connector) {
+        CacheEntry<ConnectorStatus> removed = connectors.remove(connector);
+        if (removed != null)
+            removed.delete();
+
+        Map<Integer, CacheEntry<TaskStatus>> tasks = this.tasks.remove(connector);
+        if (tasks != null) {
+            for (CacheEntry<TaskStatus> taskEntry : tasks.values())
+                taskEntry.delete();
+        }
+    }
+
+    private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId task) {
+        CacheEntry<TaskStatus> entry = tasks.get(task.connector(), task.task());
+        if (entry == null) {
+            entry = new CacheEntry<>();
+            tasks.put(task.connector(), task.task(), entry);
+        }
+        return entry;
+    }
+
+    private synchronized void remove(ConnectorTaskId id) {
+        CacheEntry<TaskStatus> removed = tasks.remove(id.connector(), id.task());
+        if (removed != null)
+            removed.delete();
+    }
+
+    @Override
+    public synchronized TaskStatus get(ConnectorTaskId id) {
+        CacheEntry<TaskStatus> entry = tasks.get(id.connector(), id.task());
+        return entry == null ? null : entry.get();
+    }
+
+    @Override
+    public synchronized ConnectorStatus get(String connector) {
+        CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+        return entry == null ? null : entry.get();
+    }
+
+    @Override
+    public synchronized Collection<TaskStatus> getAll(String connector) {
+        List<TaskStatus> res = new ArrayList<>();
+        for (CacheEntry<TaskStatus> statusEntry : tasks.row(connector).values()) {
+            TaskStatus status = statusEntry.get();
+            if (status != null)
+                res.add(status);
+        }
+        return res;
+    }
+
+    @Override
+    public synchronized Set<String> connectors() {
+        return new HashSet<>(connectors.keySet());
+    }
+
+    private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
+        try {
+            SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+            if (!(schemaAndValue.value() instanceof Map)) {
+                log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+                return null;
+            }
+
+            Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+            ConnectorStatus.State state = ConnectorStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+            String trace = (String) statusMap.get(TRACE_KEY_NAME);
+            String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+            int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+            return new ConnectorStatus(connector, state, trace, workerUrl, generation);
+        } catch (Exception e) {
+            log.error("Failed to deserialize connector status", e);
+            return null;
+        }
+    }
+
+    private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
+        try {
+            SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+            if (!(schemaAndValue.value() instanceof Map)) {
+                log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+                return null;
+            }
+            Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+            TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+            String trace = (String) statusMap.get(TRACE_KEY_NAME);
+            String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+            int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+            return new TaskStatus(taskId, state, workerUrl, generation, trace);
+        } catch (Exception e) {
+            log.error("Failed to deserialize task status", e);
+            return null;
+        }
+    }
+
+    private byte[] serialize(AbstractStatus status) {
+        Struct struct = new Struct(STATUS_SCHEMA_V0);
+        struct.put(STATE_KEY_NAME, status.state().name());
+        if (status.trace() != null)
+            struct.put(TRACE_KEY_NAME, status.trace());
+        struct.put(WORKER_ID_KEY_NAME, status.workerId());
+        struct.put(GENERATION_KEY_NAME, status.generation());
+        return converter.fromConnectData(topic, STATUS_SCHEMA_V0, struct);
+    }
+
+    private String parseConnectorStatusKey(String key) {
+        return key.substring(CONNECTOR_STATUS_PREFIX.length());
+    }
+
+    private ConnectorTaskId parseConnectorTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 4) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            log.warn("Invalid task status key {}", key);
+            return null;
+        }
+    }
+
+    private void readConnectorStatus(String key, byte[] value) {
+        String connector = parseConnectorStatusKey(key);
+        if (connector == null || connector.isEmpty()) {
+            log.warn("Discarding record with invalid connector status key {}", key);
+            return;
+        }
+
+        if (value == null) {
+            log.trace("Removing status for connector {}", connector);
+            remove(connector);
+            return;
+        }
+
+        ConnectorStatus status = parseConnectorStatus(connector, value);
+        if (status == null)
+            return;
+
+        synchronized (KafkaStatusBackingStore.this) {
+            log.trace("Received connector {} status update {}", connector, status);
+            CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+            entry.put(status);
+        }
+    }
+
+    private void readTaskStatus(String key, byte[] value) {
+        ConnectorTaskId id = parseConnectorTaskId(key);
+        if (id == null) {
+            log.warn("Discarding record with invalid task status key {}", key);
+            return;
+        }
+
+        if (value == null) {
+            log.trace("Removing task status for {}", id);
+            remove(id);
+            return;
+        }
+
+        TaskStatus status = parseTaskStatus(id, value);
+        if (status == null) {
+            log.warn("Failed to parse task status with key {}", key);
+            return;
+        }
+
+        synchronized (KafkaStatusBackingStore.this) {
+            log.trace("Received task {} status update {}", id, status);
+            CacheEntry<TaskStatus> entry = getOrAdd(id);
+            entry.put(status);
+        }
+    }
+
+    // visible for testing
+    void read(ConsumerRecord<String, byte[]> record) {
+        String key = record.key();
+        if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
+            readConnectorStatus(key, record.value());
+        } else if (key.startsWith(TASK_STATUS_PREFIX)) {
+            readTaskStatus(key, record.value());
+        } else {
+            log.warn("Discarding record with invalid key {}", key);
+        }
+    }
+
+    private static class CacheEntry<T extends AbstractStatus> {
+        private T value = null;
+        private int sequence = 0;
+        private boolean deleted = false;
+
+        public int increment() {
+            return ++sequence;
+        }
+
+        public void put(T value) {
+            this.value = value;
+        }
+
+        public T get() {
+            return value;
+        }
+
+        public void delete() {
+            this.deleted = true;
+        }
+
+        public boolean isDeleted() {
+            return deleted;
+        }
+
+        public boolean canWrite(T status) {
+            return value == null
+                    || value.workerId().equals(status.workerId())
+                    || value.generation() <= status.generation();
+        }
+
+        public boolean canWrite(T status, int sequence) {
+            return canWrite(status) && this.sequence == sequence;
+        }
+
+    }
+
+}

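For illustration (derived from the prefixes, schema, and cache logic defined above), records land in the compacted status topic as follows:

    // connector "foo"           -> key "status-connector-foo"
    // task 0 of connector "foo" -> key "status-task-foo-0"
    // value: Struct{state, trace (optional), worker_id, generation} per STATUS_SCHEMA_V0
    // a DESTROYED state is written as a null value, i.e. a tombstone, which
    // triggers removal on the read path; putSafe() refuses a write only when
    // the cached value came from a different worker with a higher generation
    // (see CacheEntry.canWrite)
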
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
new file mode 100644
index 0000000..96b235b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.Table;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class MemoryStatusBackingStore implements StatusBackingStore {
+    private final Table<String, Integer, TaskStatus> tasks;
+    private final Map<String, ConnectorStatus> connectors;
+
+    public MemoryStatusBackingStore() {
+        this.tasks = new Table<>();
+        this.connectors = new HashMap<>();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+
+    }
+
+    @Override
+    public void start() {
+
+    }
+
+    @Override
+    public void stop() {
+
+    }
+
+    @Override
+    public synchronized void put(ConnectorStatus status) {
+        if (status.state() == ConnectorStatus.State.DESTROYED)
+            connectors.remove(status.id());
+        else
+            connectors.put(status.id(), status);
+    }
+
+    @Override
+    public synchronized void putSafe(ConnectorStatus status) {
+        put(status);
+    }
+
+    @Override
+    public synchronized void put(TaskStatus status) {
+        if (status.state() == TaskStatus.State.DESTROYED)
+            tasks.remove(status.id().connector(), status.id().task());
+        else
+            tasks.put(status.id().connector(), status.id().task(), status);
+    }
+
+    @Override
+    public synchronized void putSafe(TaskStatus status) {
+        put(status);
+    }
+
+    @Override
+    public synchronized TaskStatus get(ConnectorTaskId id) {
+        return tasks.get(id.connector(), id.task());
+    }
+
+    @Override
+    public synchronized ConnectorStatus get(String connector) {
+        return connectors.get(connector);
+    }
+
+    @Override
+    public synchronized Collection<TaskStatus> getAll(String connector) {
+        return new HashSet<>(tasks.row(connector).values());
+    }
+
+    @Override
+    public synchronized Set<String> connectors() {
+        return new HashSet<>(connectors.keySet());
+    }
+
+    @Override
+    public void flush() {
+
+    }
+
+}

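A quick usage sketch of the in-memory store (names illustrative; constructor signatures for the status classes match their use in the tests later in this patch):

    StatusBackingStore store = new MemoryStatusBackingStore();
    store.put(new ConnectorStatus("my-connector", ConnectorStatus.State.RUNNING, "worker-1:8083", 0));
    store.get("my-connector");   // returns the RUNNING status
    store.put(new ConnectorStatus("my-connector", ConnectorStatus.State.DESTROYED, "worker-1:8083", 0));
    store.get("my-connector");   // now null: DESTROYED removes the entry
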
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
new file mode 100644
index 0000000..6464f89
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface StatusBackingStore extends Configurable {
+
+    /**
+     * Start dependent services (if needed)
+     */
+    void start();
+
+    /**
+     * Stop dependent services (if needed)
+     */
+    void stop();
+
+    /**
+     * Set the state of the connector to the given value.
+     * @param status the status of the connector
+     */
+    void put(ConnectorStatus status);
+
+    /**
+     * Safely set the state of the connector to the given value. What is
+     * considered "safe" depends on the implementation, but basically it
+     * means that the store can provide higher assurance that another worker
+     * hasn't concurrently written any conflicting data.
+     * @param status the status of the connector
+     */
+    void putSafe(ConnectorStatus status);
+
+    /**
+     * Set the state of the task to the given value.
+     * @param status the status of the task
+     */
+    void put(TaskStatus status);
+
+    /**
+     * Safely set the state of the task to the given value. What is
+     * considered "safe" depends on the implementation, but basically it
+     * means that the store can provide higher assurance that another worker
+     * hasn't concurrently written any conflicting data.
+     * @param status the status of the task
+     */
+    void putSafe(TaskStatus status);
+
+    /**
+     * Get the current state of the task.
+     * @param id the id of the task
+     * @return the state or null if there is none
+     */
+    TaskStatus get(ConnectorTaskId id);
+
+    /**
+     * Get the current state of the connector.
+     * @param connector the connector name
+     * @return the state or null if there is none
+     */
+    ConnectorStatus get(String connector);
+
+    /**
+     * Get the states of all tasks for the given connector.
+     * @param connector the connector name
+     * @return a collection of the current task states
+     */
+    Collection<TaskStatus> getAll(String connector);
+
+    /**
+     * Get all cached connectors.
+     * @return the set of connector names
+     */
+    Set<String> connectors();
+
+    /**
+     * Flush any pending writes
+     */
+    void flush();
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index c82645c..5ab60cd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -99,8 +99,11 @@ public class KafkaBasedLog<K, V> {
      * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
      * @param time Time interface
      */
-    public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
-                         Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+    public KafkaBasedLog(String topic,
+                         Map<String, Object> producerConfigs,
+                         Map<String, Object> consumerConfigs,
+                         Callback<ConsumerRecord<K, V>> consumedCallback,
+                         Time time) {
         this.topic = topic;
         this.producerConfigs = producerConfigs;
         this.consumerConfigs = consumerConfigs;
@@ -140,9 +143,9 @@ public class KafkaBasedLog<K, V> {
         thread = new WorkThread();
         thread.start();
 
-        log.info("Finished reading KafakBasedLog for topic " + topic);
+        log.info("Finished reading KafkaBasedLog for topic " + topic);
 
-        log.info("Started KafakBasedLog for topic " + topic);
+        log.info("Started KafkaBasedLog for topic " + topic);
     }
 
     public void stop() {
@@ -198,6 +201,13 @@ public class KafkaBasedLog<K, V> {
     }
 
     /**
+     * Flush the underlying producer to ensure that all pending writes have been sent.
+     */
+    public void flush() {
+        producer.flush();
+    }
+
+    /**
      * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
      * @return the future associated with the operation
      */
@@ -219,12 +229,18 @@ public class KafkaBasedLog<K, V> {
     private Producer<K, V> createProducer() {
         // Always require producer acks to all to ensure durable writes
         producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+
+        // Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
+        producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
         return new KafkaProducer<>(producerConfigs);
     }
 
     private Consumer<K, V> createConsumer() {
         // Always force reset to the beginning of the log since this class wants to consume all available log data
         consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        // Turn off autocommit since we always want to consume the full log
+        consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
         return new KafkaConsumer<>(consumerConfigs);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
new file mode 100644
index 0000000..f36d3e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Table<R, C, V> {
+
+    private Map<R, Map<C, V>> table = new HashMap<>();
+
+    public V put(R row, C column, V value) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null) {
+            columns = new HashMap<>();
+            table.put(row, columns);
+        }
+        return columns.put(column, value);
+    }
+
+    public V get(R row, C column) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null)
+            return null;
+        return columns.get(column);
+    }
+
+    public Map<C, V> remove(R row) {
+        return table.remove(row);
+    }
+
+    public V remove(R row, C column) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null)
+            return null;
+
+        V value = columns.remove(column);
+        if (columns.isEmpty())
+            table.remove(row);
+        return value;
+    }
+
+    public Map<C, V> row(R row) {
+        Map<C, V> columns = table.get(row);
+        if (columns == null)
+            return Collections.emptyMap();
+        return Collections.unmodifiableMap(columns);
+    }
+
+}

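The status stores use this in place of a map keyed on (connector, task) pairs. A short usage sketch (types and values illustrative):

    Table<String, Integer, String> table = new Table<>();
    table.put("my-connector", 0, "RUNNING");
    table.get("my-connector", 0);     // -> "RUNNING"
    table.row("my-connector");        // unmodifiable view: {0=RUNNING}
    table.remove("my-connector", 0);  // emptied rows are pruned from the table
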
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
new file mode 100644
index 0000000..f17023c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AbstractHerderTest extends EasyMockSupport {
+
+    @Test
+    public void connectorStatus() {
+        String workerId = "workerId";
+        String connector = "connector";
+        int generation = 5;
+        ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+
+        StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(StatusBackingStore.class, String.class)
+                .withArgs(store, workerId)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(herder.generation()).andStubReturn(generation);
+
+        EasyMock.expect(store.get(connector))
+                .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+
+        EasyMock.expect(store.getAll(connector))
+                .andReturn(Collections.singletonList(
+                        new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
+
+        replayAll();
+
+
+        ConnectorStateInfo state = herder.connectorStatus(connector);
+
+        assertEquals(connector, state.name());
+        assertEquals("RUNNING", state.connector().state());
+        assertEquals(1, state.tasks().size());
+        assertEquals(workerId, state.connector().workerId());
+
+        ConnectorStateInfo.TaskState taskState = state.tasks().get(0);
+        assertEquals(0, taskState.id());
+        assertEquals("UNASSIGNED", taskState.state());
+        assertEquals(workerId, taskState.workerId());
+
+        verifyAll();
+    }
+
+    @Test
+    public void taskStatus() {
+        ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+        String workerId = "workerId";
+
+        StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+        AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+                .withConstructor(StatusBackingStore.class, String.class)
+                .withArgs(store, workerId)
+                .addMockedMethod("generation")
+                .createMock();
+
+        EasyMock.expect(herder.generation()).andStubReturn(5);
+
+        final Capture<TaskStatus> statusCapture = EasyMock.newCapture();
+        store.putSafe(EasyMock.capture(statusCapture));
+        EasyMock.expectLastCall();
+
+        EasyMock.expect(store.get(taskId)).andAnswer(new IAnswer<TaskStatus>() {
+            @Override
+            public TaskStatus answer() throws Throwable {
+                return statusCapture.getValue();
+            }
+        });
+
+        replayAll();
+
+        herder.onFailure(taskId, new RuntimeException());
+
+        ConnectorStateInfo.TaskState taskState = herder.taskStatus(taskId);
+        assertEquals(workerId, taskState.workerId());
+        assertEquals("FAILED", taskState.state());
+        assertEquals(0, taskState.id());
+        assertNotNull(taskState.trace());
+
+        verifyAll();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 978e3a1..6721609 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -98,6 +98,8 @@ public class WorkerSinkTaskTest {
     @Mock
     private Converter valueConverter;
     @Mock
+    private TaskStatus.Listener statusListener;
+    @Mock
     private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
 
@@ -116,7 +118,7 @@ public class WorkerSinkTaskTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
 
         recordsReturned = 0;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index e202209..77f1ed0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -97,11 +97,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
     private WorkerConfig workerConfig;
     @Mock private Converter keyConverter;
-    @Mock
-    private Converter valueConverter;
+    @Mock private Converter valueConverter;
     private WorkerSinkTask workerTask;
     @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+    @Mock private TaskStatus.Listener statusListener;
 
     private long recordsReturned;
 
@@ -120,7 +120,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
 
         recordsReturned = 0;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index c6eb0c5..8e9eb72 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.api.easymock.PowerMock;
 import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
@@ -90,6 +91,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Mock private OffsetStorageWriter offsetWriter;
     private WorkerSourceTask workerTask;
     @Mock private Future<RecordMetadata> sendFuture;
+    @MockStrict private TaskStatus.Listener statusListener;
 
     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
 
@@ -113,7 +115,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, keyConverter, valueConverter, producer,
                 offsetReader, offsetWriter, config, new SystemTime());
     }
 
@@ -125,6 +127,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
         EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         final CountDownLatch pollLatch = expectPolls(10);
         // In this test, we don't flush, so nothing goes any further than the offset writer
@@ -133,6 +137,42 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        executor.submit(workerTask);
+        awaitPolls(pollLatch);
+        workerTask.stop();
+        assertEquals(true, workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFailureInPoll() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        final CountDownLatch pollLatch = expectPolls(1);
+        RuntimeException exception = new RuntimeException();
+        EasyMock.expect(sourceTask.poll()).andThrow(exception);
+
+        statusListener.onFailure(taskId, exception);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -153,6 +193,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
         EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
@@ -164,6 +206,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -185,6 +230,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
         EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         // We'll wait for some data, then trigger a flush
         final CountDownLatch pollLatch = expectPolls(1);
@@ -194,6 +241,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         EasyMock.expectLastCall();
         expectOffsetFlush(true);
 
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
@@ -269,6 +319,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
         sourceTask.start(EMPTY_TASK_PROPS);
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
 
         EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override

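The new WorkerSourceTaskTest expectations pin down the ordering of status callbacks around the poll loop: onStartup fires after SourceTask.start(), onShutdown only on a clean stop, and onFailure carries the thrown exception when poll() fails, with SourceTask.stop() and the offset flush still running afterwards. A rough sketch of a run loop satisfying these expectations, assuming a stopping() flag; the real WorkerSourceTask.execute() also handles producer callbacks and send retries:

    // Sketch only; everything except the listener callbacks and the
    // SourceTask lifecycle methods is an assumption for illustration.
    public void run() {
        task.initialize(context);
        try {
            task.start(props);
            statusListener.onStartup(id);
            while (!stopping())
                sendRecords(task.poll());     // a throw here takes the catch path
            statusListener.onShutdown(id);    // reported only on a clean exit
        } catch (Throwable t) {
            statusListener.onFailure(id, t);  // reported instead of onShutdown
        } finally {
            task.stop();                      // stop() and the offset flush still
            commitOffsets();                  // run after a failure (see the test)
        }
    }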
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index f5213a6..20e3fe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Test;
 
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
 
+import static org.easymock.EasyMock.expectLastCall;
 import static org.easymock.EasyMock.partialMockBuilder;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
@@ -33,22 +36,32 @@ public class WorkerTaskTest {
 
     @Test
     public void standardStartup() {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class)
-                .withArgs(new ConnectorTaskId("foo", 0))
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+                .withArgs(taskId, statusListener)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
 
         workerTask.initialize(EMPTY_TASK_PROPS);
-        EasyMock.expectLastCall();
+        expectLastCall();
 
         workerTask.execute();
-        EasyMock.expectLastCall();
+        expectLastCall();
+
+        statusListener.onStartup(taskId);
+        expectLastCall();
 
         workerTask.close();
-        EasyMock.expectLastCall();
+        expectLastCall();
+
+        statusListener.onShutdown(taskId);
+        expectLastCall();
 
         replay(workerTask);
 
@@ -62,9 +75,13 @@ public class WorkerTaskTest {
 
     @Test
     public void stopBeforeStarting() {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class)
-                .withArgs(new ConnectorTaskId("foo", 0))
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+                .withArgs(taskId, statusListener)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -88,5 +105,62 @@ public class WorkerTaskTest {
         verify(workerTask);
     }
 
+    @Test
+    public void cancelBeforeStopping() throws Exception {
+        ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
+        WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+                .withArgs(taskId, statusListener)
+                .addMockedMethod("initialize")
+                .addMockedMethod("execute")
+                .addMockedMethod("close")
+                .createStrictMock();
+
+        final CountDownLatch stopped = new CountDownLatch(1);
+        final Thread thread = new Thread() {
+            @Override
+            public void run() {
+                try {
+                    stopped.await();
+                } catch (Exception e) {
+                }
+            }
+        };
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        EasyMock.expectLastCall();
+
+        workerTask.execute();
+        expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                thread.start();
+                return null;
+            }
+        });
+
+        statusListener.onStartup(taskId);
+        expectLastCall();
+
+        workerTask.close();
+        expectLastCall();
+
+        // there should be no call to onShutdown()
+
+        replay(workerTask);
+
+        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.run();
+
+        workerTask.stop();
+        workerTask.cancel();
+        stopped.countDown();
+        thread.join();
+
+        verify(workerTask);
+    }
 
 }

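The new cancelBeforeStopping case fixes a subtle contract: a cancelled task still gets close(), but its eventual exit must not be reported as a clean shutdown, hence the deliberately absent onShutdown() expectation. A sketch of the flag-based behaviour this implies, assuming plain volatile booleans; the actual WorkerTask may track richer state:

    private volatile boolean stopping = false;   // assumed fields, for
    private volatile boolean cancelled = false;  // illustration only

    public void stop() {
        stopping = true;    // request an orderly halt
    }

    public void cancel() {
        // The worker has given up waiting on this task; its eventual exit
        // should no longer be reported as a clean shutdown.
        cancelled = true;
    }

    private void doClose() {
        close();
        if (!cancelled)
            statusListener.onShutdown(id);   // suppressed after cancel()
    }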
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f33347a..0ca405e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.kafka.connect.runtime;
 
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
@@ -60,10 +60,13 @@ public class WorkerTest extends ThreadedTest {
 
     private static final String CONNECTOR_ID = "test-connector";
     private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+    private static final String WORKER_ID = "localhost:8083";
 
     private WorkerConfig config;
     private Worker worker;
     private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+    private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
+    private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
 
     @Before
     public void setup() {
@@ -80,17 +83,14 @@ public class WorkerTest extends ThreadedTest {
     }
 
     @Test
-    public void testAddRemoveConnector() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+    public void testStartAndStopConnector() throws Exception {
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -105,24 +105,29 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Remove
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
+        expectStopStorage();
+
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.addConnector(config, ctx);
+            worker.startConnector(config, ctx, connectorStatusListener);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -137,16 +142,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByAlias() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -161,22 +163,26 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Remove
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
-        PowerMock.replayAll();
+        expectStopStorage();
 
+        PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -189,16 +195,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddConnectorByShortAlias() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -213,21 +216,26 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Remove
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
+        expectStopStorage();
+
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -241,14 +249,11 @@ public class WorkerTest extends ThreadedTest {
 
     @Test(expected = ConnectException.class)
     public void testStopInvalidConnector() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         worker.stopConnector(CONNECTOR_ID);
@@ -256,16 +261,13 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testReconfigureConnectorTasks() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         Connector connector = PowerMock.createMock(Connector.class);
         ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
         PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
 
@@ -280,6 +282,9 @@ public class WorkerTest extends ThreadedTest {
         connector.start(props);
         EasyMock.expectLastCall();
 
+        connectorStatusListener.onStartup(CONNECTOR_ID);
+        EasyMock.expectLastCall();
+
         // Reconfigure
         EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
         Map<String, String> taskProps = new HashMap<>();
@@ -290,20 +295,22 @@ public class WorkerTest extends ThreadedTest {
         connector.stop();
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
+        connectorStatusListener.onShutdown(CONNECTOR_ID);
         EasyMock.expectLastCall();
 
+        expectStopStorage();
+
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.addConnector(config, ctx);
+        worker.startConnector(config, ctx, connectorStatusListener);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.addConnector(config, ctx);
+            worker.startConnector(config, ctx, connectorStatusListener);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -327,23 +334,21 @@ public class WorkerTest extends ThreadedTest {
 
     @Test
     public void testAddRemoveTask() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
-
-        ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+        expectStartStorage();
 
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
         PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
 
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+                EasyMock.eq(task),
+                EasyMock.anyObject(TaskStatus.Listener.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -356,6 +361,8 @@ public class WorkerTest extends ThreadedTest {
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.initialize(origProps);
         EasyMock.expectLastCall();
+        workerTask.run();
+        EasyMock.expectLastCall();
 
         // Remove
         workerTask.stop();
@@ -363,17 +370,16 @@ public class WorkerTest extends ThreadedTest {
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
+        expectStopStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.addTask(taskId, new TaskConfig(origProps));
-        assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
-        worker.stopTask(taskId);
+        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+        assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
+        worker.stopAndAwaitTask(TASK_ID);
         assertEquals(Collections.emptySet(), worker.taskIds());
         // Nothing should be left, so this should effectively be a nop
         worker.stop();
@@ -383,36 +389,33 @@ public class WorkerTest extends ThreadedTest {
 
     @Test(expected = ConnectException.class)
     public void testStopInvalidTask() {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
 
-        worker.stopTask(TASK_ID);
+        worker.stopAndAwaitTask(TASK_ID);
     }
 
     @Test
     public void testCleanupTasksOnStop() throws Exception {
-        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
-        EasyMock.expectLastCall();
-        offsetBackingStore.start();
-        EasyMock.expectLastCall();
+        expectStartStorage();
 
         // Create
         TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
         WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
-        PowerMock.mockStatic(Worker.class);
+        PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
         PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
         EasyMock.expect(task.version()).andReturn("1.0");
         
         PowerMock.expectNew(
-                WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
+                WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+                EasyMock.eq(task),
+                EasyMock.anyObject(TaskStatus.Listener.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -425,27 +428,41 @@ public class WorkerTest extends ThreadedTest {
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
         workerTask.initialize(origProps);
         EasyMock.expectLastCall();
+        workerTask.run();
+        EasyMock.expectLastCall();
 
         // Remove on Worker.stop()
         workerTask.stop();
         EasyMock.expectLastCall();
+
         EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
         // Note that in this case we *do not* commit offsets since it's an unclean shutdown
         EasyMock.expectLastCall();
 
-        offsetBackingStore.stop();
-        EasyMock.expectLastCall();
+        expectStopStorage();
 
         PowerMock.replayAll();
 
-        worker = new Worker(new MockTime(), config, offsetBackingStore);
+        worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.addTask(TASK_ID, new TaskConfig(origProps));
+        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
         worker.stop();
 
         PowerMock.verifyAll();
     }
 
+    private void expectStartStorage() {
+        offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+        EasyMock.expectLastCall();
+        offsetBackingStore.start();
+        EasyMock.expectLastCall();
+    }
+
+    private void expectStopStorage() {
+        offsetBackingStore.stop();
+        EasyMock.expectLastCall();
+    }
+
 
     /* Name here needs to be unique as we are testing the aliasing mechanism */
     private static class WorkerTestConnector extends Connector {

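Taken together, the WorkerTest changes document the reworked Worker surface: the constructor gains a worker id, addConnector/addTask become startConnector/startTask with a status listener argument, and stopTask becomes stopAndAwaitTask. A hedged usage sketch, with config and listener construction elided; argument types are read off the test, not off Worker.java itself:

    Worker worker = new Worker("localhost:8083", new SystemTime(), config, offsetBackingStore);
    worker.start();
    worker.startConnector(connectorConfig, ctx, connectorStatusListener); // was addConnector
    worker.startTask(taskId, taskConfig, taskStatusListener);             // was addTask
    worker.stopAndAwaitTask(taskId);                                      // was stopTask
    worker.stopConnector("test-connector");
    worker.stop();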
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 76f9bc0..d439e96 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -31,6 +31,8 @@ import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
@@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue;
 public class DistributedHerderTest {
     private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
     static {
+        HERDER_CONFIG.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "status-topic");
         HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
         HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
         HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
@@ -122,7 +125,10 @@ public class DistributedHerderTest {
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
             Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
 
+    private static final String WORKER_ID = "localhost:8083";
+
     @Mock private KafkaConfigStorage configStorage;
+    @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerGroupMember member;
     private MockTime time;
     private DistributedHerder herder;
@@ -139,11 +145,12 @@ public class DistributedHerderTest {
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
         time = new MockTime();
 
-        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
-                new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time);
+        herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
+                new DistributedConfig(HERDER_CONFIG), WORKER_ID, worker, statusBackingStore, configStorage, member, MEMBER_URL, time);
         connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
         taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
         rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
+        PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
     }
 
     @Test
@@ -152,10 +159,11 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -168,17 +176,55 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testRebalance() {
+        // Join group and get assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
+
+        // and the new assignment started
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testHaltCleansUpWorker() {
         EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
         EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
-        worker.stopTask(TASK1);
+        worker.stopTasks(Collections.singleton(TASK1));
+        PowerMock.expectLastCall();
+        worker.awaitStopTasks(Collections.singleton(TASK1));
         PowerMock.expectLastCall();
         member.stop();
         PowerMock.expectLastCall();
         configStorage.stop();
         PowerMock.expectLastCall();
+        statusBackingStore.stop();
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
@@ -242,7 +288,8 @@ public class DistributedHerderTest {
         // Start with one connector
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
@@ -286,7 +333,8 @@ public class DistributedHerderTest {
         // Performs rebalance and gets new assignment
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -312,7 +360,8 @@ public class DistributedHerderTest {
         // join
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -325,7 +374,8 @@ public class DistributedHerderTest {
         EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -360,8 +410,9 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         // Performs rebalance and gets new assignment
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
-                ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0));
-        worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject());
+                ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
+                Arrays.asList(TASK0));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -381,7 +432,8 @@ public class DistributedHerderTest {
         // Join group and as leader fail to do assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
-                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
         // Reading to end of log times out
         TestFuture<Void> readToEndFuture = new TestFuture<>();
         readToEndFuture.resolveOnGet(new TimeoutException());
@@ -393,10 +445,11 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
 
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -457,7 +510,8 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
-        worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
@@ -485,7 +539,8 @@ public class DistributedHerderTest {
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
         Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
-        worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
+        worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
@@ -534,13 +589,19 @@ public class DistributedHerderTest {
     }
 
 
-    private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+    private void expectRebalance(final long offset,
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks) {
         expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);
     }
 
     // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
-    private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks,
-                                 final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+    private void expectRebalance(final Collection<String> revokedConnectors,
+                                 final List<ConnectorTaskId> revokedTasks,
+                                 final short error,
+                                 final long offset,
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks) {
         member.ensureActive();
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
@@ -549,10 +610,30 @@ public class DistributedHerderTest {
                     rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
                 ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
                         error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
-                rebalanceListener.onAssigned(assignment);
+                rebalanceListener.onAssigned(assignment, 0);
                 return null;
             }
         });
+
+        if (revokedConnectors != null) {
+            for (String connector : revokedConnectors) {
+                worker.stopConnector(connector);
+                PowerMock.expectLastCall();
+            }
+        }
+
+        if (revokedTasks != null && !revokedTasks.isEmpty()) {
+            worker.stopTasks(revokedTasks);
+            PowerMock.expectLastCall();
+            worker.awaitStopTasks(revokedTasks);
+            PowerMock.expectLastCall();
+        }
+
+        if (revokedConnectors != null) {
+            statusBackingStore.flush();
+            PowerMock.expectLastCall();
+        }
+
         member.wakeup();
         PowerMock.expectLastCall();
     }
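
The reworked expectRebalance helper encodes the herder's new revocation sequence: each revoked connector is stopped individually, revoked tasks are stopped in bulk and then awaited, and the status store is flushed before rejoining; onAssigned now also carries the group generation. A sketch of the revocation handling these expectations imply; the real listener lives inside DistributedHerder and does more bookkeeping:

    // Method names on Worker and StatusBackingStore are the ones the test
    // mocks; the surrounding structure is an assumption.
    public void onRevoked(String leader, Collection<String> connectors,
                          Collection<ConnectorTaskId> tasks) {
        for (String connector : connectors)
            worker.stopConnector(connector);
        worker.stopTasks(tasks);        // signal every revoked task first...
        worker.awaitStopTasks(tasks);   // ...then await them, so stops overlap
        // Persist pending status writes before rejoining the group.
        statusBackingStore.flush();
    }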