You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/20 23:10:21 UTC

[2/5] kafka git commit: KAFKA-2370: kafka connect pause/resume API

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
new file mode 100644
index 0000000..a5f7409
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.expectLastCall;
+
+@RunWith(EasyMockRunner.class)
+public class WorkerConnectorTest extends EasyMockSupport {
+
+    public static final String CONNECTOR = "connector";
+    public static final Map<String, String> CONFIG = new HashMap<>();
+    static {
+        CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+        CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
+    }
+    public static final ConnectorConfig CONNECTOR_CONFIG = new ConnectorConfig(CONFIG);
+
+    @Mock Connector connector;
+    @Mock ConnectorContext ctx;
+    @Mock ConnectorStatus.Listener listener;
+
+    @Test
+    public void testInitializeFailure() {
+        RuntimeException exception = new RuntimeException();
+
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall().andThrow(exception);
+
+        listener.onFailure(CONNECTOR, exception);
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testFailureIsFinalState() {
+        RuntimeException exception = new RuntimeException();
+
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall().andThrow(exception);
+
+        listener.onFailure(CONNECTOR, exception);
+        expectLastCall();
+
+        // expect no call to onStartup() after failure
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartupAndShutdown() {
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall();
+
+        listener.onStartup(CONNECTOR);
+        expectLastCall();
+
+        connector.stop();
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartupAndPause() {
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall();
+
+        listener.onStartup(CONNECTOR);
+        expectLastCall();
+
+        connector.stop();
+        expectLastCall();
+
+        listener.onPause(CONNECTOR);
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.transitionTo(TargetState.PAUSED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testOnResume() {
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        listener.onPause(CONNECTOR);
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall();
+
+        listener.onResume(CONNECTOR);
+        expectLastCall();
+
+        connector.stop();
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.PAUSED);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartupPaused() {
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        // connector never gets started
+
+        listener.onPause(CONNECTOR);
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.PAUSED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testStartupFailure() {
+        RuntimeException exception = new RuntimeException();
+
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall().andThrow(exception);
+
+        listener.onFailure(CONNECTOR, exception);
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testShutdownFailure() {
+        RuntimeException exception = new RuntimeException();
+
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall();
+
+        listener.onStartup(CONNECTOR);
+        expectLastCall();
+
+        connector.stop();
+        expectLastCall().andThrow(exception);
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testTransitionStartedToStarted() {
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall();
+
+        // expect only one call to onStartup()
+        listener.onStartup(CONNECTOR);
+        expectLastCall();
+
+        connector.stop();
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    @Test
+    public void testTransitionPausedToPaused() {
+        connector.initialize(EasyMock.notNull(ConnectorContext.class));
+        expectLastCall();
+
+        connector.start(CONFIG);
+        expectLastCall();
+
+        listener.onStartup(CONNECTOR);
+        expectLastCall();
+
+        connector.stop();
+        expectLastCall();
+
+        listener.onPause(CONNECTOR);
+        expectLastCall();
+
+        listener.onShutdown(CONNECTOR);
+        expectLastCall();
+
+        replayAll();
+
+        WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+        workerConnector.initialize(CONNECTOR_CONFIG);
+        workerConnector.transitionTo(TargetState.STARTED);
+        workerConnector.transitionTo(TargetState.PAUSED);
+        workerConnector.transitionTo(TargetState.PAUSED);
+        workerConnector.shutdown();
+
+        verifyAll();
+    }
+
+    private static abstract class TestConnector extends Connector {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 7bc83de..835e30f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.data.Schema;
@@ -49,14 +50,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
+import static java.util.Arrays.asList;
 import static java.util.Collections.singleton;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -84,10 +86,12 @@ public class WorkerSinkTaskTest {
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
     static {
         TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
     }
-
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private TargetState initialState = TargetState.STARTED;
     private Time time;
     private WorkerSinkTask workerTask;
     @Mock
@@ -120,12 +124,91 @@ public class WorkerSinkTaskTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, time);
 
         recordsReturned = 0;
     }
 
     @Test
+    public void testStartPaused() throws Exception {
+        workerTask = PowerMock.createPartialMock(
+                WorkerSinkTask.class, new String[]{"createConsumer"},
+                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, time);
+
+        expectInitializeTask();
+        expectPollInitialAssignment();
+
+        Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+        EasyMock.expect(consumer.assignment()).andReturn(partitions);
+        consumer.pause(partitions);
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.poll(Long.MAX_VALUE);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        expectInitializeTask();
+        expectPollInitialAssignment();
+
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+
+        // Pause
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+        expectConsumerWakeup();
+        EasyMock.expect(consumer.assignment()).andReturn(partitions);
+        consumer.pause(partitions);
+        PowerMock.expectLastCall();
+
+        // No records returned
+        expectConsumerPoll(0);
+        sinkTask.put(Collections.<SinkRecord>emptyList());
+        EasyMock.expectLastCall();
+
+        // And unpause
+        statusListener.onResume(taskId);
+        EasyMock.expectLastCall();
+        expectConsumerWakeup();
+        EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+        consumer.resume(singleton(TOPIC_PARTITION));
+        PowerMock.expectLastCall();
+        consumer.resume(singleton(TOPIC_PARTITION2));
+        PowerMock.expectLastCall();
+
+        expectConsumerPoll(1);
+        expectConvertMessages(1);
+        sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        workerTask.initializeAndStart();
+        workerTask.poll(Long.MAX_VALUE); // initial assignment
+        workerTask.poll(Long.MAX_VALUE); // fetch some data
+        workerTask.transitionTo(TargetState.PAUSED);
+        workerTask.poll(Long.MAX_VALUE); // wakeup
+        workerTask.poll(Long.MAX_VALUE); // now paused
+        workerTask.transitionTo(TargetState.STARTED);
+        workerTask.poll(Long.MAX_VALUE); // wakeup
+        workerTask.poll(Long.MAX_VALUE); // now unpaused
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testPollRedelivery() throws Exception {
         expectInitializeTask();
         expectPollInitialAssignment();
@@ -137,7 +220,7 @@ public class WorkerSinkTaskTest {
         sinkTask.put(EasyMock.capture(records));
         EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
         // Pause
-        HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+        HashSet<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
         EasyMock.expect(consumer.assignment()).andReturn(partitions);
         consumer.pause(partitions);
         PowerMock.expectLastCall();
@@ -155,7 +238,7 @@ public class WorkerSinkTaskTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.poll(Long.MAX_VALUE);
         workerTask.poll(Long.MAX_VALUE);
@@ -174,7 +257,7 @@ public class WorkerSinkTaskTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.poll(Long.MAX_VALUE);
         try {
@@ -197,7 +280,7 @@ public class WorkerSinkTaskTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.poll(Long.MAX_VALUE);
         try {
@@ -213,7 +296,7 @@ public class WorkerSinkTaskTest {
 
     private void expectInitializeTask() throws Exception {
         PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
-        consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+        consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
         PowerMock.expectLastCall();
 
         sinkTask.initialize(EasyMock.capture(sinkTaskContext));
@@ -223,7 +306,7 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectRebalanceRevocationError(RuntimeException e) {
-        final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
 
         sinkTask.close(new HashSet<>(partitions));
         EasyMock.expectLastCall().andThrow(e);
@@ -239,7 +322,7 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectRebalanceAssignmentError(RuntimeException e) {
-        final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
 
         sinkTask.close(new HashSet<>(partitions));
         EasyMock.expectLastCall();
@@ -268,7 +351,7 @@ public class WorkerSinkTaskTest {
     }
 
     private void expectPollInitialAssignment() {
-        final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+        final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
 
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
@@ -287,6 +370,12 @@ public class WorkerSinkTaskTest {
         EasyMock.expectLastCall();
     }
 
+    private void expectConsumerWakeup() {
+        consumer.wakeup();
+        EasyMock.expectLastCall();
+        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException());
+    }
+
     private void expectConsumerPoll(final int numMessages) {
         EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
                 new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@@ -309,4 +398,8 @@ public class WorkerSinkTaskTest {
         EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
         EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
     }
+
+    private abstract static class TestSinkTask extends SinkTask  {
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 25f0bf4..25dbff5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -89,9 +89,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
     static {
         TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
     }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private TargetState initialState = TargetState.STARTED;
     private Time time;
     @Mock private SinkTask sinkTask;
     private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
@@ -105,6 +108,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
     private long recordsReturned;
 
+
     @SuppressWarnings("unchecked")
     @Override
     public void setup() {
@@ -121,7 +125,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, time);
 
         recordsReturned = 0;
     }
@@ -136,7 +140,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
         // First iteration initializes partition assignment
@@ -147,7 +151,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
             workerTask.iteration();
         }
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         // Verify contents match expected values, i.e. that they were translated properly. With max
@@ -180,7 +183,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
         // Initialize partition assignment
@@ -193,7 +196,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         // Commit finishes synchronously for testing so we can check this immediately
         assertEquals(0, workerTask.commitFailures());
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         assertEquals(2, capturedRecords.getValues().size());
@@ -220,7 +222,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
         // Initialize partition assignment
@@ -233,7 +235,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         assertEquals(1, workerTask.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -259,7 +260,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
         // Initialize partition assignment
@@ -274,7 +275,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         assertEquals(1, workerTask.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -292,7 +292,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
         // Initialize partition assignment
@@ -306,7 +306,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         assertEquals(1, workerTask.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -325,7 +324,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
 
         // Initialize partition assignment
@@ -342,7 +341,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         assertEquals(1, workerTask.commitFailures());
         assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -405,13 +403,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -449,13 +446,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         expectStopTask(3);
         PowerMock.replayAll();
 
-        workerTask.initialize(TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.initializeAndStart();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.iteration();
         workerTask.stop();
-        workerTask.awaitStop(Long.MAX_VALUE);
         workerTask.close();
 
         PowerMock.verifyAll();
@@ -597,4 +593,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         return capturedCallback;
     }
 
+    private static abstract class TestSinkTask extends SinkTask {
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 3dd07a6..0d805da 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -58,6 +58,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -94,7 +95,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
 
-    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
     private static final List<SourceRecord> RECORDS = Arrays.asList(
             new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
     );
@@ -115,8 +121,94 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private void createWorkerTask() {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, keyConverter, valueConverter, producer,
-                offsetReader, offsetWriter, config, new SystemTime());
+        createWorkerTask(TargetState.STARTED);
+    }
+
+    private void createWorkerTask(TargetState initialState) {
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter,
+                valueConverter, producer, offsetReader, offsetWriter, config, new SystemTime());
+    }
+
+    @Test
+    public void testStartPaused() throws Exception {
+        final CountDownLatch startupLatch = new CountDownLatch(1);
+
+        createWorkerTask(TargetState.PAUSED);
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+            @Override
+            public Void answer() throws Throwable {
+                startupLatch.countDown();
+                return null;
+            }
+        });
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+
+        // we shouldn't get any calls to poll()
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        executor.submit(workerTask);
+        assertTrue(startupLatch.await(5, TimeUnit.SECONDS));
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPause() throws Exception {
+        createWorkerTask();
+
+        sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+        EasyMock.expectLastCall();
+        sourceTask.start(TASK_PROPS);
+        EasyMock.expectLastCall();
+        statusListener.onStartup(taskId);
+        EasyMock.expectLastCall();
+
+        AtomicInteger count = new AtomicInteger(0);
+        CountDownLatch pollLatch = expectPolls(10, count);
+        // In this test, we don't flush, so nothing goes any further than the offset writer
+
+        statusListener.onPause(taskId);
+        EasyMock.expectLastCall();
+
+        sourceTask.stop();
+        EasyMock.expectLastCall();
+        expectOffsetFlush(true);
+
+        statusListener.onShutdown(taskId);
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.initialize(TASK_CONFIG);
+        executor.submit(workerTask);
+        awaitPolls(pollLatch);
+
+        workerTask.transitionTo(TargetState.PAUSED);
+
+        int priorCount = count.get();
+        Thread.sleep(100);
+        assertEquals(priorCount, count.get());
+
+        workerTask.stop();
+        assertTrue(workerTask.awaitStop(1000));
+
+        PowerMock.verifyAll();
     }
 
     @Test
@@ -125,7 +217,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
+        sourceTask.start(TASK_PROPS);
         EasyMock.expectLastCall();
         statusListener.onStartup(taskId);
         EasyMock.expectLastCall();
@@ -142,7 +234,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
         awaitPolls(pollLatch);
         workerTask.stop();
@@ -157,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
+        sourceTask.start(TASK_PROPS);
         EasyMock.expectLastCall();
         statusListener.onStartup(taskId);
         EasyMock.expectLastCall();
@@ -175,7 +267,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
         awaitPolls(pollLatch);
         workerTask.stop();
@@ -191,7 +283,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
+        sourceTask.start(TASK_PROPS);
         EasyMock.expectLastCall();
         statusListener.onStartup(taskId);
         EasyMock.expectLastCall();
@@ -210,7 +302,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
         awaitPolls(pollLatch);
         assertTrue(workerTask.commitOffsets());
@@ -227,7 +319,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
+        sourceTask.start(TASK_PROPS);
         EasyMock.expectLastCall();
         statusListener.onStartup(taskId);
         EasyMock.expectLastCall();
@@ -245,7 +337,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
         awaitPolls(pollLatch);
         assertTrue(workerTask.commitOffsets());
@@ -317,7 +409,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
         EasyMock.expectLastCall();
-        sourceTask.start(EMPTY_TASK_PROPS);
+        sourceTask.start(TASK_PROPS);
         EasyMock.expectLastCall();
 
         statusListener.onStartup(taskId);
@@ -336,7 +428,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
         PowerMock.replayAll();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         executor.submit(workerTask);
         // Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
         // exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
@@ -348,8 +440,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         PowerMock.verifyAll();
     }
 
-    private CountDownLatch expectPolls(int count) throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(count);
+    private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(minimum);
         // Note that we stub these to allow any number of calls because the thread will continue to
         // run. The count passed in + latch returned just makes sure we get *at least* that number of
         // calls
@@ -357,6 +449,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
                 .andStubAnswer(new IAnswer<List<SourceRecord>>() {
                     @Override
                     public List<SourceRecord> answer() throws Throwable {
+                        count.incrementAndGet();
                         latch.countDown();
                         return RECORDS;
                     }
@@ -366,6 +459,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         return latch;
     }
 
+    private CountDownLatch expectPolls(int count) throws InterruptedException {
+        return expectPolls(count, new AtomicInteger());
+    }
+
     @SuppressWarnings("unchecked")
     private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException {
         expectConvertKeyValue(false);
@@ -446,8 +543,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             convertValueExpect.andReturn(SERIALIZED_RECORD);
     }
 
-    private void awaitPolls(CountDownLatch latch) throws InterruptedException {
-        latch.await(1000, TimeUnit.MILLISECONDS);
+    private boolean awaitPolls(CountDownLatch latch) throws InterruptedException {
+        return latch.await(1000, TimeUnit.MILLISECONDS);
     }
 
     @SuppressWarnings("unchecked")
@@ -469,4 +566,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         }
     }
 
+    private abstract static class TestSourceTask extends SourceTask {
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 20e3fe2..36803db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -16,12 +16,13 @@
  **/
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Test;
 
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 
@@ -32,7 +33,11 @@ import static org.easymock.EasyMock.verify;
 
 public class WorkerTaskTest {
 
-    private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
+    private static final Map<String, String> TASK_PROPS = new HashMap<>();
+    static {
+        TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+    }
+    private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     @Test
     public void standardStartup() {
@@ -41,14 +46,14 @@ public class WorkerTaskTest {
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
-                .withArgs(taskId, statusListener)
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
+                .withArgs(taskId, statusListener, TargetState.STARTED)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         expectLastCall();
 
         workerTask.execute();
@@ -65,7 +70,7 @@ public class WorkerTaskTest {
 
         replay(workerTask);
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.run();
         workerTask.stop();
         workerTask.awaitStop(1000L);
@@ -80,14 +85,14 @@ public class WorkerTaskTest {
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
-                .withArgs(taskId, statusListener)
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
+                .withArgs(taskId, statusListener, TargetState.STARTED)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
                 .createStrictMock();
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         EasyMock.expectLastCall();
 
         workerTask.close();
@@ -95,7 +100,7 @@ public class WorkerTaskTest {
 
         replay(workerTask);
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.stop();
         workerTask.awaitStop(1000L);
 
@@ -112,8 +117,8 @@ public class WorkerTaskTest {
         TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
 
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
-                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
-                .withArgs(taskId, statusListener)
+                .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
+                .withArgs(taskId, statusListener, TargetState.STARTED)
                 .addMockedMethod("initialize")
                 .addMockedMethod("execute")
                 .addMockedMethod("close")
@@ -130,7 +135,7 @@ public class WorkerTaskTest {
             }
         };
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         EasyMock.expectLastCall();
 
         workerTask.execute();
@@ -152,7 +157,7 @@ public class WorkerTaskTest {
 
         replay(workerTask);
 
-        workerTask.initialize(EMPTY_TASK_PROPS);
+        workerTask.initialize(TASK_CONFIG);
         workerTask.run();
 
         workerTask.stop();
@@ -163,4 +168,7 @@ public class WorkerTaskTest {
         verify(workerTask);
     }
 
+    private abstract static class TestSinkTask extends SinkTask { // referenced by name in TASK_PROPS
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 557d789..2004c99 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -102,7 +102,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
-        connector.initialize(ctx);
+        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -126,10 +126,10 @@ public class WorkerTest extends ThreadedTest {
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener);
+        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.startConnector(config, ctx, connectorStatusListener);
+            worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -160,7 +160,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
 
-        connector.initialize(ctx);
+        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -184,7 +184,7 @@ public class WorkerTest extends ThreadedTest {
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener);
+        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -213,7 +213,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
 
-        connector.initialize(ctx);
+        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -237,7 +237,7 @@ public class WorkerTest extends ThreadedTest {
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener);
+        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
 
         worker.stopConnector(CONNECTOR_ID);
@@ -279,7 +279,7 @@ public class WorkerTest extends ThreadedTest {
         props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
         props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
 
-        connector.initialize(ctx);
+        connector.initialize(EasyMock.anyObject(ConnectorContext.class));
         EasyMock.expectLastCall();
         connector.start(props);
         EasyMock.expectLastCall();
@@ -309,10 +309,10 @@ public class WorkerTest extends ThreadedTest {
 
         ConnectorConfig config = new ConnectorConfig(props);
         assertEquals(Collections.emptySet(), worker.connectorNames());
-        worker.startConnector(config, ctx, connectorStatusListener);
+        worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
         try {
-            worker.startConnector(config, ctx, connectorStatusListener);
+            worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
             fail("Should have thrown exception when trying to add connector with same name.");
         } catch (ConnectException e) {
             // expected
@@ -351,6 +351,7 @@ public class WorkerTest extends ThreadedTest {
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
                 EasyMock.anyObject(TaskStatus.Listener.class),
+                EasyMock.eq(TargetState.STARTED),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -361,7 +362,7 @@ public class WorkerTest extends ThreadedTest {
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
-        workerTask.initialize(origProps);
+        workerTask.initialize(new TaskConfig(origProps));
         EasyMock.expectLastCall();
         workerTask.run();
         EasyMock.expectLastCall();
@@ -379,7 +380,7 @@ public class WorkerTest extends ThreadedTest {
         worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
         assertEquals(Collections.emptySet(), worker.taskIds());
-        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
         assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
         worker.stopAndAwaitTask(TASK_ID);
         assertEquals(Collections.emptySet(), worker.taskIds());
@@ -418,6 +419,7 @@ public class WorkerTest extends ThreadedTest {
                 WorkerSourceTask.class, EasyMock.eq(TASK_ID),
                 EasyMock.eq(task),
                 EasyMock.anyObject(TaskStatus.Listener.class),
+                EasyMock.eq(TargetState.STARTED),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(Converter.class),
                 EasyMock.anyObject(KafkaProducer.class),
@@ -428,7 +430,7 @@ public class WorkerTest extends ThreadedTest {
                 .andReturn(workerTask);
         Map<String, String> origProps = new HashMap<>();
         origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
-        workerTask.initialize(origProps);
+        workerTask.initialize(new TaskConfig(origProps));
         EasyMock.expectLastCall();
         workerTask.run();
         EasyMock.expectLastCall();
@@ -447,7 +449,7 @@ public class WorkerTest extends ThreadedTest {
 
         worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
         worker.start();
-        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+        worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
         worker.stop();
 
         PowerMock.verifyAll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b667fa8..e62b663 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.AlreadyExistsException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -31,12 +32,12 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.FutureCallback;
-import org.apache.kafka.connect.util.TestFuture;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
@@ -48,7 +49,6 @@ import org.powermock.api.easymock.annotation.Mock;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -127,13 +127,15 @@ public class DistributedHerderTest {
         TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
     }
     private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+            Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
     private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
-            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+            Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED),
+            TASK_CONFIGS_MAP, Collections.<String>emptySet());
 
     private static final String WORKER_ID = "localhost:8083";
 
-    @Mock private KafkaConfigStorage configStorage;
+    @Mock private KafkaConfigBackingStore configStorage;
     @Mock private StatusBackingStore statusBackingStore;
     @Mock private WorkerGroupMember member;
     private MockTime time;
@@ -141,8 +143,7 @@ public class DistributedHerderTest {
     @Mock private Worker worker;
     @Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
 
-    private Callback<String> connectorConfigCallback;
-    private Callback<List<ConnectorTaskId>> taskConfigCallback;
+    private ConfigBackingStore.UpdateListener configUpdateListener;
     private WorkerRebalanceListener rebalanceListener;
 
     @Before
@@ -152,24 +153,27 @@ public class DistributedHerderTest {
         time = new MockTime();
 
         herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
-                new DistributedConfig(HERDER_CONFIG), WORKER_ID, worker, statusBackingStore, configStorage, member, MEMBER_URL, time);
-        connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
-        taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
-        rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
+                new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configStorage, member, MEMBER_URL, time);
+
+        configUpdateListener = herder.new ConfigUpdateListener();
+        rebalanceListener = herder.new RebalanceListener();
+
         PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
     }
 
     @Test
-    public void testJoinAssignment() {
+    public void testJoinAssignment() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -182,16 +186,17 @@ public class DistributedHerderTest {
     }
 
     @Test
-    public void testRebalance() {
+    public void testRebalance() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -201,9 +206,48 @@ public class DistributedHerderTest {
 
         // and the new assignment started
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        herder.tick();
+        herder.tick();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRebalanceFailedConnector() throws Exception {
+        // Join group and get assignment
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
+
+        // and the new assignment started
+        worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
+
+        // connector is not running, so we should see no call to connectorTaskConfigs()
+
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -231,6 +275,8 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         statusBackingStore.stop();
         PowerMock.expectLastCall();
+        worker.stop();
+        PowerMock.expectLastCall();
 
         PowerMock.replayAll();
 
@@ -295,14 +341,15 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
         // And delete the connector
         member.wakeup();
         PowerMock.expectLastCall();
-        configStorage.putConnectorConfig(CONN1, null);
+        configStorage.removeConnectorConfig(CONN1);
         PowerMock.expectLastCall();
         putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
         PowerMock.expectLastCall();
@@ -329,8 +376,9 @@ public class DistributedHerderTest {
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         // now handle the connector restart
         member.wakeup();
@@ -345,8 +393,9 @@ public class DistributedHerderTest {
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
 
         PowerMock.replayAll();
 
@@ -479,7 +528,7 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
 
         // now handle the task restart
@@ -494,7 +543,7 @@ public class DistributedHerderTest {
 
         worker.stopAndAwaitTask(TASK0);
         PowerMock.expectLastCall();
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
 
         PowerMock.replayAll();
@@ -637,7 +686,8 @@ public class DistributedHerderTest {
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
@@ -646,7 +696,7 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick(); // join
-        connectorConfigCallback.onCompletion(null, CONN1); // read updated config
+        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config
         herder.tick(); // apply config
         herder.tick(); // do rebalance
 
@@ -654,7 +704,7 @@ public class DistributedHerderTest {
     }
 
     @Test
-    public void testConnectorConfigUpdate() {
+    public void testConnectorConfigUpdate() throws Exception {
         // Connector config can be applied without any rebalance
 
         EasyMock.expect(member.memberId()).andStubReturn("member");
@@ -664,8 +714,9 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -678,8 +729,9 @@ public class DistributedHerderTest {
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall();
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -687,7 +739,7 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick(); // join
-        connectorConfigCallback.onCompletion(null, CONN1); // read updated config
+        configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config
         herder.tick(); // apply config
 
         PowerMock.verifyAll();
@@ -715,7 +767,7 @@ public class DistributedHerderTest {
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
                 Arrays.asList(TASK0));
-        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
@@ -723,7 +775,7 @@ public class DistributedHerderTest {
         PowerMock.replayAll();
 
         herder.tick(); // join
-        taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, TASK2)); // read updated config
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK0, TASK1, TASK2)); // read updated config
         herder.tick(); // apply config
         herder.tick(); // do rebalance
 
@@ -738,9 +790,8 @@ public class DistributedHerderTest {
                 ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
                 Collections.<ConnectorTaskId>emptyList());
         // Reading to end of log times out
-        TestFuture<Void> readToEndFuture = new TestFuture<>();
-        readToEndFuture.resolveOnGet(new TimeoutException());
-        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+        configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall().andThrow(new TimeoutException());
         member.maybeLeaveGroup();
         EasyMock.expectLastCall();
         PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
@@ -751,11 +802,12 @@ public class DistributedHerderTest {
         expectPostRebalanceCatchup(SNAPSHOT);
 
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
-        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         member.poll(EasyMock.anyInt());
         PowerMock.expectLastCall();
 
@@ -816,8 +868,9 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
         // list connectors, get connector info, get connector config, get task configs
@@ -834,7 +887,7 @@ public class DistributedHerderTest {
             @Override
             public Object answer() throws Throwable {
                 // Simulate response to writing config + waiting until end of log to be read
-                connectorConfigCallback.onCompletion(null, CONN1);
+                configUpdateListener.onConnectorConfigUpdate(CONN1);
                 return null;
             }
         });
@@ -845,8 +898,9 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
         Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
         worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
         EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
 
         member.poll(EasyMock.anyInt());
@@ -943,10 +997,9 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
     }
 
-    private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) {
-        TestFuture<Void> readToEndFuture = new TestFuture<>();
-        readToEndFuture.resolveOnGet((Void) null);
-        EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+    private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) throws TimeoutException {
+        configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+        EasyMock.expectLastCall();
         EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index bf33cb3..f7423ec 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -31,7 +31,8 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
@@ -76,7 +77,7 @@ public class WorkerCoordinatorTest {
     private Metrics metrics;
     private ConsumerNetworkClient consumerClient;
     private MockRebalanceListener rebalanceListener;
-    @Mock private KafkaConfigStorage configStorage;
+    @Mock private KafkaConfigBackingStore configStorage;
     private WorkerCoordinator coordinator;
 
     private ClusterConfigState configState1;
@@ -91,7 +92,7 @@ public class WorkerCoordinatorTest {
         this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
         this.metrics = new Metrics(time);
         this.rebalanceListener = new MockRebalanceListener();
-        this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
+        this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
 
         client.setNode(node);
 
@@ -110,6 +111,7 @@ public class WorkerCoordinatorTest {
         configState1 = new ClusterConfigState(
                 1L, Collections.singletonMap(connectorId, 1),
                 Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()),
+                Collections.singletonMap(connectorId, TargetState.STARTED),
                 Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()),
                 Collections.<String>emptySet()
         );
@@ -119,6 +121,9 @@ public class WorkerCoordinatorTest {
         Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>();
         configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>());
         configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>());
+        Map<String, TargetState> targetStates = new HashMap<>();
+        targetStates.put(connectorId, TargetState.STARTED);
+        targetStates.put(connectorId2, TargetState.STARTED);
         Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>();
         configState2TaskConfigs.put(taskId0, new HashMap<String, String>());
         configState2TaskConfigs.put(taskId1, new HashMap<String, String>());
@@ -126,6 +131,7 @@ public class WorkerCoordinatorTest {
         configState2 = new ClusterConfigState(
                 2L, configState2ConnectorTaskCounts,
                 configState2ConnectorConfigs,
+                targetStates,
                 configState2TaskConfigs,
                 Collections.<String>emptySet()
         );

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 05a64a1..10e5194 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.ConnectorStatus;
 import org.apache.kafka.connect.runtime.Herder;
 import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.TaskConfig;
 import org.apache.kafka.connect.runtime.TaskStatus;
 import org.apache.kafka.connect.runtime.Worker;
@@ -36,6 +37,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -80,7 +82,7 @@ public class StandaloneHerderTest {
 
     @Before
     public void setup() {
-        herder = new StandaloneHerder(WORKER_ID, worker, statusBackingStore);
+        herder = new StandaloneHerder(worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
     }
 
     @Test
@@ -163,7 +165,7 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall();
 
         worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
-                EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder));
+                EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
 
         PowerMock.replayAll();
@@ -212,7 +214,7 @@ public class StandaloneHerderTest {
 
         RuntimeException e = new RuntimeException();
         worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
-                EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder));
+                EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall().andThrow(e);
 
         PowerMock.replayAll();
@@ -240,7 +242,7 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall();
 
         Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
-        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder);
+        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
         EasyMock.expectLastCall();
 
         PowerMock.replayAll();
@@ -290,7 +292,7 @@ public class StandaloneHerderTest {
 
         RuntimeException e = new RuntimeException();
         Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
-        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder);
+        worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
         EasyMock.expectLastCall().andThrow(e);
 
         PowerMock.replayAll();
@@ -316,6 +318,11 @@ public class StandaloneHerderTest {
         // herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
         expectStop();
 
+        statusBackingStore.stop();
+        EasyMock.expectLastCall();
+        worker.stop();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
@@ -334,7 +341,7 @@ public class StandaloneHerderTest {
         Callback<List<TaskInfo>> taskConfigsCb = PowerMock.createMock(Callback.class);
 
         // Check accessors with empty worker
-        listConnectorsCb.onCompletion(null, Collections.EMPTY_LIST);
+        listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
         EasyMock.expectLastCall();
         connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<ConnectorInfo>isNull());
         EasyMock.expectLastCall();
@@ -349,7 +356,7 @@ public class StandaloneHerderTest {
         expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
 
         // Validate accessors with 1 connector
-        listConnectorsCb.onCompletion(null, Arrays.asList(CONNECTOR_NAME));
+        listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
         EasyMock.expectLastCall();
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         connectorInfoCb.onCompletion(null, connInfo);
@@ -399,8 +406,9 @@ public class StandaloneHerderTest {
         EasyMock.expectLastCall();
         Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
         worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
         // Generate same task config, which should result in no additional action to restart tasks
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
                 .andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
@@ -411,7 +419,6 @@ public class StandaloneHerderTest {
         connectorConfigCb.onCompletion(null, newConnConfig);
         EasyMock.expectLastCall();
 
-
         PowerMock.replayAll();
 
         herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
@@ -444,8 +451,9 @@ public class StandaloneHerderTest {
         Map<String, String> connectorProps = connectorConfig(name, connClass);
 
         worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
-                EasyMock.eq(herder));
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         EasyMock.expectLastCall();
+        EasyMock.expect(worker.isRunning(name)).andReturn(true);
 
         ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
         createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
@@ -457,15 +465,15 @@ public class StandaloneHerderTest {
         EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
                 .andReturn(Collections.singletonList(generatedTaskProps));
 
-        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder);
+        worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
         EasyMock.expectLastCall();
     }
 
     private void expectStop() {
         ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
-        worker.stopTasks(Collections.singleton(task));
+        worker.stopTasks(Collections.singletonList(task));
         EasyMock.expectLastCall();
-        worker.awaitStopTasks(Collections.singleton(task));
+        worker.awaitStopTasks(Collections.singletonList(task));
         EasyMock.expectLastCall();
         worker.stopConnector(CONNECTOR_NAME);
         EasyMock.expectLastCall();
@@ -475,7 +483,6 @@ public class StandaloneHerderTest {
         expectStop();
     }
 
-
     private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass) {
         HashMap<String, String> connectorProps = new HashMap<>();
         connectorProps.put(ConnectorConfig.NAME_CONFIG, name);