You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/20 23:10:21 UTC
[2/5] kafka git commit: KAFKA-2370: kafka connect pause/resume API
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
new file mode 100644
index 0000000..a5f7409
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockRunner;
+import org.easymock.EasyMockSupport;
+import org.easymock.Mock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.expectLastCall;
+
+@RunWith(EasyMockRunner.class)
+public class WorkerConnectorTest extends EasyMockSupport {
+
+ public static final String CONNECTOR = "connector";
+ public static final Map<String, String> CONFIG = new HashMap<>();
+ static {
+ CONFIG.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, TestConnector.class.getName());
+ CONFIG.put(ConnectorConfig.NAME_CONFIG, CONNECTOR);
+ }
+ public static final ConnectorConfig CONNECTOR_CONFIG = new ConnectorConfig(CONFIG);
+
+ @Mock Connector connector;
+ @Mock ConnectorContext ctx;
+ @Mock ConnectorStatus.Listener listener;
+
+ @Test
+ public void testInitializeFailure() {
+ RuntimeException exception = new RuntimeException();
+
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall().andThrow(exception);
+
+ listener.onFailure(CONNECTOR, exception);
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testFailureIsFinalState() {
+ RuntimeException exception = new RuntimeException();
+
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall().andThrow(exception);
+
+ listener.onFailure(CONNECTOR, exception);
+ expectLastCall();
+
+ // expect no call to onStartup() after failure
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testStartupAndShutdown() {
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall();
+
+ listener.onStartup(CONNECTOR);
+ expectLastCall();
+
+ connector.stop();
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testStartupAndPause() {
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall();
+
+ listener.onStartup(CONNECTOR);
+ expectLastCall();
+
+ connector.stop();
+ expectLastCall();
+
+ listener.onPause(CONNECTOR);
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.transitionTo(TargetState.PAUSED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testOnResume() {
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ listener.onPause(CONNECTOR);
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall();
+
+ listener.onResume(CONNECTOR);
+ expectLastCall();
+
+ connector.stop();
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.PAUSED);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testStartupPaused() {
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ // connector never gets started
+
+ listener.onPause(CONNECTOR);
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.PAUSED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testStartupFailure() {
+ RuntimeException exception = new RuntimeException();
+
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall().andThrow(exception);
+
+ listener.onFailure(CONNECTOR, exception);
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testShutdownFailure() {
+ RuntimeException exception = new RuntimeException();
+
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall();
+
+ listener.onStartup(CONNECTOR);
+ expectLastCall();
+
+ connector.stop();
+ expectLastCall().andThrow(exception);
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testTransitionStartedToStarted() {
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall();
+
+ // expect only one call to onStartup()
+ listener.onStartup(CONNECTOR);
+ expectLastCall();
+
+ connector.stop();
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ @Test
+ public void testTransitionPausedToPaused() {
+ connector.initialize(EasyMock.notNull(ConnectorContext.class));
+ expectLastCall();
+
+ connector.start(CONFIG);
+ expectLastCall();
+
+ listener.onStartup(CONNECTOR);
+ expectLastCall();
+
+ connector.stop();
+ expectLastCall();
+
+ listener.onPause(CONNECTOR);
+ expectLastCall();
+
+ listener.onShutdown(CONNECTOR);
+ expectLastCall();
+
+ replayAll();
+
+ WorkerConnector workerConnector = new WorkerConnector(CONNECTOR, connector, ctx, listener);
+
+ workerConnector.initialize(CONNECTOR_CONFIG);
+ workerConnector.transitionTo(TargetState.STARTED);
+ workerConnector.transitionTo(TargetState.PAUSED);
+ workerConnector.transitionTo(TargetState.PAUSED);
+ workerConnector.shutdown();
+
+ verifyAll();
+ }
+
+ private static abstract class TestConnector extends Connector {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 7bc83de..835e30f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
@@ -49,14 +50,15 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import static java.util.Arrays.asList;
import static java.util.Collections.singleton;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@@ -84,10 +86,12 @@ public class WorkerSinkTaskTest {
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
}
-
+ private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private TargetState initialState = TargetState.STARTED;
private Time time;
private WorkerSinkTask workerTask;
@Mock
@@ -120,12 +124,91 @@ public class WorkerSinkTaskTest {
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
- taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
+ taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0;
}
@Test
+ public void testStartPaused() throws Exception {
+ workerTask = PowerMock.createPartialMock(
+ WorkerSinkTask.class, new String[]{"createConsumer"},
+ taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, time);
+
+ expectInitializeTask();
+ expectPollInitialAssignment();
+
+ Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+ EasyMock.expect(consumer.assignment()).andReturn(partitions);
+ consumer.pause(partitions);
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.poll(Long.MAX_VALUE);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPause() throws Exception {
+ expectInitializeTask();
+ expectPollInitialAssignment();
+
+ expectConsumerPoll(1);
+ expectConvertMessages(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ Set<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+
+ // Pause
+ statusListener.onPause(taskId);
+ EasyMock.expectLastCall();
+ expectConsumerWakeup();
+ EasyMock.expect(consumer.assignment()).andReturn(partitions);
+ consumer.pause(partitions);
+ PowerMock.expectLastCall();
+
+ // No records returned
+ expectConsumerPoll(0);
+ sinkTask.put(Collections.<SinkRecord>emptyList());
+ EasyMock.expectLastCall();
+
+ // And unpause
+ statusListener.onResume(taskId);
+ EasyMock.expectLastCall();
+ expectConsumerWakeup();
+ EasyMock.expect(consumer.assignment()).andReturn(new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2)));
+ consumer.resume(singleton(TOPIC_PARTITION));
+ PowerMock.expectLastCall();
+ consumer.resume(singleton(TOPIC_PARTITION2));
+ PowerMock.expectLastCall();
+
+ expectConsumerPoll(1);
+ expectConvertMessages(1);
+ sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ workerTask.initializeAndStart();
+ workerTask.poll(Long.MAX_VALUE); // initial assignment
+ workerTask.poll(Long.MAX_VALUE); // fetch some data
+ workerTask.transitionTo(TargetState.PAUSED);
+ workerTask.poll(Long.MAX_VALUE); // wakeup
+ workerTask.poll(Long.MAX_VALUE); // now paused
+ workerTask.transitionTo(TargetState.STARTED);
+ workerTask.poll(Long.MAX_VALUE); // wakeup
+ workerTask.poll(Long.MAX_VALUE); // now unpaused
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testPollRedelivery() throws Exception {
expectInitializeTask();
expectPollInitialAssignment();
@@ -137,7 +220,7 @@ public class WorkerSinkTaskTest {
sinkTask.put(EasyMock.capture(records));
EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
// Pause
- HashSet<TopicPartition> partitions = new HashSet<>(Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2));
+ HashSet<TopicPartition> partitions = new HashSet<>(asList(TOPIC_PARTITION, TOPIC_PARTITION2));
EasyMock.expect(consumer.assignment()).andReturn(partitions);
consumer.pause(partitions);
PowerMock.expectLastCall();
@@ -155,7 +238,7 @@ public class WorkerSinkTaskTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
workerTask.poll(Long.MAX_VALUE);
@@ -174,7 +257,7 @@ public class WorkerSinkTaskTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
try {
@@ -197,7 +280,7 @@ public class WorkerSinkTaskTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.poll(Long.MAX_VALUE);
try {
@@ -213,7 +296,7 @@ public class WorkerSinkTaskTest {
private void expectInitializeTask() throws Exception {
PowerMock.expectPrivate(workerTask, "createConsumer").andReturn(consumer);
- consumer.subscribe(EasyMock.eq(Arrays.asList(TOPIC)), EasyMock.capture(rebalanceListener));
+ consumer.subscribe(EasyMock.eq(asList(TOPIC)), EasyMock.capture(rebalanceListener));
PowerMock.expectLastCall();
sinkTask.initialize(EasyMock.capture(sinkTaskContext));
@@ -223,7 +306,7 @@ public class WorkerSinkTaskTest {
}
private void expectRebalanceRevocationError(RuntimeException e) {
- final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+ final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
sinkTask.close(new HashSet<>(partitions));
EasyMock.expectLastCall().andThrow(e);
@@ -239,7 +322,7 @@ public class WorkerSinkTaskTest {
}
private void expectRebalanceAssignmentError(RuntimeException e) {
- final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+ final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
sinkTask.close(new HashSet<>(partitions));
EasyMock.expectLastCall();
@@ -268,7 +351,7 @@ public class WorkerSinkTaskTest {
}
private void expectPollInitialAssignment() {
- final List<TopicPartition> partitions = Arrays.asList(TOPIC_PARTITION, TOPIC_PARTITION2);
+ final List<TopicPartition> partitions = asList(TOPIC_PARTITION, TOPIC_PARTITION2);
sinkTask.open(partitions);
EasyMock.expectLastCall();
@@ -287,6 +370,12 @@ public class WorkerSinkTaskTest {
EasyMock.expectLastCall();
}
+ private void expectConsumerWakeup() {
+ consumer.wakeup();
+ EasyMock.expectLastCall();
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andThrow(new WakeupException());
+ }
+
private void expectConsumerPoll(final int numMessages) {
EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
new IAnswer<ConsumerRecords<byte[], byte[]>>() {
@@ -309,4 +398,8 @@ public class WorkerSinkTaskTest {
EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
}
+
+ private abstract static class TestSinkTask extends SinkTask {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 25f0bf4..25dbff5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -89,9 +89,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private static final Map<String, String> TASK_PROPS = new HashMap<>();
static {
TASK_PROPS.put(SinkConnector.TOPICS_CONFIG, TOPIC);
+ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
}
+ private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private TargetState initialState = TargetState.STARTED;
private Time time;
@Mock private SinkTask sinkTask;
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
@@ -105,6 +108,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private long recordsReturned;
+
@SuppressWarnings("unchecked")
@Override
public void setup() {
@@ -121,7 +125,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
- taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
+ taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0;
}
@@ -136,7 +140,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// First iteration initializes partition assignment
@@ -147,7 +151,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
workerTask.iteration();
}
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
// Verify contents match expected values, i.e. that they were translated properly. With max
@@ -180,7 +183,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initialize partition assignment
@@ -193,7 +196,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
// Commit finishes synchronously for testing so we can check this immediately
assertEquals(0, workerTask.commitFailures());
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
assertEquals(2, capturedRecords.getValues().size());
@@ -220,7 +222,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initialize partition assignment
@@ -233,7 +235,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
assertEquals(1, workerTask.commitFailures());
assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
PowerMock.verifyAll();
@@ -259,7 +260,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initialize partition assignment
@@ -274,7 +275,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
assertEquals(1, workerTask.commitFailures());
assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
PowerMock.verifyAll();
@@ -292,7 +292,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initialize partition assignment
@@ -306,7 +306,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
assertEquals(1, workerTask.commitFailures());
assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
PowerMock.verifyAll();
@@ -325,7 +324,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
// Initialize partition assignment
@@ -342,7 +341,6 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
assertEquals(1, workerTask.commitFailures());
assertEquals(false, Whitebox.getInternalState(workerTask, "committing"));
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
PowerMock.verifyAll();
@@ -405,13 +403,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
PowerMock.verifyAll();
@@ -449,13 +446,12 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
expectStopTask(3);
PowerMock.replayAll();
- workerTask.initialize(TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.initializeAndStart();
workerTask.iteration();
workerTask.iteration();
workerTask.iteration();
workerTask.stop();
- workerTask.awaitStop(Long.MAX_VALUE);
workerTask.close();
PowerMock.verifyAll();
@@ -597,4 +593,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
return capturedCallback;
}
+ private static abstract class TestSinkTask extends SinkTask {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 3dd07a6..0d805da 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -58,6 +58,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -94,7 +95,12 @@ public class WorkerSourceTaskTest extends ThreadedTest {
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
- private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
+ private static final Map<String, String> TASK_PROPS = new HashMap<>();
+ static {
+ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
+ }
+ private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
+
private static final List<SourceRecord> RECORDS = Arrays.asList(
new SourceRecord(PARTITION, OFFSET, "topic", null, KEY_SCHEMA, KEY, RECORD_SCHEMA, RECORD)
);
@@ -115,8 +121,94 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private void createWorkerTask() {
- workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, keyConverter, valueConverter, producer,
- offsetReader, offsetWriter, config, new SystemTime());
+ createWorkerTask(TargetState.STARTED);
+ }
+
+ private void createWorkerTask(TargetState initialState) {
+ workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter,
+ valueConverter, producer, offsetReader, offsetWriter, config, new SystemTime());
+ }
+
+ @Test
+ public void testStartPaused() throws Exception {
+ final CountDownLatch startupLatch = new CountDownLatch(1);
+
+ createWorkerTask(TargetState.PAUSED);
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(TASK_PROPS);
+ EasyMock.expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ startupLatch.countDown();
+ return null;
+ }
+ });
+ statusListener.onPause(taskId);
+ EasyMock.expectLastCall();
+
+ // we shouldn't get any calls to poll()
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ executor.submit(workerTask);
+ assertTrue(startupLatch.await(5, TimeUnit.SECONDS));
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPause() throws Exception {
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(TASK_PROPS);
+ EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
+
+ AtomicInteger count = new AtomicInteger(0);
+ CountDownLatch pollLatch = expectPolls(10, count);
+ // In this test, we don't flush, so nothing goes any further than the offset writer
+
+ statusListener.onPause(taskId);
+ EasyMock.expectLastCall();
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(TASK_CONFIG);
+ executor.submit(workerTask);
+ awaitPolls(pollLatch);
+
+ workerTask.transitionTo(TargetState.PAUSED);
+
+ int priorCount = count.get();
+ Thread.sleep(100);
+ assertEquals(priorCount, count.get());
+
+ workerTask.stop();
+ assertTrue(workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
}
@Test
@@ -125,7 +217,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
- sourceTask.start(EMPTY_TASK_PROPS);
+ sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
@@ -142,7 +234,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
workerTask.stop();
@@ -157,7 +249,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
- sourceTask.start(EMPTY_TASK_PROPS);
+ sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
@@ -175,7 +267,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
workerTask.stop();
@@ -191,7 +283,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
- sourceTask.start(EMPTY_TASK_PROPS);
+ sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
@@ -210,7 +302,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
assertTrue(workerTask.commitOffsets());
@@ -227,7 +319,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
- sourceTask.start(EMPTY_TASK_PROPS);
+ sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
EasyMock.expectLastCall();
@@ -245,7 +337,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
awaitPolls(pollLatch);
assertTrue(workerTask.commitOffsets());
@@ -317,7 +409,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
- sourceTask.start(EMPTY_TASK_PROPS);
+ sourceTask.start(TASK_PROPS);
EasyMock.expectLastCall();
statusListener.onStartup(taskId);
@@ -336,7 +428,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.replayAll();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
executor.submit(workerTask);
// Stopping immediately while the other thread has work to do should result in no polling, no offset commits,
// exiting the work thread immediately, and the stop() method will be invoked in the background thread since it
@@ -348,8 +440,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
PowerMock.verifyAll();
}
- private CountDownLatch expectPolls(int count) throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(count);
+ private CountDownLatch expectPolls(int minimum, final AtomicInteger count) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(minimum);
// Note that we stub these to allow any number of calls because the thread will continue to
// run. The count passed in + latch returned just makes sure we get *at least* that number of
// calls
@@ -357,6 +449,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
.andStubAnswer(new IAnswer<List<SourceRecord>>() {
@Override
public List<SourceRecord> answer() throws Throwable {
+ count.incrementAndGet();
latch.countDown();
return RECORDS;
}
@@ -366,6 +459,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
return latch;
}
+ private CountDownLatch expectPolls(int count) throws InterruptedException {
+ return expectPolls(count, new AtomicInteger());
+ }
+
@SuppressWarnings("unchecked")
private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException {
expectConvertKeyValue(false);
@@ -446,8 +543,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
convertValueExpect.andReturn(SERIALIZED_RECORD);
}
- private void awaitPolls(CountDownLatch latch) throws InterruptedException {
- latch.await(1000, TimeUnit.MILLISECONDS);
+ private boolean awaitPolls(CountDownLatch latch) throws InterruptedException {
+ return latch.await(1000, TimeUnit.MILLISECONDS);
}
@SuppressWarnings("unchecked")
@@ -469,4 +566,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
}
+ private abstract static class TestSourceTask extends SourceTask {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 20e3fe2..36803db 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -16,12 +16,13 @@
**/
package org.apache.kafka.connect.runtime;
+import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Test;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
@@ -32,7 +33,11 @@ import static org.easymock.EasyMock.verify;
public class WorkerTaskTest {
- private static final Map<String, String> EMPTY_TASK_PROPS = Collections.emptyMap();
+ private static final Map<String, String> TASK_PROPS = new HashMap<>();
+ static {
+ TASK_PROPS.put(TaskConfig.TASK_CLASS_CONFIG, TestSinkTask.class.getName());
+ }
+ private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
@Test
public void standardStartup() {
@@ -41,14 +46,14 @@ public class WorkerTaskTest {
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
- .withArgs(taskId, statusListener)
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
+ .withArgs(taskId, statusListener, TargetState.STARTED)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
.createStrictMock();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
expectLastCall();
workerTask.execute();
@@ -65,7 +70,7 @@ public class WorkerTaskTest {
replay(workerTask);
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.run();
workerTask.stop();
workerTask.awaitStop(1000L);
@@ -80,14 +85,14 @@ public class WorkerTaskTest {
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
- .withArgs(taskId, statusListener)
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
+ .withArgs(taskId, statusListener, TargetState.STARTED)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
.createStrictMock();
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
EasyMock.expectLastCall();
workerTask.close();
@@ -95,7 +100,7 @@ public class WorkerTaskTest {
replay(workerTask);
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.stop();
workerTask.awaitStop(1000L);
@@ -112,8 +117,8 @@ public class WorkerTaskTest {
TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
- .withArgs(taskId, statusListener)
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class, TargetState.class)
+ .withArgs(taskId, statusListener, TargetState.STARTED)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
@@ -130,7 +135,7 @@ public class WorkerTaskTest {
}
};
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
EasyMock.expectLastCall();
workerTask.execute();
@@ -152,7 +157,7 @@ public class WorkerTaskTest {
replay(workerTask);
- workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.initialize(TASK_CONFIG);
workerTask.run();
workerTask.stop();
@@ -163,4 +168,7 @@ public class WorkerTaskTest {
verify(workerTask);
}
+ private static abstract class TestSinkTask extends SinkTask {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index 557d789..2004c99 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -102,7 +102,7 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
- connector.initialize(ctx);
+ connector.initialize(EasyMock.anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
connector.start(props);
EasyMock.expectLastCall();
@@ -126,10 +126,10 @@ public class WorkerTest extends ThreadedTest {
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.startConnector(config, ctx, connectorStatusListener);
+ worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
try {
- worker.startConnector(config, ctx, connectorStatusListener);
+ worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
fail("Should have thrown exception when trying to add connector with same name.");
} catch (ConnectException e) {
// expected
@@ -160,7 +160,7 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTestConnector");
- connector.initialize(ctx);
+ connector.initialize(EasyMock.anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
connector.start(props);
EasyMock.expectLastCall();
@@ -184,7 +184,7 @@ public class WorkerTest extends ThreadedTest {
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.startConnector(config, ctx, connectorStatusListener);
+ worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
worker.stopConnector(CONNECTOR_ID);
@@ -213,7 +213,7 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, "WorkerTest");
- connector.initialize(ctx);
+ connector.initialize(EasyMock.anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
connector.start(props);
EasyMock.expectLastCall();
@@ -237,7 +237,7 @@ public class WorkerTest extends ThreadedTest {
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.startConnector(config, ctx, connectorStatusListener);
+ worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
worker.stopConnector(CONNECTOR_ID);
@@ -279,7 +279,7 @@ public class WorkerTest extends ThreadedTest {
props.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_ID);
props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, WorkerTestConnector.class.getName());
- connector.initialize(ctx);
+ connector.initialize(EasyMock.anyObject(ConnectorContext.class));
EasyMock.expectLastCall();
connector.start(props);
EasyMock.expectLastCall();
@@ -309,10 +309,10 @@ public class WorkerTest extends ThreadedTest {
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.startConnector(config, ctx, connectorStatusListener);
+ worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
try {
- worker.startConnector(config, ctx, connectorStatusListener);
+ worker.startConnector(config, ctx, connectorStatusListener, TargetState.STARTED);
fail("Should have thrown exception when trying to add connector with same name.");
} catch (ConnectException e) {
// expected
@@ -351,6 +351,7 @@ public class WorkerTest extends ThreadedTest {
WorkerSourceTask.class, EasyMock.eq(TASK_ID),
EasyMock.eq(task),
EasyMock.anyObject(TaskStatus.Listener.class),
+ EasyMock.eq(TargetState.STARTED),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class),
@@ -361,7 +362,7 @@ public class WorkerTest extends ThreadedTest {
.andReturn(workerTask);
Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
- workerTask.initialize(origProps);
+ workerTask.initialize(new TaskConfig(origProps));
EasyMock.expectLastCall();
workerTask.run();
EasyMock.expectLastCall();
@@ -379,7 +380,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
assertEquals(Collections.emptySet(), worker.taskIds());
- worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+ worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
worker.stopAndAwaitTask(TASK_ID);
assertEquals(Collections.emptySet(), worker.taskIds());
@@ -418,6 +419,7 @@ public class WorkerTest extends ThreadedTest {
WorkerSourceTask.class, EasyMock.eq(TASK_ID),
EasyMock.eq(task),
EasyMock.anyObject(TaskStatus.Listener.class),
+ EasyMock.eq(TargetState.STARTED),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class),
@@ -428,7 +430,7 @@ public class WorkerTest extends ThreadedTest {
.andReturn(workerTask);
Map<String, String> origProps = new HashMap<>();
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
- workerTask.initialize(origProps);
+ workerTask.initialize(new TaskConfig(origProps));
EasyMock.expectLastCall();
workerTask.run();
EasyMock.expectLastCall();
@@ -447,7 +449,7 @@ public class WorkerTest extends ThreadedTest {
worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
- worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+ worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener, TargetState.STARTED);
worker.stop();
PowerMock.verifyAll();
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index b667fa8..e62b663 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -31,12 +32,12 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
-import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
-import org.apache.kafka.connect.util.TestFuture;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
@@ -48,7 +49,6 @@ import org.powermock.api.easymock.annotation.Mock;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
import java.util.ArrayList;
import java.util.Arrays;
@@ -127,13 +127,15 @@ public class DistributedHerderTest {
TASK_CONFIGS_MAP.put(TASK2, TASK_CONFIG);
}
private static final ClusterConfigState SNAPSHOT = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
- Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), Collections.singletonMap(CONN1, TargetState.STARTED),
+ TASK_CONFIGS_MAP, Collections.<String>emptySet());
private static final String WORKER_ID = "localhost:8083";
- @Mock private KafkaConfigStorage configStorage;
+ @Mock private KafkaConfigBackingStore configStorage;
@Mock private StatusBackingStore statusBackingStore;
@Mock private WorkerGroupMember member;
private MockTime time;
@@ -141,8 +143,7 @@ public class DistributedHerderTest {
@Mock private Worker worker;
@Mock private Callback<Herder.Created<ConnectorInfo>> putConnectorCallback;
- private Callback<String> connectorConfigCallback;
- private Callback<List<ConnectorTaskId>> taskConfigCallback;
+ private ConfigBackingStore.UpdateListener configUpdateListener;
private WorkerRebalanceListener rebalanceListener;
@Before
@@ -152,24 +153,27 @@ public class DistributedHerderTest {
time = new MockTime();
herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
- new DistributedConfig(HERDER_CONFIG), WORKER_ID, worker, statusBackingStore, configStorage, member, MEMBER_URL, time);
- connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
- taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
- rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
+ new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, statusBackingStore, configStorage, member, MEMBER_URL, time);
+
+ configUpdateListener = herder.new ConfigUpdateListener();
+ rebalanceListener = herder.new RebalanceListener();
+
PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
}
@Test
- public void testJoinAssignment() {
+ public void testJoinAssignment() throws Exception {
// Join group and get assignment
EasyMock.expect(member.memberId()).andStubReturn("member");
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
expectPostRebalanceCatchup(SNAPSHOT);
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -182,16 +186,17 @@ public class DistributedHerderTest {
}
@Test
- public void testRebalance() {
+ public void testRebalance() throws Exception {
// Join group and get assignment
EasyMock.expect(member.memberId()).andStubReturn("member");
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
expectPostRebalanceCatchup(SNAPSHOT);
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -201,9 +206,48 @@ public class DistributedHerderTest {
// and the new assignment started
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testRebalanceFailedConnector() throws Exception {
+ // Join group and get assignment
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+ expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+ expectPostRebalanceCatchup(SNAPSHOT);
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
+ 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
+
+ // and the new assignment started
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(false);
+
+ // worker is not running, so we should see no call to connectorTaskConfigs()
+
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -231,6 +275,8 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
statusBackingStore.stop();
PowerMock.expectLastCall();
+ worker.stop();
+ PowerMock.expectLastCall();
PowerMock.replayAll();
@@ -295,14 +341,15 @@ public class DistributedHerderTest {
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
// And delete the connector
member.wakeup();
PowerMock.expectLastCall();
- configStorage.putConnectorConfig(CONN1, null);
+ configStorage.removeConnectorConfig(CONN1);
PowerMock.expectLastCall();
putConnectorCallback.onCompletion(null, new Herder.Created<ConnectorInfo>(false, null));
PowerMock.expectLastCall();
@@ -329,8 +376,9 @@ public class DistributedHerderTest {
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
// now handle the connector restart
member.wakeup();
@@ -345,8 +393,9 @@ public class DistributedHerderTest {
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
PowerMock.replayAll();
@@ -479,7 +528,7 @@ public class DistributedHerderTest {
expectPostRebalanceCatchup(SNAPSHOT);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
- worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
// now handle the task restart
@@ -494,7 +543,7 @@ public class DistributedHerderTest {
worker.stopAndAwaitTask(TASK0);
PowerMock.expectLastCall();
- worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
PowerMock.replayAll();
@@ -637,7 +686,8 @@ public class DistributedHerderTest {
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -646,7 +696,7 @@ public class DistributedHerderTest {
PowerMock.replayAll();
herder.tick(); // join
- connectorConfigCallback.onCompletion(null, CONN1); // read updated config
+ configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config
herder.tick(); // apply config
herder.tick(); // do rebalance
@@ -654,7 +704,7 @@ public class DistributedHerderTest {
}
@Test
- public void testConnectorConfigUpdate() {
+ public void testConnectorConfigUpdate() throws Exception {
// Connector config can be applied without any rebalance
EasyMock.expect(member.memberId()).andStubReturn("member");
@@ -664,8 +714,9 @@ public class DistributedHerderTest {
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -678,8 +729,9 @@ public class DistributedHerderTest {
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -687,7 +739,7 @@ public class DistributedHerderTest {
PowerMock.replayAll();
herder.tick(); // join
- connectorConfigCallback.onCompletion(null, CONN1); // read updated config
+ configUpdateListener.onConnectorConfigUpdate(CONN1); // read updated config
herder.tick(); // apply config
PowerMock.verifyAll();
@@ -715,7 +767,7 @@ public class DistributedHerderTest {
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
Arrays.asList(TASK0));
- worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -723,7 +775,7 @@ public class DistributedHerderTest {
PowerMock.replayAll();
herder.tick(); // join
- taskConfigCallback.onCompletion(null, Arrays.asList(TASK0, TASK1, TASK2)); // read updated config
+ configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK0, TASK1, TASK2)); // read updated config
herder.tick(); // apply config
herder.tick(); // do rebalance
@@ -738,9 +790,8 @@ public class DistributedHerderTest {
ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
Collections.<ConnectorTaskId>emptyList());
// Reading to end of log times out
- TestFuture<Void> readToEndFuture = new TestFuture<>();
- readToEndFuture.resolveOnGet(new TimeoutException());
- EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+ configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+ EasyMock.expectLastCall().andThrow(new TimeoutException());
member.maybeLeaveGroup();
EasyMock.expectLastCall();
PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
@@ -751,11 +802,12 @@ public class DistributedHerderTest {
expectPostRebalanceCatchup(SNAPSHOT);
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -816,8 +868,9 @@ public class DistributedHerderTest {
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
// list connectors, get connector info, get connector config, get task configs
@@ -834,7 +887,7 @@ public class DistributedHerderTest {
@Override
public Object answer() throws Throwable {
// Simulate response to writing config + waiting until end of log to be read
- connectorConfigCallback.onCompletion(null, CONN1);
+ configUpdateListener.onConnectorConfigUpdate(CONN1);
return null;
}
});
@@ -845,8 +898,9 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
PowerMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -943,10 +997,9 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
}
- private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) {
- TestFuture<Void> readToEndFuture = new TestFuture<>();
- readToEndFuture.resolveOnGet((Void) null);
- EasyMock.expect(configStorage.readToEnd()).andReturn(readToEndFuture);
+ private void expectPostRebalanceCatchup(final ClusterConfigState readToEndSnapshot) throws TimeoutException {
+ configStorage.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+ EasyMock.expectLastCall();
EasyMock.expect(configStorage.snapshot()).andReturn(readToEndSnapshot);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index bf33cb3..f7423ec 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -31,7 +31,8 @@ import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
@@ -76,7 +77,7 @@ public class WorkerCoordinatorTest {
private Metrics metrics;
private ConsumerNetworkClient consumerClient;
private MockRebalanceListener rebalanceListener;
- @Mock private KafkaConfigStorage configStorage;
+ @Mock private KafkaConfigBackingStore configStorage;
private WorkerCoordinator coordinator;
private ClusterConfigState configState1;
@@ -91,7 +92,7 @@ public class WorkerCoordinatorTest {
this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000);
this.metrics = new Metrics(time);
this.rebalanceListener = new MockRebalanceListener();
- this.configStorage = PowerMock.createMock(KafkaConfigStorage.class);
+ this.configStorage = PowerMock.createMock(KafkaConfigBackingStore.class);
client.setNode(node);
@@ -110,6 +111,7 @@ public class WorkerCoordinatorTest {
configState1 = new ClusterConfigState(
1L, Collections.singletonMap(connectorId, 1),
Collections.singletonMap(connectorId, (Map<String, String>) new HashMap<String, String>()),
+ Collections.singletonMap(connectorId, TargetState.STARTED),
Collections.singletonMap(taskId0, (Map<String, String>) new HashMap<String, String>()),
Collections.<String>emptySet()
);
@@ -119,6 +121,9 @@ public class WorkerCoordinatorTest {
Map<String, Map<String, String>> configState2ConnectorConfigs = new HashMap<>();
configState2ConnectorConfigs.put(connectorId, new HashMap<String, String>());
configState2ConnectorConfigs.put(connectorId2, new HashMap<String, String>());
+ Map<String, TargetState> targetStates = new HashMap<>();
+ targetStates.put(connectorId, TargetState.STARTED);
+ targetStates.put(connectorId2, TargetState.STARTED);
Map<ConnectorTaskId, Map<String, String>> configState2TaskConfigs = new HashMap<>();
configState2TaskConfigs.put(taskId0, new HashMap<String, String>());
configState2TaskConfigs.put(taskId1, new HashMap<String, String>());
@@ -126,6 +131,7 @@ public class WorkerCoordinatorTest {
configState2 = new ClusterConfigState(
2L, configState2ConnectorTaskCounts,
configState2ConnectorConfigs,
+ targetStates,
configState2TaskConfigs,
Collections.<String>emptySet()
);
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 05a64a1..10e5194 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
+import org.apache.kafka.connect.runtime.TargetState;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
@@ -36,6 +37,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.MemoryConfigBackingStore;
import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -80,7 +82,7 @@ public class StandaloneHerderTest {
@Before
public void setup() {
- herder = new StandaloneHerder(WORKER_ID, worker, statusBackingStore);
+ herder = new StandaloneHerder(worker, WORKER_ID, statusBackingStore, new MemoryConfigBackingStore());
}
@Test
@@ -163,7 +165,7 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall();
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
- EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder));
+ EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
PowerMock.replayAll();
@@ -212,7 +214,7 @@ public class StandaloneHerderTest {
RuntimeException e = new RuntimeException();
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class))),
- EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder));
+ EasyMock.anyObject(HerderConnectorContext.class), EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall().andThrow(e);
PowerMock.replayAll();
@@ -240,7 +242,7 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall();
Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
- worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder);
+ worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
EasyMock.expectLastCall();
PowerMock.replayAll();
@@ -290,7 +292,7 @@ public class StandaloneHerderTest {
RuntimeException e = new RuntimeException();
Map<String, String> generatedTaskProps = taskConfig(BogusSourceTask.class, false);
- worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder);
+ worker.startTask(taskId, new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
EasyMock.expectLastCall().andThrow(e);
PowerMock.replayAll();
@@ -316,6 +318,11 @@ public class StandaloneHerderTest {
// herder.stop() should stop any running connectors and tasks even if destroyConnector was not invoked
expectStop();
+ statusBackingStore.stop();
+ EasyMock.expectLastCall();
+ worker.stop();
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
@@ -334,7 +341,7 @@ public class StandaloneHerderTest {
Callback<List<TaskInfo>> taskConfigsCb = PowerMock.createMock(Callback.class);
// Check accessors with empty worker
- listConnectorsCb.onCompletion(null, Collections.EMPTY_LIST);
+ listConnectorsCb.onCompletion(null, Collections.EMPTY_SET);
EasyMock.expectLastCall();
connectorInfoCb.onCompletion(EasyMock.<NotFoundException>anyObject(), EasyMock.<ConnectorInfo>isNull());
EasyMock.expectLastCall();
@@ -349,7 +356,7 @@ public class StandaloneHerderTest {
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
// Validate accessors with 1 connector
- listConnectorsCb.onCompletion(null, Arrays.asList(CONNECTOR_NAME));
+ listConnectorsCb.onCompletion(null, Collections.singleton(CONNECTOR_NAME));
EasyMock.expectLastCall();
ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connConfig, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
connectorInfoCb.onCompletion(null, connInfo);
@@ -399,8 +406,9 @@ public class StandaloneHerderTest {
EasyMock.expectLastCall();
Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(CONNECTOR_NAME)).andReturn(true);
// Generate same task config, which should result in no additional action to restart tasks
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
.andReturn(Collections.singletonList(taskConfig(BogusSourceTask.class, false)));
@@ -411,7 +419,6 @@ public class StandaloneHerderTest {
connectorConfigCb.onCompletion(null, newConnConfig);
EasyMock.expectLastCall();
-
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connConfig, false, createCallback);
@@ -444,8 +451,9 @@ public class StandaloneHerderTest {
Map<String, String> connectorProps = connectorConfig(name, connClass);
worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
- EasyMock.eq(herder));
+ EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
EasyMock.expectLastCall();
+ EasyMock.expect(worker.isRunning(name)).andReturn(true);
ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
@@ -457,15 +465,15 @@ public class StandaloneHerderTest {
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
.andReturn(Collections.singletonList(generatedTaskProps));
- worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder);
+ worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder, TargetState.STARTED);
EasyMock.expectLastCall();
}
private void expectStop() {
ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
- worker.stopTasks(Collections.singleton(task));
+ worker.stopTasks(Collections.singletonList(task));
EasyMock.expectLastCall();
- worker.awaitStopTasks(Collections.singleton(task));
+ worker.awaitStopTasks(Collections.singletonList(task));
EasyMock.expectLastCall();
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
@@ -475,7 +483,6 @@ public class StandaloneHerderTest {
expectStop();
}
-
private static HashMap<String, String> connectorConfig(String name, Class<? extends Connector> connClass) {
HashMap<String, String> connectorProps = new HashMap<>();
connectorProps.put(ConnectorConfig.NAME_CONFIG, name);