You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 07:47:48 UTC
[1/3] kafka git commit: KAFKA-3093: Add Connect status tracking API
Repository: kafka
Updated Branches:
refs/heads/trunk aeb9c2adc -> f7d019ed4
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 3eab095..abb62b9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -426,7 +426,7 @@ public class WorkerCoordinatorTest {
public int assignedCount = 0;
@Override
- public void onAssigned(ConnectProtocol.Assignment assignment) {
+ public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
this.assignment = assignment;
assignedCount++;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
index 8ef1d24..07d0e3d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerderTest.java
@@ -22,10 +22,13 @@ import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
+import org.apache.kafka.connect.runtime.TaskStatus;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
@@ -33,6 +36,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
@@ -54,8 +58,8 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@RunWith(PowerMockRunner.class)
@@ -64,22 +68,25 @@ public class StandaloneHerderTest {
private static final List<String> TOPICS_LIST = Arrays.asList("topic1", "topic2");
private static final String TOPICS_LIST_STR = "topic1,topic2";
private static final int DEFAULT_MAX_TASKS = 1;
+ private static final String WORKER_ID = "localhost:8083";
private StandaloneHerder herder;
- @Mock protected Worker worker;
+
private Connector connector;
+ @Mock protected Worker worker;
@Mock protected Callback<Herder.Created<ConnectorInfo>> createCallback;
+ @Mock protected StatusBackingStore statusBackingStore;
@Before
public void setup() {
- worker = PowerMock.createMock(Worker.class);
- herder = new StandaloneHerder(worker);
+ herder = new StandaloneHerder(WORKER_ID, worker, statusBackingStore);
}
@Test
public void testCreateSourceConnector() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
PowerMock.replayAll();
herder.putConnectorConfig(CONNECTOR_NAME, connectorConfig(CONNECTOR_NAME, BogusSourceConnector.class), false, createCallback);
@@ -121,6 +128,10 @@ public class StandaloneHerderTest {
public void testDestroyConnector() throws Exception {
connector = PowerMock.createMock(BogusSourceConnector.class);
expectAdd(CONNECTOR_NAME, BogusSourceConnector.class, BogusSourceTask.class, false);
+
+ EasyMock.expect(statusBackingStore.getAll(CONNECTOR_NAME)).andReturn(Collections.<TaskStatus>emptyList());
+ statusBackingStore.put(new ConnectorStatus(CONNECTOR_NAME, AbstractStatus.State.DESTROYED, WORKER_ID, 0));
+
expectDestroy();
PowerMock.replayAll();
@@ -232,7 +243,8 @@ public class StandaloneHerderTest {
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
Capture<ConnectorConfig> capturedConfig = EasyMock.newCapture();
- worker.addConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.capture(capturedConfig), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
EasyMock.expectLastCall();
// Generate same task config, which should result in no additional action to restart tasks
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
@@ -270,16 +282,19 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
- private void expectAdd(String name, Class<? extends Connector> connClass, Class<? extends Task> taskClass,
+ private void expectAdd(String name,
+ Class<? extends Connector> connClass,
+ Class<? extends Task> taskClass,
boolean sink) throws Exception {
Map<String, String> connectorProps = connectorConfig(name, connClass);
- worker.addConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class));
- PowerMock.expectLastCall();
+ worker.startConnector(EasyMock.eq(new ConnectorConfig(connectorProps)), EasyMock.anyObject(HerderConnectorContext.class),
+ EasyMock.eq(herder));
+ EasyMock.expectLastCall();
ConnectorInfo connInfo = new ConnectorInfo(CONNECTOR_NAME, connectorProps, Arrays.asList(new ConnectorTaskId(CONNECTOR_NAME, 0)));
createCallback.onCompletion(null, new Herder.Created<>(true, connInfo));
- PowerMock.expectLastCall();
+ EasyMock.expectLastCall();
// And we should instantiate the tasks. For a sink task, we should see added properties for
// the input topic partitions
@@ -287,12 +302,15 @@ public class StandaloneHerderTest {
EasyMock.expect(worker.connectorTaskConfigs(CONNECTOR_NAME, DEFAULT_MAX_TASKS, TOPICS_LIST))
.andReturn(Collections.singletonList(generatedTaskProps));
- worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps));
- PowerMock.expectLastCall();
+ worker.startTask(new ConnectorTaskId(CONNECTOR_NAME, 0), new TaskConfig(generatedTaskProps), herder);
+ EasyMock.expectLastCall();
}
private void expectStop() {
- worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+ ConnectorTaskId task = new ConnectorTaskId(CONNECTOR_NAME, 0);
+ worker.stopTasks(Collections.singleton(task));
+ EasyMock.expectLastCall();
+ worker.awaitStopTasks(Collections.singleton(task));
EasyMock.expectLastCall();
worker.stopConnector(CONNECTOR_NAME);
EasyMock.expectLastCall();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
new file mode 100644
index 0000000..cdbab64
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaStatusBackingStoreTest.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.easymock.Capture;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.eq;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
+import static org.easymock.EasyMock.newCapture;
+import static org.junit.Assert.assertEquals;
+
+public class KafkaStatusBackingStoreTest extends EasyMockSupport {
+
+ private static final String STATUS_TOPIC = "status-topic";
+ private static final String WORKER_ID = "localhost:8083";
+ private static final String CONNECTOR = "conn";
+ private static final ConnectorTaskId TASK = new ConnectorTaskId(CONNECTOR, 0);
+
+ @Test
+ public void putConnectorState() {
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ byte[] value = new byte[0];
+ expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+ .andStubReturn(value);
+
+ final Capture<Callback> callbackCapture = newCapture();
+ kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, null);
+ return null;
+ }
+ });
+ replayAll();
+
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(status);
+
+ // state is not visible until read back from the log
+ assertEquals(null, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putConnectorStateRetriableFailure() {
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ byte[] value = new byte[0];
+ expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+ .andStubReturn(value);
+
+ final Capture<Callback> callbackCapture = newCapture();
+ kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, new TimeoutException());
+ return null;
+ }
+ })
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, null);
+ return null;
+ }
+ });
+ replayAll();
+
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(status);
+
+ // state is not visible until read back from the log
+ assertEquals(null, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putConnectorStateNonRetriableFailure() {
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ byte[] value = new byte[0];
+ expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+ .andStubReturn(value);
+
+ final Capture<Callback> callbackCapture = newCapture();
+ kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, new UnknownServerException());
+ return null;
+ }
+ });
+ replayAll();
+
+ // the error is logged and ignored
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(status);
+
+ // state is not visible until read back from the log
+ assertEquals(null, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putSafeConnectorIgnoresStaleStatus() {
+ byte[] value = new byte[0];
+ String otherWorkerId = "anotherhost:8083";
+
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ // the persisted came from a different host and has a newer generation
+ Map<String, Object> statusMap = new HashMap<>();
+ statusMap.put("worker_id", otherWorkerId);
+ statusMap.put("state", "RUNNING");
+ statusMap.put("generation", 1L);
+
+ expect(converter.toConnectData(STATUS_TOPIC, value))
+ .andReturn(new SchemaAndValue(null, statusMap));
+
+ // we're verifying that there is no call to KafkaBasedLog.send
+
+ replayAll();
+
+ store.read(consumerRecord(0, "status-connector-conn", value));
+ store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
+
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, otherWorkerId, 1);
+ assertEquals(status, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putSafeOverridesValueSetBySameWorker() {
+ final byte[] value = new byte[0];
+
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ // the persisted came from the same host, but has a newer generation
+ Map<String, Object> firstStatusRead = new HashMap<>();
+ firstStatusRead.put("worker_id", WORKER_ID);
+ firstStatusRead.put("state", "RUNNING");
+ firstStatusRead.put("generation", 1L);
+
+ Map<String, Object> secondStatusRead = new HashMap<>();
+ secondStatusRead.put("worker_id", WORKER_ID);
+ secondStatusRead.put("state", "UNASSIGNED");
+ secondStatusRead.put("generation", 0L);
+
+ expect(converter.toConnectData(STATUS_TOPIC, value))
+ .andReturn(new SchemaAndValue(null, firstStatusRead))
+ .andReturn(new SchemaAndValue(null, secondStatusRead));
+
+ expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+ .andStubReturn(value);
+
+ final Capture<Callback> callbackCapture = newCapture();
+ kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, null);
+ store.read(consumerRecord(1, "status-connector-conn", value));
+ return null;
+ }
+ });
+
+ replayAll();
+
+ store.read(consumerRecord(0, "status-connector-conn", value));
+ store.putSafe(new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0));
+
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
+ assertEquals(status, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putConnectorStateShouldOverride() {
+ final byte[] value = new byte[0];
+ String otherWorkerId = "anotherhost:8083";
+
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ final KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ // the persisted came from a different host and has a newer generation
+ Map<String, Object> firstStatusRead = new HashMap<>();
+ firstStatusRead.put("worker_id", otherWorkerId);
+ firstStatusRead.put("state", "RUNNING");
+ firstStatusRead.put("generation", 1L);
+
+ Map<String, Object> secondStatusRead = new HashMap<>();
+ secondStatusRead.put("worker_id", WORKER_ID);
+ secondStatusRead.put("state", "UNASSIGNED");
+ secondStatusRead.put("generation", 0L);
+
+ expect(converter.toConnectData(STATUS_TOPIC, value))
+ .andReturn(new SchemaAndValue(null, firstStatusRead))
+ .andReturn(new SchemaAndValue(null, secondStatusRead));
+
+ expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+ .andStubReturn(value);
+
+ final Capture<Callback> callbackCapture = newCapture();
+ kafkaBasedLog.send(eq("status-connector-conn"), eq(value), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, null);
+ store.read(consumerRecord(1, "status-connector-conn", value));
+ return null;
+ }
+ });
+ replayAll();
+
+ store.read(consumerRecord(0, "status-connector-conn", value));
+
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.UNASSIGNED, WORKER_ID, 0);
+ store.put(status);
+ assertEquals(status, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void readConnectorState() {
+ byte[] value = new byte[0];
+
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ Map<String, Object> statusMap = new HashMap<>();
+ statusMap.put("worker_id", WORKER_ID);
+ statusMap.put("state", "RUNNING");
+ statusMap.put("generation", 0L);
+
+ expect(converter.toConnectData(STATUS_TOPIC, value))
+ .andReturn(new SchemaAndValue(null, statusMap));
+
+ replayAll();
+
+ store.read(consumerRecord(0, "status-connector-conn", value));
+
+ ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
+ assertEquals(status, store.get(CONNECTOR));
+
+ verifyAll();
+ }
+
+ @Test
+ public void putTaskState() {
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ byte[] value = new byte[0];
+ expect(converter.fromConnectData(eq(STATUS_TOPIC), anyObject(Schema.class), anyObject(Struct.class)))
+ .andStubReturn(value);
+
+ final Capture<Callback> callbackCapture = newCapture();
+ kafkaBasedLog.send(eq("status-task-conn-0"), eq(value), capture(callbackCapture));
+ expectLastCall()
+ .andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ callbackCapture.getValue().onCompletion(null, null);
+ return null;
+ }
+ });
+ replayAll();
+
+ TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+ store.put(status);
+
+ // state is not visible until read back from the log
+ assertEquals(null, store.get(TASK));
+
+ verifyAll();
+ }
+
+ @Test
+ public void readTaskState() {
+ byte[] value = new byte[0];
+
+ KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
+ Converter converter = mock(Converter.class);
+ KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);
+
+ Map<String, Object> statusMap = new HashMap<>();
+ statusMap.put("worker_id", WORKER_ID);
+ statusMap.put("state", "RUNNING");
+ statusMap.put("generation", 0L);
+
+ expect(converter.toConnectData(STATUS_TOPIC, value))
+ .andReturn(new SchemaAndValue(null, statusMap));
+
+ replayAll();
+
+ store.read(consumerRecord(0, "status-task-conn-0", value));
+
+ TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
+ assertEquals(status, store.get(TASK));
+
+ verifyAll();
+ }
+
+ private static ConsumerRecord<String, byte[]> consumerRecord(long offset, String key, byte[] value) {
+ return new ConsumerRecord<>(STATUS_TOPIC, 0, offset, System.currentTimeMillis(),
+ TimestampType.CREATE_TIME, key, value);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
new file mode 100644
index 0000000..40aee37
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/MemoryStatusBackingStoreTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class MemoryStatusBackingStoreTest {
+
+ @Test
+ public void putAndGetConnectorStatus() {
+ MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+ ConnectorStatus status = new ConnectorStatus("connector", ConnectorStatus.State.RUNNING, "localhost:8083", 0);
+ store.put(status);
+ assertEquals(status, store.get("connector"));
+ }
+
+ @Test
+ public void putAndGetTaskStatus() {
+ MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+ ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+ TaskStatus status = new TaskStatus(taskId, ConnectorStatus.State.RUNNING, "localhost:8083", 0);
+ store.put(status);
+ assertEquals(status, store.get(taskId));
+ assertEquals(Collections.singleton(status), store.getAll("connector"));
+ }
+
+ @Test
+ public void deleteConnectorStatus() {
+ MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+ store.put(new ConnectorStatus("connector", ConnectorStatus.State.RUNNING, "localhost:8083", 0));
+ store.put(new ConnectorStatus("connector", ConnectorStatus.State.DESTROYED, "localhost:8083", 0));
+ assertNull(store.get("connector"));
+ }
+
+ @Test
+ public void deleteTaskStatus() {
+ MemoryStatusBackingStore store = new MemoryStatusBackingStore();
+ ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+ store.put(new TaskStatus(taskId, ConnectorStatus.State.RUNNING, "localhost:8083", 0));
+ store.put(new TaskStatus(taskId, ConnectorStatus.State.DESTROYED, "localhost:8083", 0));
+ assertNull(store.get(taskId));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
new file mode 100644
index 0000000..ee266b5
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TableTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class TableTest {
+
+ @Test
+ public void basicOperations() {
+ Table<String, Integer, String> table = new Table<>();
+ table.put("foo", 5, "bar");
+ table.put("foo", 6, "baz");
+ assertEquals("bar", table.get("foo", 5));
+ assertEquals("baz", table.get("foo", 6));
+
+ Map<Integer, String> row = table.row("foo");
+ assertEquals("bar", row.get(5));
+ assertEquals("baz", row.get(6));
+
+ assertEquals("bar", table.remove("foo", 5));
+ assertNull(table.get("foo", 5));
+ assertEquals("baz", table.remove("foo", 6));
+ assertNull(table.get("foo", 6));
+ assertTrue(table.row("foo").isEmpty());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/services/connect.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/services/connect.py b/tests/kafkatest/services/connect.py
index a6e902f..76336e1 100644
--- a/tests/kafkatest/services/connect.py
+++ b/tests/kafkatest/services/connect.py
@@ -181,10 +181,12 @@ class ConnectStandaloneService(ConnectServiceBase):
class ConnectDistributedService(ConnectServiceBase):
"""Runs Kafka Connect in distributed mode."""
- def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets", configs_topic="connect-configs"):
+ def __init__(self, context, num_nodes, kafka, files, offsets_topic="connect-offsets",
+ configs_topic="connect-configs", status_topic="connect-status"):
super(ConnectDistributedService, self).__init__(context, num_nodes, kafka, files)
self.offsets_topic = offsets_topic
self.configs_topic = configs_topic
+ self.status_topic = status_topic
def start_cmd(self, node):
cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % self.LOG4J_CONFIG_FILE
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/connect_distributed_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_distributed_test.py b/tests/kafkatest/tests/connect_distributed_test.py
index 1f82e63..9aa16ab 100644
--- a/tests/kafkatest/tests/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect_distributed_test.py
@@ -33,6 +33,7 @@ class ConnectDistributedTest(KafkaTest):
TOPIC = "test"
OFFSETS_TOPIC = "connect-offsets"
CONFIG_TOPIC = "connect-configs"
+ STATUS_TOPIC = "connect-status"
# Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
# across all nodes.
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/connect_rest_test.py
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/connect_rest_test.py b/tests/kafkatest/tests/connect_rest_test.py
index 8e713d4..69a8cb7 100644
--- a/tests/kafkatest/tests/connect_rest_test.py
+++ b/tests/kafkatest/tests/connect_rest_test.py
@@ -30,6 +30,7 @@ class ConnectRestApiTest(KafkaTest):
TOPIC = "test"
OFFSETS_TOPIC = "connect-offsets"
CONFIG_TOPIC = "connect-configs"
+ STATUS_TOPIC = "connect-status"
# Since tasks can be assigned to any node and we're testing with files, we need to make sure the content is the same
# across all nodes.
@@ -160,4 +161,5 @@ class ConnectRestApiTest(KafkaTest):
return []
def _config_dict_from_props(self, connector_props):
- return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
\ No newline at end of file
+ return dict([line.strip().split('=', 1) for line in connector_props.split('\n') if line.strip() and not line.strip().startswith('#')])
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/tests/kafkatest/tests/templates/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/tests/kafkatest/tests/templates/connect-distributed.properties b/tests/kafkatest/tests/templates/connect-distributed.properties
index 8a9f6c7..7a7440a 100644
--- a/tests/kafkatest/tests/templates/connect-distributed.properties
+++ b/tests/kafkatest/tests/templates/connect-distributed.properties
@@ -33,6 +33,7 @@ internal.value.converter.schemas.enable=false
offset.storage.topic={{ OFFSETS_TOPIC }}
config.storage.topic={{ CONFIG_TOPIC }}
+status.storage.topic={{ STATUS_TOPIC }}
# Make sure data gets flushed frequently so tests don't have to wait to ensure they see data in output systems
offset.flush.interval.ms=5000
[3/3] kafka git commit: KAFKA-3093: Add Connect status tracking API
Posted by ew...@apache.org.
KAFKA-3093: Add Connect status tracking API
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>
Closes #920 from hachikuji/KAFKA-3093
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f7d019ed
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f7d019ed
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f7d019ed
Branch: refs/heads/trunk
Commit: f7d019ed408fa988129be9af3689bfa4878bc627
Parents: aeb9c2a
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Feb 23 22:47:31 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 23 22:47:31 2016 -0800
----------------------------------------------------------------------
config/connect-distributed.properties | 3 +-
.../connect/connector/ConnectorContext.java | 7 +
.../kafka/connect/file/FileStreamSinkTask.java | 2 +-
.../kafka/connect/cli/ConnectDistributed.java | 28 +-
.../kafka/connect/cli/ConnectStandalone.java | 13 +-
.../kafka/connect/runtime/AbstractHerder.java | 156 +++++++
.../kafka/connect/runtime/AbstractStatus.java | 100 ++++
.../kafka/connect/runtime/ConnectorStatus.java | 58 +++
.../apache/kafka/connect/runtime/Herder.java | 16 +-
.../connect/runtime/HerderConnectorContext.java | 11 +-
.../kafka/connect/runtime/TaskStatus.java | 53 +++
.../apache/kafka/connect/runtime/Worker.java | 210 +++++----
.../kafka/connect/runtime/WorkerSinkTask.java | 4 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 18 +-
.../kafka/connect/runtime/WorkerTask.java | 47 +-
.../runtime/distributed/DistributedHerder.java | 107 +++--
.../runtime/distributed/WorkerCoordinator.java | 2 +-
.../runtime/distributed/WorkerGroupMember.java | 9 +-
.../distributed/WorkerRebalanceListener.java | 2 +-
.../kafka/connect/runtime/rest/RestServer.java | 9 +-
.../rest/entities/ConnectorStateInfo.java | 108 +++++
.../rest/resources/ConnectorsResource.java | 21 +-
.../runtime/standalone/StandaloneHerder.java | 51 +-
.../connect/storage/KafkaConfigStorage.java | 15 +-
.../storage/KafkaOffsetBackingStore.java | 13 +-
.../storage/KafkaStatusBackingStore.java | 461 +++++++++++++++++++
.../storage/MemoryStatusBackingStore.java | 105 +++++
.../connect/storage/StatusBackingStore.java | 100 ++++
.../kafka/connect/util/KafkaBasedLog.java | 24 +-
.../org/apache/kafka/connect/util/Table.java | 65 +++
.../connect/runtime/AbstractHerderTest.java | 116 +++++
.../connect/runtime/WorkerSinkTaskTest.java | 4 +-
.../runtime/WorkerSinkTaskThreadedTest.java | 6 +-
.../connect/runtime/WorkerSourceTaskTest.java | 54 ++-
.../kafka/connect/runtime/WorkerTaskTest.java | 88 +++-
.../kafka/connect/runtime/WorkerTest.java | 161 ++++---
.../distributed/DistributedHerderTest.java | 121 ++++-
.../distributed/WorkerCoordinatorTest.java | 2 +-
.../standalone/StandaloneHerderTest.java | 42 +-
.../storage/KafkaStatusBackingStoreTest.java | 373 +++++++++++++++
.../storage/MemoryStatusBackingStoreTest.java | 66 +++
.../apache/kafka/connect/util/TableTest.java | 48 ++
tests/kafkatest/services/connect.py | 4 +-
.../kafkatest/tests/connect_distributed_test.py | 1 +
tests/kafkatest/tests/connect_rest_test.py | 4 +-
.../templates/connect-distributed.properties | 1 +
46 files changed, 2611 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/config/connect-distributed.properties
----------------------------------------------------------------------
diff --git a/config/connect-distributed.properties b/config/connect-distributed.properties
index 9ec63db..46bd3bc 100644
--- a/config/connect-distributed.properties
+++ b/config/connect-distributed.properties
@@ -39,4 +39,5 @@ internal.value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
-config.storage.topic=connect-configs
\ No newline at end of file
+config.storage.topic=connect-configs
+status.storage.topic=connect-status
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
index 2a06484..c8a06e8 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectorContext.java
@@ -30,4 +30,11 @@ public interface ConnectorContext {
* added/removed) and the running Tasks will need to be modified.
*/
void requestTaskReconfiguration();
+
+ /**
+ * Raise an unrecoverable exception to the Connect framework. This will cause the status of the
+ * connector to transition to FAILED.
+ * @param e Exception to be raised.
+ */
+ void raiseError(Exception e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
index 83ba6d4..09d4ed8 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkTask.java
@@ -84,7 +84,7 @@ public class FileStreamSinkTask extends SinkTask {
@Override
public void stop() {
- if (outputStream != System.out)
+ if (outputStream != null && outputStream != System.out)
outputStream.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 5ad032e..bc5b75a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -18,6 +18,8 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.Connect;
import org.apache.kafka.connect.runtime.Worker;
@@ -25,9 +27,12 @@ import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
import java.util.Collections;
import java.util.Map;
@@ -54,12 +59,29 @@ public class ConnectDistributed {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+ Time time = new SystemTime();
DistributedConfig config = new DistributedConfig(workerProps);
- Worker worker = new Worker(config, new KafkaOffsetBackingStore());
+
RestServer rest = new RestServer(config);
- DistributedHerder herder = new DistributedHerder(config, worker, rest.advertisedUrl());
+ URI advertisedUrl = rest.advertisedUrl();
+ String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+ KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+ offsetBackingStore.configure(config.originals());
+
+ Worker worker = new Worker(workerId, time, config, offsetBackingStore);
+
+ StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
+ statusBackingStore.configure(config.originals());
+
+ DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
final Connect connect = new Connect(worker, herder, rest);
- connect.start();
+ try {
+ connect.start();
+ } catch (Exception e) {
+ log.error("Failed to start Connect", e);
+ connect.stop();
+ }
// Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
connect.awaitStop();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index f89a72a..6c4335e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -18,9 +18,11 @@
package org.apache.kafka.connect.cli;
import org.apache.kafka.common.annotation.InterfaceStability;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.RestServer;
@@ -33,6 +35,7 @@ import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -63,9 +66,15 @@ public class ConnectStandalone {
Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
+ Time time = new SystemTime();
StandaloneConfig config = new StandaloneConfig(workerProps);
- Worker worker = new Worker(config, new FileOffsetBackingStore());
+
RestServer rest = new RestServer(config);
+ URI advertisedUrl = rest.advertisedUrl();
+ String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+ Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
+
Herder herder = new StandaloneHerder(worker);
final Connect connect = new Connect(worker, herder, rest);
connect.start();
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
new file mode 100644
index 0000000..ca85d87
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions
+ * must invoke the lifecycle hooks appropriately.
+ *
+ * This class takes the following approach for sending status updates to the backing store:
+ *
+ * 1) When the connector or task is starting, we overwrite the previous state blindly. This ensures that
+ * every rebalance will reset the state of tasks to the proper state. The intuition is that there should
+ * be less chance of write conflicts when the worker has just received its assignment and is starting tasks.
+ * In particular, this prevents us from depending on the generation absolutely. If the group disappears
+ * and the generation is reset, then we'll overwrite the status information with the older (and larger)
+ * generation with the updated one. The danger of this approach is that slow starting tasks may cause the
+ * status to be overwritten after a rebalance has completed.
+ *
+ * 2) If the connector or task fails or is shutdown, we use {@link StatusBackingStore#putSafe(ConnectorStatus)},
+ * which provides a little more protection if the worker is no longer in the group (in which case the
+ * task may have already been started on another worker). Obviously this is still racy. If the task has just
+ * started on another worker, we may not have the updated status cached yet. In this case, we'll overwrite
+ * the value which will cause the state to be inconsistent (most likely until the next rebalance). Until
+ * we have proper producer groups with fenced groups, there is not much else we can do.
+ */
+public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
+
+ protected final StatusBackingStore statusBackingStore;
+ private final String workerId;
+
+ public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) {
+ this.statusBackingStore = statusBackingStore;
+ this.workerId = workerId;
+ }
+
+ protected abstract int generation();
+
+ protected void startServices() {
+ this.statusBackingStore.start();
+ }
+
+ protected void stopServices() {
+ this.statusBackingStore.stop();
+ }
+
+ @Override
+ public void onStartup(String connector) {
+ statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.RUNNING,
+ workerId, generation()));
+ }
+
+ @Override
+ public void onShutdown(String connector) {
+ statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
+ workerId, generation()));
+ }
+
+ @Override
+ public void onFailure(String connector, Throwable cause) {
+ statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.FAILED,
+ trace(cause), workerId, generation()));
+ }
+
+ @Override
+ public void onStartup(ConnectorTaskId id) {
+ statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
+ }
+
+ @Override
+ public void onFailure(ConnectorTaskId id, Throwable cause) {
+ statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.FAILED, workerId, generation(), trace(cause)));
+ }
+
+ @Override
+ public void onShutdown(ConnectorTaskId id) {
+ statusBackingStore.putSafe(new TaskStatus(id, TaskStatus.State.UNASSIGNED, workerId, generation()));
+ }
+
+ @Override
+ public void onDeletion(String connector) {
+ for (TaskStatus status : statusBackingStore.getAll(connector))
+ statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
+ statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.DESTROYED, workerId, generation()));
+ }
+
+ @Override
+ public ConnectorStateInfo connectorStatus(String connName) {
+ ConnectorStatus connector = statusBackingStore.get(connName);
+ if (connector == null)
+ throw new NotFoundException("No status found for connector " + connName);
+
+ Collection<TaskStatus> tasks = statusBackingStore.getAll(connName);
+
+ ConnectorStateInfo.ConnectorState connectorState = new ConnectorStateInfo.ConnectorState(
+ connector.state().toString(), connector.workerId(), connector.trace());
+ List<ConnectorStateInfo.TaskState> taskStates = new ArrayList<>();
+
+ for (TaskStatus status : tasks) {
+ taskStates.add(new ConnectorStateInfo.TaskState(status.id().task(),
+ status.state().toString(), status.workerId(), status.trace()));
+ }
+
+ Collections.sort(taskStates);
+
+ return new ConnectorStateInfo(connName, connectorState, taskStates);
+ }
+
+ @Override
+ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
+ TaskStatus status = statusBackingStore.get(id);
+
+ if (status == null)
+ throw new NotFoundException("No status found for task " + id);
+
+ return new ConnectorStateInfo.TaskState(id.task(), status.state().toString(),
+ status.workerId(), status.trace());
+ }
+
+ private String trace(Throwable t) {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ t.printStackTrace(new PrintStream(output));
+ try {
+ return output.toString("UTF-8");
+ } catch (UnsupportedEncodingException e) {
+ return null;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
new file mode 100644
index 0000000..4f31be1
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+public abstract class AbstractStatus<T> {
+
+ public enum State {
+ UNASSIGNED,
+ RUNNING,
+ FAILED,
+ DESTROYED,
+ }
+
+ private final T id;
+ private final State state;
+ private final String trace;
+ private final String workerId;
+ private final int generation;
+
+ public AbstractStatus(T id,
+ State state,
+ String workerId,
+ int generation,
+ String trace) {
+ this.id = id;
+ this.state = state;
+ this.workerId = workerId;
+ this.generation = generation;
+ this.trace = trace;
+ }
+
+ public T id() {
+ return id;
+ }
+
+ public State state() {
+ return state;
+ }
+
+ public String trace() {
+ return trace;
+ }
+
+ public String workerId() {
+ return workerId;
+ }
+
+ public int generation() {
+ return generation;
+ }
+
+ @Override
+ public String toString() {
+ return "Status{" +
+ "id=" + id +
+ ", state=" + state +
+ ", workerId='" + workerId + '\'' +
+ ", generation=" + generation +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ AbstractStatus<?> that = (AbstractStatus<?>) o;
+
+ if (generation != that.generation) return false;
+ if (id != null ? !id.equals(that.id) : that.id != null) return false;
+ if (state != that.state) return false;
+ if (trace != null ? !trace.equals(that.trace) : that.trace != null) return false;
+ return workerId != null ? workerId.equals(that.workerId) : that.workerId == null;
+
+ }
+
+ @Override
+ public int hashCode() {
+ int result = id != null ? id.hashCode() : 0;
+ result = 31 * result + (state != null ? state.hashCode() : 0);
+ result = 31 * result + (trace != null ? trace.hashCode() : 0);
+ result = 31 * result + (workerId != null ? workerId.hashCode() : 0);
+ result = 31 * result + generation;
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
new file mode 100644
index 0000000..d9a2eba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+public class ConnectorStatus extends AbstractStatus<String> {
+
+ public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation) {
+ super(connector, state, workerUrl, generation, msg);
+ }
+
+ public ConnectorStatus(String connector, State state, String workerUrl, int generation) {
+ super(connector, state, workerUrl, generation, null);
+ }
+
+ public interface Listener {
+
+ /**
+ * Invoked after connector has successfully been shutdown.
+ * @param connector The connector name
+ */
+ void onShutdown(String connector);
+
+ /**
+ * Invoked from the Connector using {@link org.apache.kafka.connect.connector.ConnectorContext#raiseError(Exception)}.
+ * Note that no shutdown event will follow after the task has been failed.
+ * @param connector The connector name
+ * @param cause Error raised from the connector.
+ */
+ void onFailure(String connector, Throwable cause);
+
+ /**
+ * Invoked after successful startup of the connector.
+ * @param connector The connector name
+ */
+ void onStartup(String connector);
+
+ /**
+ * Invoked when the connector is deleted through the REST API.
+ * @param connector The connector name
+ */
+ void onDeletion(String connector);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index fc0689c..95c7700 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -18,8 +18,10 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import java.util.Collection;
import java.util.List;
@@ -61,7 +63,7 @@ public interface Herder {
* @throws org.apache.kafka.connect.runtime.distributed.NotLeaderException if this node can not resolve the request
* (e.g., because it has not joined the cluster or does not have configs in sync with the group) and it is
* also not the leader
- * @throws ConnectException if this node is the leader, but still cannot resolve the
+ * @throws org.apache.kafka.connect.errors.ConnectException if this node is the leader, but still cannot resolve the
* request (e.g., it is not in sync with other worker's config state)
*/
void connectors(Callback<Collection<String>> callback);
@@ -113,6 +115,18 @@ public interface Herder {
*/
void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback);
+ /**
+ * Lookup the current status of a connector.
+ * @param connName name of the connector
+ */
+ ConnectorStateInfo connectorStatus(String connName);
+
+ /**
+ * Lookup the status of the a task.
+ * @param id id of the task
+ */
+ ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id);
+
class Created<T> {
private final boolean created;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
index e3294ef..bd933f1 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderConnectorContext.java
@@ -24,10 +24,10 @@ import org.apache.kafka.connect.connector.ConnectorContext;
*/
public class HerderConnectorContext implements ConnectorContext {
- private Herder herder;
- private String connectorName;
+ private final AbstractHerder herder;
+ private final String connectorName;
- public HerderConnectorContext(Herder herder, String connectorName) {
+ public HerderConnectorContext(AbstractHerder herder, String connectorName) {
this.herder = herder;
this.connectorName = connectorName;
}
@@ -38,4 +38,9 @@ public class HerderConnectorContext implements ConnectorContext {
// Distributed herder will forward the request to the leader if needed
herder.requestTaskReconfiguration(connectorName);
}
+
+ @Override
+ public void raiseError(Exception e) {
+ herder.onFailure(connectorName, e);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
new file mode 100644
index 0000000..3542eb8
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
+
+ public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation, String trace) {
+ super(id, state, workerUrl, generation, trace);
+ }
+
+ public TaskStatus(ConnectorTaskId id, State state, String workerUrl, int generation) {
+ super(id, state, workerUrl, generation, null);
+ }
+
+ public interface Listener {
+
+ /**
+ * Invoked after successful startup of the task.
+ * @param id The id of the task
+ */
+ void onStartup(ConnectorTaskId id);
+
+ /**
+ * Invoked if the task raises an error. No shutdown event will follow.
+ * @param id The id of the task
+ * @param cause The error raised by the task.
+ */
+ void onFailure(ConnectorTaskId id, Throwable cause);
+
+ /**
+ * Invoked after successful shutdown of the task.
+ * @param id The id of the task
+ */
+ void onShutdown(ConnectorTaskId id);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 0a4bb7f..8e74fec 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -20,7 +20,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@@ -66,25 +65,25 @@ public class Worker {
private final ExecutorService executor;
private final Time time;
+ private final String workerId;
private final WorkerConfig config;
+ private final Converter keyConverter;
+ private final Converter valueConverter;
+ private final Converter internalKeyConverter;
+ private final Converter internalValueConverter;
+ private final OffsetBackingStore offsetBackingStore;
- private Converter keyConverter;
- private Converter valueConverter;
- private Converter internalKeyConverter;
- private Converter internalValueConverter;
- private OffsetBackingStore offsetBackingStore;
- private HashMap<String, Connector> connectors = new HashMap<>();
+ private HashMap<String, WorkerConnector> connectors = new HashMap<>();
private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
private KafkaProducer<byte[], byte[]> producer;
private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
- public Worker(WorkerConfig config, OffsetBackingStore offsetBackingStore) {
- this(new SystemTime(), config, offsetBackingStore);
- }
-
- @SuppressWarnings("unchecked")
- public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore) {
+ public Worker(String workerId,
+ Time time,
+ WorkerConfig config,
+ OffsetBackingStore offsetBackingStore) {
this.executor = Executors.newCachedThreadPool();
+ this.workerId = workerId;
this.time = time;
this.config = config;
this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
@@ -132,34 +131,18 @@ public class Worker {
long started = time.milliseconds();
long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
- for (Map.Entry<String, Connector> entry : connectors.entrySet()) {
- Connector conn = entry.getValue();
+ for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
+ WorkerConnector conn = entry.getValue();
log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
"Worker is stopped.", conn);
- try {
- conn.stop();
- } catch (ConnectException e) {
- log.error("Error while shutting down connector " + conn, e);
- }
+ conn.stop();
}
- for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
- WorkerTask task = entry.getValue();
- log.warn("Shutting down task {} uncleanly; herder should have shut down "
- + "tasks before the Worker is stopped.", task);
- try {
- task.stop();
- } catch (ConnectException e) {
- log.error("Error while shutting down task " + task, e);
- }
- }
-
- for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
- WorkerTask task = entry.getValue();
- log.debug("Waiting for task {} to finish shutting down", task);
- if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
- log.error("Graceful shutdown of task {} failed.", task);
- }
+ Collection<ConnectorTaskId> taskIds = tasks.keySet();
+ log.warn("Shutting down tasks {} uncleanly; herder should have shut down "
+ + "tasks before the Worker is stopped.", taskIds);
+ stopTasks(taskIds);
+ awaitStopTasks(taskIds);
long timeoutMs = limit - time.milliseconds();
sourceTaskOffsetCommitter.close(timeoutMs);
@@ -169,16 +152,12 @@ public class Worker {
log.info("Worker stopped");
}
- public WorkerConfig config() {
- return config;
- }
-
/**
* Add a new connector.
* @param connConfig connector configuration
* @param ctx context for the connector
*/
- public void addConnector(ConnectorConfig connConfig, ConnectorContext ctx) {
+ public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener lifecycleListener) {
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
@@ -188,22 +167,25 @@ public class Worker {
throw new ConnectException("Connector with name " + connName + " already exists");
final Connector connector = instantiateConnector(connClass);
+ WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, lifecycleListener);
+
log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
- connector.initialize(ctx);
+ workerConnector.initialize();
try {
- connector.start(connConfig.originalsStrings());
+ workerConnector.start(connConfig.originalsStrings());
} catch (ConnectException e) {
throw new ConnectException("Connector threw an exception while starting", e);
}
- connectors.put(connName, connector);
+ connectors.put(connName, workerConnector);
log.info("Finished creating connector {}", connName);
}
/* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
public boolean isSinkConnector(String connName) {
- return SinkConnector.class.isAssignableFrom(connectors.get(connName).getClass());
+ WorkerConnector workerConnector = connectors.get(connName);
+ return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
}
private Class<? extends Connector> getConnectorClass(String connectorAlias) {
@@ -267,10 +249,11 @@ public class Worker {
public List<Map<String, String>> connectorTaskConfigs(String connName, int maxTasks, List<String> sinkTopics) {
log.trace("Reconfiguring connector tasks for {}", connName);
- Connector connector = connectors.get(connName);
- if (connector == null)
+ WorkerConnector workerConnector = connectors.get(connName);
+ if (workerConnector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
+ Connector connector = workerConnector.delegate;
List<Map<String, String>> result = new ArrayList<>();
String taskClassName = connector.taskClass().getName();
for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
@@ -286,16 +269,11 @@ public class Worker {
public void stopConnector(String connName) {
log.info("Stopping connector {}", connName);
- Connector connector = connectors.get(connName);
+ WorkerConnector connector = connectors.get(connName);
if (connector == null)
throw new ConnectException("Connector " + connName + " not found in this worker.");
- try {
- connector.stop();
- } catch (ConnectException e) {
- log.error("Error shutting down connector {}: ", connector, e);
- }
-
+ connector.stop();
connectors.remove(connName);
log.info("Stopped connector {}", connName);
@@ -313,7 +291,7 @@ public class Worker {
* @param id Globally unique ID for this task.
* @param taskConfig the parsed task configuration
*/
- public void addTask(ConnectorTaskId id, TaskConfig taskConfig) {
+ public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener lifecycleListener) {
log.info("Creating task {}", id);
if (tasks.containsKey(id)) {
@@ -327,33 +305,35 @@ public class Worker {
final Task task = instantiateTask(taskClass);
log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
+ final WorkerTask workerTask = buildWorkerTask(id, task, lifecycleListener);
+
+ // Start the task before adding modifying any state, any exceptions are caught higher up the
+ // call chain and there's no cleanup to do here
+ workerTask.initialize(taskConfig.originalsStrings());
+ executor.submit(workerTask);
+
+ if (task instanceof SourceTask) {
+ WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
+ sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
+ }
+ tasks.put(id, workerTask);
+ }
+
+ private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener lifecycleListener) {
// Decide which type of worker task we need based on the type of task.
- final WorkerTask workerTask;
if (task instanceof SourceTask) {
- SourceTask sourceTask = (SourceTask) task;
OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
- workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer,
+ return new WorkerSourceTask(id, (SourceTask) task, lifecycleListener, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, time);
} else if (task instanceof SinkTask) {
- workerTask = new WorkerSinkTask(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+ return new WorkerSinkTask(id, (SinkTask) task, lifecycleListener, config, keyConverter, valueConverter, time);
} else {
log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
}
-
- // Start the task before adding modifying any state, any exceptions are caught higher up the
- // call chain and there's no cleanup to do here
- workerTask.initialize(taskConfig.originalsStrings());
- executor.submit(workerTask);
-
- if (task instanceof SourceTask) {
- WorkerSourceTask workerSourceTask = (WorkerSourceTask) workerTask;
- sourceTaskOffsetCommitter.schedule(id, workerSourceTask);
- }
- tasks.put(id, workerTask);
}
private static Task instantiateTask(Class<? extends Task> taskClass) {
@@ -364,16 +344,39 @@ public class Worker {
}
}
- public void stopTask(ConnectorTaskId id) {
- log.info("Stopping task {}", id);
+ public void stopTasks(Collection<ConnectorTaskId> ids) {
+ for (ConnectorTaskId id : ids)
+ stopTask(getTask(id));
+ }
- WorkerTask task = getTask(id);
+ public void awaitStopTasks(Collection<ConnectorTaskId> ids) {
+ long now = time.milliseconds();
+ long deadline = now + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+ for (ConnectorTaskId id : ids) {
+ long remaining = Math.max(0, deadline - time.milliseconds());
+ awaitStopTask(getTask(id), remaining);
+ }
+ }
+
+ private void awaitStopTask(WorkerTask task, long timeout) {
+ if (!task.awaitStop(timeout)) {
+ log.error("Graceful stop of task {} failed.", task.id());
+ task.cancel();
+ }
+ tasks.remove(task.id());
+ }
+
+ private void stopTask(WorkerTask task) {
+ log.info("Stopping task {}", task.id());
if (task instanceof WorkerSourceTask)
- sourceTaskOffsetCommitter.remove(id);
+ sourceTaskOffsetCommitter.remove(task.id());
task.stop();
- if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
- log.error("Graceful stop of task {} failed.", task);
- tasks.remove(id);
+ }
+
+ public void stopAndAwaitTask(ConnectorTaskId id) {
+ WorkerTask task = getTask(id);
+ stopTask(task);
+ awaitStopTask(task, config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG));
}
/**
@@ -400,4 +403,55 @@ public class Worker {
return internalValueConverter;
}
+ public String workerId() {
+ return workerId;
+ }
+
+ private static class WorkerConnector {
+ private final String connName;
+ private final ConnectorStatus.Listener lifecycleListener;
+ private final ConnectorContext ctx;
+ private final Connector delegate;
+
+ public WorkerConnector(String connName,
+ Connector delegate,
+ ConnectorContext ctx,
+ ConnectorStatus.Listener lifecycleListener) {
+ this.connName = connName;
+ this.ctx = ctx;
+ this.delegate = delegate;
+ this.lifecycleListener = lifecycleListener;
+ }
+
+ public void initialize() {
+ delegate.initialize(ctx);
+ }
+
+ public void start(Map<String, String> props) {
+ try {
+ delegate.start(props);
+ lifecycleListener.onStartup(connName);
+ } catch (Throwable t) {
+ log.error("Error while starting connector {}", connName, t);
+ lifecycleListener.onFailure(connName, t);
+ }
+ }
+
+ public void stop() {
+ try {
+ delegate.stop();
+ lifecycleListener.onShutdown(connName);
+ } catch (Throwable t) {
+ log.error("Error while shutting down connector {}", connName, t);
+ lifecycleListener.onFailure(connName, t);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return delegate.toString();
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 8c5bd9f..eb64355 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -76,11 +76,12 @@ class WorkerSinkTask extends WorkerTask {
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
+ TaskStatus.Listener lifecycleListener,
WorkerConfig workerConfig,
Converter keyConverter,
Converter valueConverter,
Time time) {
- super(id);
+ super(id, lifecycleListener);
this.workerConfig = workerConfig;
this.task = task;
@@ -184,6 +185,7 @@ class WorkerSinkTask extends WorkerTask {
* Initializes and starts the SinkTask.
*/
protected void initializeAndStart() {
+ log.debug("Initializing task {} with config {}", id, taskConfig);
String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new ConnectException("Sink tasks require a list of topics.");
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 562e03e..8542f4c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -17,13 +17,13 @@
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
@@ -76,6 +76,7 @@ class WorkerSourceTask extends WorkerTask {
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
+ TaskStatus.Listener lifecycleListener,
Converter keyConverter,
Converter valueConverter,
KafkaProducer<byte[], byte[]> producer,
@@ -83,7 +84,7 @@ class WorkerSourceTask extends WorkerTask {
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
Time time) {
- super(id);
+ super(id, lifecycleListener);
this.workerConfig = workerConfig;
this.task = task;
@@ -147,16 +148,13 @@ class WorkerSourceTask extends WorkerTask {
}
} catch (InterruptedException e) {
// Ignore and allow to exit.
- } catch (Throwable t) {
- log.error("Task {} threw an uncaught and unrecoverable exception", id);
- log.error("Task is being killed and will not recover until manually restarted:", t);
- // It should still be safe to let this fall through and commit offsets since this exception would have
+ } finally {
+ // It should still be safe to commit offsets since any exception would have
// simply resulted in not getting more records but all the existing records should be ok to flush
// and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
// to fail.
+ commitOffsets();
}
-
- commitOffsets();
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index ecaeb7b..cc69c0f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -37,13 +37,21 @@ abstract class WorkerTask implements Runnable {
protected final ConnectorTaskId id;
private final AtomicBoolean stopping;
private final AtomicBoolean running;
+ private final AtomicBoolean cancelled;
private final CountDownLatch shutdownLatch;
+ private final TaskStatus.Listener lifecycleListener;
- public WorkerTask(ConnectorTaskId id) {
+ public WorkerTask(ConnectorTaskId id, TaskStatus.Listener lifecycleListener) {
this.id = id;
this.stopping = new AtomicBoolean(false);
this.running = new AtomicBoolean(false);
+ this.cancelled = new AtomicBoolean(false);
this.shutdownLatch = new CountDownLatch(1);
+ this.lifecycleListener = lifecycleListener;
+ }
+
+ public ConnectorTaskId id() {
+ return id;
}
/**
@@ -61,9 +69,17 @@ abstract class WorkerTask implements Runnable {
}
/**
+ * Cancel this task. This won't actually stop it, but it will prevent the state from being
+ * updated when it eventually does shutdown.
+ */
+ public void cancel() {
+ this.cancelled.set(true);
+ }
+
+ /**
* Wait for this task to finish stopping.
*
- * @param timeoutMs
+ * @param timeoutMs time in milliseconds to await stop
* @return true if successful, false if the timeout was reached
*/
public boolean awaitStop(long timeoutMs) {
@@ -85,19 +101,23 @@ abstract class WorkerTask implements Runnable {
return stopping.get();
}
+ protected boolean isStopped() {
+ return !running.get();
+ }
+
private void doClose() {
try {
close();
} catch (Throwable t) {
- log.error("Unhandled exception in task shutdown {}", id, t);
+ log.error("Task {} threw an uncaught and unrecoverable exception during shutdown", id, t);
+ throw t;
} finally {
running.set(false);
shutdownLatch.countDown();
}
}
- @Override
- public void run() {
+ private void doRun() {
if (!this.running.compareAndSet(false, true))
throw new IllegalStateException("The task cannot be started while still running");
@@ -105,12 +125,27 @@ abstract class WorkerTask implements Runnable {
if (stopping.get())
return;
+ lifecycleListener.onStartup(id);
execute();
} catch (Throwable t) {
- log.error("Unhandled exception in task {}", id, t);
+ log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
+ log.error("Task is being killed and will not recover until manually restarted");
+ throw t;
} finally {
doClose();
}
}
+ @Override
+ public void run() {
+ try {
+ doRun();
+ if (!cancelled.get())
+ lifecycleListener.onShutdown(id);
+ } catch (Throwable t) {
+ if (!cancelled.get())
+ lifecycleListener.onFailure(id, t);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 8b0525b..83ed714 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -17,17 +17,16 @@
package org.apache.kafka.connect.runtime.distributed;
-import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
@@ -35,6 +34,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -79,7 +79,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* (and therefore, also for creating, destroy, and scaling up/down connectors).
* </p>
*/
-public class DistributedHerder implements Herder, Runnable {
+public class DistributedHerder extends AbstractHerder implements Runnable {
private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
@@ -108,15 +108,29 @@ public class DistributedHerder implements Herder, Runnable {
// needed (e.g. task reconfiguration, which requires everyone to coordinate offset commits).
private Set<String> connectorConfigUpdates = new HashSet<>();
private boolean needsReconfigRebalance;
+ private volatile int generation;
private final ExecutorService forwardRequestExecutor;
- public DistributedHerder(DistributedConfig config, Worker worker, String restUrl) {
- this(config, worker, null, null, restUrl, new SystemTime());
+ public DistributedHerder(DistributedConfig config,
+ Time time,
+ Worker worker,
+ StatusBackingStore statusBackingStore,
+ String restUrl) {
+ this(config, worker.workerId(), worker, statusBackingStore, null, null, restUrl, time);
}
- // public for testing
- public DistributedHerder(DistributedConfig config, Worker worker, KafkaConfigStorage configStorage, WorkerGroupMember member, String restUrl, Time time) {
+ // visible for testing
+ DistributedHerder(DistributedConfig config,
+ String workerId,
+ Worker worker,
+ StatusBackingStore statusBackingStore,
+ KafkaConfigStorage configStorage,
+ WorkerGroupMember member,
+ String restUrl,
+ Time time) {
+ super(statusBackingStore, workerId);
+
this.worker = worker;
if (configStorage != null) {
// For testing. Assume configuration has already been performed
@@ -131,7 +145,7 @@ public class DistributedHerder implements Herder, Runnable {
this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
- this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener());
+ this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configStorage, rebalanceListener(), time);
stopping = new AtomicBoolean(false);
rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
@@ -146,11 +160,25 @@ public class DistributedHerder implements Herder, Runnable {
thread.start();
}
+ @Override
+ protected void startServices() {
+ super.startServices();
+ configStorage.start();
+ }
+
+ @Override
+ protected void stopServices() {
+ super.stopServices();
+ if (configStorage != null)
+ configStorage.stop();
+ }
+
+ @Override
public void run() {
try {
log.info("Herder starting");
- configStorage.start();
+ startServices();
log.info("Herder started");
@@ -282,13 +310,10 @@ public class DistributedHerder implements Herder, Runnable {
log.error("Failed to shut down connector " + connName, t);
}
}
- for (ConnectorTaskId taskId : new HashSet<>(worker.taskIds())) {
- try {
- worker.stopTask(taskId);
- } catch (Throwable t) {
- log.error("Failed to shut down task " + taskId, t);
- }
- }
+
+ Set<ConnectorTaskId> tasks = new HashSet<>(worker.taskIds());
+ worker.stopTasks(tasks);
+ worker.awaitStopTasks(tasks);
member.stop();
@@ -299,8 +324,7 @@ public class DistributedHerder implements Herder, Runnable {
request.callback().onCompletion(new ConnectException("Worker is shutting down"), null);
}
- if (configStorage != null)
- configStorage.stop();
+ stopServices();
}
}
@@ -388,7 +412,7 @@ public class DistributedHerder implements Herder, Runnable {
}
@Override
- public void putConnectorConfig(final String connName, Map<String, String> config, final boolean allowReplace,
+ public void putConnectorConfig(final String connName, final Map<String, String> config, final boolean allowReplace,
final Callback<Created<ConnectorInfo>> callback) {
final Map<String, String> connConfig;
if (config == null) {
@@ -515,6 +539,10 @@ public class DistributedHerder implements Herder, Runnable {
);
}
+ @Override
+ public int generation() {
+ return generation;
+ }
// Should only be called from work thread, so synchronization should not be needed
private boolean isLeader() {
@@ -649,7 +677,7 @@ public class DistributedHerder implements Herder, Runnable {
log.info("Starting task {}", taskId);
Map<String, String> configs = configState.taskConfig(taskId);
TaskConfig taskConfig = new TaskConfig(configs);
- worker.addTask(taskId, taskConfig);
+ worker.startTask(taskId, taskConfig, this);
} catch (ConfigException e) {
log.error("Couldn't instantiate task " + taskId + " because it has an invalid task " +
"configuration. This task will not execute until reconfigured.", e);
@@ -666,7 +694,7 @@ public class DistributedHerder implements Herder, Runnable {
ConnectorConfig connConfig = new ConnectorConfig(configs);
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName);
- worker.addConnector(connConfig, ctx);
+ worker.startConnector(connConfig, ctx, this);
// Immediately request configuration since this could be a brand new connector. However, also only update those
// task configs if they are actually different from the existing ones to avoid unnecessary updates when this is
@@ -816,7 +844,7 @@ public class DistributedHerder implements Herder, Runnable {
callback.onCompletion(error, null);
}
};
- };
+ }
// Config callbacks are triggered from the KafkaConfigStorage thread
@@ -853,11 +881,22 @@ public class DistributedHerder implements Herder, Runnable {
};
}
+ private void updateDeletedConnectorStatus() {
+ ClusterConfigState snapshot = configStorage.snapshot();
+ Set<String> connectors = snapshot.connectors();
+ for (String connector : statusBackingStore.connectors()) {
+ if (!connectors.contains(connector)) {
+ log.debug("Cleaning status information for connector {}", connector);
+ onDeletion(connector);
+ }
+ }
+ }
+
// Rebalances are triggered internally from the group member, so these are always executed in the work thread.
private WorkerRebalanceListener rebalanceListener() {
return new WorkerRebalanceListener() {
@Override
- public void onAssigned(ConnectProtocol.Assignment assignment) {
+ public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
// This callback just logs the info and saves it. The actual response is handled in the main loop, which
// ensures the group member's logic for rebalancing can complete, potentially long-running steps to
// catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
@@ -866,8 +905,16 @@ public class DistributedHerder implements Herder, Runnable {
log.info("Joined group and got assignment: {}", assignment);
synchronized (DistributedHerder.this) {
DistributedHerder.this.assignment = assignment;
+ DistributedHerder.this.generation = generation;
rebalanceResolved = false;
}
+
+ // Delete the statuses of all connectors removed prior to the start of this reblaance. This has to
+ // be done after the rebalance completes to avoid race conditions as the previous generation attempts
+ // to change the state to UNASSIGNED after tasks have been stopped.
+ if (isLeader())
+ updateDeletedConnectorStatus();
+
// We *must* interrupt any poll() call since this could occur when the poll starts, and we might then
// sleep in the poll() for a long time. Forcing a wakeup ensures we'll get to process this event in the
// main thread.
@@ -890,18 +937,24 @@ public class DistributedHerder implements Herder, Runnable {
// unnecessary repeated connections to the source/sink system.
for (String connectorName : connectors)
worker.stopConnector(connectorName);
+
// TODO: We need to at least commit task offsets, but if we could commit offsets & pause them instead of
// stopping them then state could continue to be reused when the task remains on this worker. For example,
// this would avoid having to close a connection and then reopen it when the task is assigned back to this
// worker again.
- for (ConnectorTaskId taskId : tasks)
- worker.stopTask(taskId);
+ if (!tasks.isEmpty()) {
+ worker.stopTasks(tasks); // trigger stop() for all tasks
+ worker.awaitStopTasks(tasks); // await stopping tasks
+ }
+ // Ensure that all status updates have been pushed to the storage system before rebalancing.
+ // Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
+ // completes.
+ statusBackingStore.flush();
log.info("Finished stopping tasks in preparation for rebalance");
} else {
log.info("Wasn't unable to resume work after last rebalance, can skip stopping connectors and tasks");
}
-
}
};
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 79199a6..fa50fbf 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -112,7 +112,7 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
// tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
// up to date, try to rejoin again, leaving the group and backing off, etc.).
rejoinRequested = false;
- listener.onAssigned(assignmentSnapshot);
+ listener.onAssigned(assignmentSnapshot, generation);
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 4b24312..9f05040 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -31,7 +31,6 @@ import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.utils.AppInfoParser;
-import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
import org.slf4j.Logger;
@@ -67,9 +66,13 @@ public class WorkerGroupMember {
private boolean stopped = false;
- public WorkerGroupMember(DistributedConfig config, String restUrl, KafkaConfigStorage configStorage, WorkerRebalanceListener listener) {
+ public WorkerGroupMember(DistributedConfig config,
+ String restUrl,
+ KafkaConfigStorage configStorage,
+ WorkerRebalanceListener listener,
+ Time time) {
try {
- this.time = new SystemTime();
+ this.time = time;
String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
index 40f55d2..bc833e9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
@@ -29,7 +29,7 @@ public interface WorkerRebalanceListener {
* Invoked when a new assignment is created by joining the Connect worker group. This is invoked for both successful
* and unsuccessful assignments.
*/
- void onAssigned(ConnectProtocol.Assignment assignment);
+ void onAssigned(ConnectProtocol.Assignment assignment, int generation);
/**
* Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index a544fb0..dbac58f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -50,6 +50,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
+import java.net.URI;
import java.net.URL;
import java.util.List;
import java.util.Map;
@@ -65,7 +66,6 @@ public class RestServer {
private static final ObjectMapper JSON_SERDE = new ObjectMapper();
private final WorkerConfig config;
- private Herder herder;
private Server jettyServer;
/**
@@ -90,8 +90,6 @@ public class RestServer {
public void start(Herder herder) {
log.info("Starting REST server");
- this.herder = herder;
-
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(new JacksonJsonProvider());
@@ -151,7 +149,7 @@ public class RestServer {
* Get the URL to advertise to other workers and clients. This uses the default connector from the embedded Jetty
* server, unless overrides for advertised hostname and/or port are provided via configs.
*/
- public String advertisedUrl() {
+ public URI advertisedUrl() {
UriBuilder builder = UriBuilder.fromUri(jettyServer.getURI());
String advertisedHostname = config.getString(WorkerConfig.REST_ADVERTISED_HOST_NAME_CONFIG);
if (advertisedHostname != null && !advertisedHostname.isEmpty())
@@ -161,10 +159,9 @@ public class RestServer {
builder.port(advertisedPort);
else
builder.port(config.getInt(WorkerConfig.REST_PORT_CONFIG));
- return builder.build().toString();
+ return builder.build();
}
-
/**
* @param url HTTP connection will be established with this url.
* @param method HTTP method ("GET", "POST", "PUT", etc.)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
new file mode 100644
index 0000000..179c0db
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+public class ConnectorStateInfo {
+
+ private final String name;
+ private final ConnectorState connector;
+ private final List<TaskState> tasks;
+
+ @JsonCreator
+ public ConnectorStateInfo(@JsonProperty("name") String name,
+ @JsonProperty("connector") ConnectorState connector,
+ @JsonProperty("tasks") List<TaskState> tasks) {
+ this.name = name;
+ this.connector = connector;
+ this.tasks = tasks;
+ }
+
+ @JsonProperty
+ public String name() {
+ return name;
+ }
+
+ @JsonProperty
+ public ConnectorState connector() {
+ return connector;
+ }
+
+ @JsonProperty
+ public List<TaskState> tasks() {
+ return tasks;
+ }
+
+ public abstract static class AbstractState {
+ private final String state;
+ private final String trace;
+ private final String workerId;
+
+ public AbstractState(String state, String workerId, String trace) {
+ this.state = state;
+ this.workerId = workerId;
+ this.trace = trace;
+ }
+
+ @JsonProperty
+ public String state() {
+ return state;
+ }
+
+ @JsonProperty("worker_id")
+ public String workerId() {
+ return workerId;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ public String trace() {
+ return trace;
+ }
+ }
+
+ public static class ConnectorState extends AbstractState {
+ public ConnectorState(String state, String worker, String msg) {
+ super(state, worker, msg);
+ }
+ }
+
+ public static class TaskState extends AbstractState implements Comparable<TaskState> {
+ private final int id;
+
+ public TaskState(int id, String state, String worker, String msg) {
+ super(state, worker, msg);
+ this.id = id;
+ }
+
+ @JsonProperty
+ public int id() {
+ return id;
+ }
+
+ @Override
+ public int compareTo(TaskState that) {
+ return Integer.compare(this.id, that.id);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index c95b723..d0d940b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -23,9 +23,11 @@ import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,8 +100,7 @@ public class ConnectorsResource {
public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
herder.connectorInfo(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference<ConnectorInfo>() {
- });
+ return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null);
}
@GET
@@ -107,8 +108,13 @@ public class ConnectorsResource {
public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
herder.connectorConfig(connector, cb);
- return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
- });
+ return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null);
+ }
+
+ @GET
+ @Path("/{connector}/status")
+ public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
+ return herder.connectorStatus(connector);
}
@PUT
@@ -145,6 +151,13 @@ public class ConnectorsResource {
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs);
}
+ @GET
+ @Path("/{connector}/tasks/{task}/status")
+ public ConnectorStateInfo.TaskState getTaskStatus(@PathParam("connector") String connector,
+ @PathParam("task") Integer task) throws Throwable {
+ return herder.taskStatus(new ConnectorTaskId(connector, task));
+ }
+
@DELETE
@Path("/{connector}")
public void destroyConnector(final @PathParam("connector") String connector) throws Throwable {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
index 89847ab..707470f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java
@@ -20,13 +20,15 @@ package org.apache.kafka.connect.runtime.standalone;
import org.apache.kafka.connect.errors.AlreadyExistsException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.AbstractHerder;
import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.HerderConnectorContext;
import org.apache.kafka.connect.runtime.TaskConfig;
import org.apache.kafka.connect.runtime.Worker;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.storage.MemoryStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
@@ -38,23 +40,33 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Single process, in-memory "herder". Useful for a standalone Kafka Connect process.
*/
-public class StandaloneHerder implements Herder {
+public class StandaloneHerder extends AbstractHerder {
private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
private final Worker worker;
private HashMap<String, ConnectorState> connectors = new HashMap<>();
public StandaloneHerder(Worker worker) {
+ this(worker.workerId(), worker, new MemoryStatusBackingStore());
+ }
+
+ // visible for testing
+ StandaloneHerder(String workerId,
+ Worker worker,
+ StatusBackingStore statusBackingStore) {
+ super(statusBackingStore, workerId);
this.worker = worker;
}
public synchronized void start() {
log.info("Herder starting");
+ startServices();
log.info("Herder started");
}
@@ -78,6 +90,11 @@ public class StandaloneHerder implements Herder {
}
@Override
+ public int generation() {
+ return 0;
+ }
+
+ @Override
public synchronized void connectors(Callback<Collection<String>> callback) {
callback.onCompletion(null, new ArrayList<>(connectors.keySet()));
}
@@ -131,8 +148,10 @@ public class StandaloneHerder implements Herder {
if (config == null) // Deletion, kill tasks as well
removeConnectorTasks(connName);
worker.stopConnector(connName);
- if (config == null)
+ if (config == null) {
connectors.remove(connName);
+ onDeletion(connName);
+ }
} else {
if (config == null) {
// Deletion, must already exist
@@ -194,7 +213,7 @@ public class StandaloneHerder implements Herder {
ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
ConnectorState state = connectors.get(connName);
- worker.addConnector(connConfig, new HerderConnectorContext(this, connName));
+ worker.startConnector(connConfig, new HerderConnectorContext(this, connName), this);
if (state == null) {
connectors.put(connName, new ConnectorState(connectorProps, connConfig));
} else {
@@ -219,7 +238,7 @@ public class StandaloneHerder implements Herder {
ConnectorTaskId taskId = new ConnectorTaskId(connName, index);
TaskConfig config = new TaskConfig(taskConfigMap);
try {
- worker.addTask(taskId, config);
+ worker.startTask(taskId, config, this);
} catch (Throwable e) {
log.error("Failed to add task {}: ", taskId, e);
// Swallow this so we can continue updating the rest of the tasks
@@ -230,19 +249,21 @@ public class StandaloneHerder implements Herder {
}
}
+ private Set<ConnectorTaskId> tasksFor(ConnectorState state) {
+ Set<ConnectorTaskId> tasks = new HashSet<>();
+ for (int i = 0; i < state.taskConfigs.size(); i++)
+ tasks.add(new ConnectorTaskId(state.name, i));
+ return tasks;
+ }
+
private void removeConnectorTasks(String connName) {
ConnectorState state = connectors.get(connName);
- for (int i = 0; i < state.taskConfigs.size(); i++) {
- ConnectorTaskId taskId = new ConnectorTaskId(connName, i);
- try {
- worker.stopTask(taskId);
- } catch (ConnectException e) {
- log.error("Failed to stop task {}: ", taskId, e);
- // Swallow this so we can continue stopping the rest of the tasks
- // FIXME: Forcibly kill the task?
- }
+ Set<ConnectorTaskId> tasks = tasksFor(state);
+ if (!tasks.isEmpty()) {
+ worker.stopTasks(tasks);
+ worker.awaitStopTasks(tasks);
+ state.taskConfigs = new ArrayList<>();
}
- state.taskConfigs = new ArrayList<>();
}
private void updateConnectorTasks(String connName) {
[2/3] kafka git commit: KAFKA-3093: Add Connect status tracking API
Posted by ew...@apache.org.
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 4b60131..9bd191e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -20,6 +20,10 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
@@ -219,15 +223,14 @@ public class KafkaConfigStorage {
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(configs);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(configs);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 65bd9d0..dfb8c51 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;
@@ -66,15 +68,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(configs);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(configs);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
new file mode 100644
index 0000000..948a325
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * StatusBackingStore implementation which uses a compacted topic for storage
+ * of connector and task status information. When a state change is observed,
+ * the new state is written to the compacted topic. The new state will not be
+ * visible until it has been read back from the topic.
+ *
+ * In spite of their names, the putSafe() methods cannot guarantee the safety
+ * of the write (since Kafka itself cannot provide such guarantees currently),
+ * but it can avoid specific unsafe conditions. In particular, we putSafe()
+ * allows writes in the following conditions:
+ *
+ * 3) It is (probably) safe to overwrite the state if there is no previous
+ * value.
+ * 1) It is (probably) safe to overwrite the state if the previous value was
+ * set by a worker with the same workerId.
+ * 2) It is (probably) safe to overwrite the previous state if the current
+ * generation is higher than the previous .
+ *
+ * Basically all these conditions do is reduce the window for conflicts. They
+ * obviously cannot take into account in-flight requests.
+ *
+ */
+public class KafkaStatusBackingStore implements StatusBackingStore {
+ private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
+
+ public static final String STATUS_TOPIC_CONFIG = "status.storage.topic";
+
+ private static final String TASK_STATUS_PREFIX = "status-task-";
+ private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
+
+ public static final String STATE_KEY_NAME = "state";
+ public static final String TRACE_KEY_NAME = "trace";
+ public static final String WORKER_ID_KEY_NAME = "worker_id";
+ public static final String GENERATION_KEY_NAME = "generation";
+
+ private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
+ .field(STATE_KEY_NAME, Schema.STRING_SCHEMA)
+ .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
+ .field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA)
+ .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA)
+ .build();
+
+ private final Time time;
+ private final Converter converter;
+ private final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
+ private final Map<String, CacheEntry<ConnectorStatus>> connectors;
+
+ private String topic;
+ private KafkaBasedLog<String, byte[]> kafkaLog;
+ private int generation;
+
+ public KafkaStatusBackingStore(Time time, Converter converter) {
+ this.time = time;
+ this.converter = converter;
+ this.tasks = new Table<>();
+ this.connectors = new HashMap<>();
+ }
+
+ // visible for testing
+ KafkaStatusBackingStore(Time time, Converter converter, String topic, KafkaBasedLog<String, byte[]> kafkaLog) {
+ this(time, converter);
+ this.kafkaLog = kafkaLog;
+ this.topic = topic;
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ if (configs.get(STATUS_TOPIC_CONFIG) == null)
+ throw new ConnectException("Must specify topic for connector status.");
+ this.topic = (String) configs.get(STATUS_TOPIC_CONFIG);
+
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.putAll(configs);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.putAll(configs);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+ Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+ read(record);
+ }
+ };
+ this.kafkaLog = new KafkaBasedLog<>(topic, producerProps, consumerProps, readCallback, time);
+ }
+
+ @Override
+ public void start() {
+ kafkaLog.start();
+
+ // read to the end on startup to ensure that api requests see the most recent states
+ kafkaLog.readToEnd();
+ }
+
+ @Override
+ public void stop() {
+ kafkaLog.stop();
+ }
+
+ @Override
+ public void put(final ConnectorStatus status) {
+ sendConnectorStatus(status, false);
+ }
+
+ @Override
+ public void putSafe(final ConnectorStatus status) {
+ sendConnectorStatus(status, true);
+ }
+
+ @Override
+ public void put(final TaskStatus status) {
+ sendTaskStatus(status, false);
+ }
+
+ @Override
+ public void putSafe(final TaskStatus status) {
+ sendTaskStatus(status, true);
+ }
+
+ @Override
+ public void flush() {
+ kafkaLog.flush();
+ }
+
+ private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
+ String connector = status.id();
+ CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+ String key = CONNECTOR_STATUS_PREFIX + connector;
+ send(key, status, entry, safeWrite);
+ }
+
+ private void sendTaskStatus(final TaskStatus status, boolean safeWrite) {
+ ConnectorTaskId taskId = status.id();
+ CacheEntry<TaskStatus> entry = getOrAdd(taskId);
+ String key = TASK_STATUS_PREFIX + taskId.connector() + "-" + taskId.task();
+ send(key, status, entry, safeWrite);
+ }
+
+ private <V extends AbstractStatus> void send(final String key,
+ final V status,
+ final CacheEntry<V> entry,
+ final boolean safeWrite) {
+ final int sequence;
+ synchronized (this) {
+ this.generation = status.generation();
+ if (safeWrite && !entry.canWrite(status))
+ return;
+ sequence = entry.increment();
+ }
+
+ final byte[] value = status.state() == ConnectorStatus.State.DESTROYED ? null : serialize(status);
+
+ kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ if (exception instanceof RetriableException) {
+ synchronized (this) {
+ if (entry.isDeleted()
+ || status.generation() != generation
+ || (safeWrite && !entry.canWrite(status, sequence)))
+ return;
+ }
+ kafkaLog.send(key, value, this);
+ } else {
+ log.error("Failed to write status update", exception);
+ }
+ }
+ }
+ });
+ }
+
+ private synchronized CacheEntry<ConnectorStatus> getOrAdd(String connector) {
+ CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+ if (entry == null) {
+ entry = new CacheEntry<>();
+ connectors.put(connector, entry);
+ }
+ return entry;
+ }
+
+ private synchronized void remove(String connector) {
+ CacheEntry<ConnectorStatus> removed = connectors.remove(connector);
+ if (removed != null)
+ removed.delete();
+
+ Map<Integer, CacheEntry<TaskStatus>> tasks = this.tasks.remove(connector);
+ if (tasks != null) {
+ for (CacheEntry<TaskStatus> taskEntry : tasks.values())
+ taskEntry.delete();
+ }
+ }
+
+ private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId task) {
+ CacheEntry<TaskStatus> entry = tasks.get(task.connector(), task.task());
+ if (entry == null) {
+ entry = new CacheEntry<>();
+ tasks.put(task.connector(), task.task(), entry);
+ }
+ return entry;
+ }
+
+ private synchronized void remove(ConnectorTaskId id) {
+ CacheEntry<TaskStatus> removed = tasks.remove(id.connector(), id.task());
+ if (removed != null)
+ removed.delete();
+ }
+
+ @Override
+ public synchronized TaskStatus get(ConnectorTaskId id) {
+ CacheEntry<TaskStatus> entry = tasks.get(id.connector(), id.task());
+ return entry == null ? null : entry.get();
+ }
+
+ @Override
+ public synchronized ConnectorStatus get(String connector) {
+ CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+ return entry == null ? null : entry.get();
+ }
+
+ @Override
+ public synchronized Collection<TaskStatus> getAll(String connector) {
+ List<TaskStatus> res = new ArrayList<>();
+ for (CacheEntry<TaskStatus> statusEntry : tasks.row(connector).values()) {
+ TaskStatus status = statusEntry.get();
+ if (status != null)
+ res.add(status);
+ }
+ return res;
+ }
+
+ @Override
+ public synchronized Set<String> connectors() {
+ return new HashSet<>(connectors.keySet());
+ }
+
+ private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
+ try {
+ SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+ if (!(schemaAndValue.value() instanceof Map)) {
+ log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+ return null;
+ }
+
+ Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+ TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+ String trace = (String) statusMap.get(TRACE_KEY_NAME);
+ String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+ int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+ return new ConnectorStatus(connector, state, trace, workerUrl, generation);
+ } catch (Exception e) {
+ log.error("Failed to deserialize connector status", e);
+ return null;
+ }
+ }
+
+ private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
+ try {
+ SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+ if (!(schemaAndValue.value() instanceof Map)) {
+ log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+ return null;
+ }
+ Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+ TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+ String trace = (String) statusMap.get(TRACE_KEY_NAME);
+ String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+ int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+ return new TaskStatus(taskId, state, workerUrl, generation, trace);
+ } catch (Exception e) {
+ log.error("Failed to deserialize task status", e);
+ return null;
+ }
+ }
+
+ private byte[] serialize(AbstractStatus status) {
+ Struct struct = new Struct(STATUS_SCHEMA_V0);
+ struct.put(STATE_KEY_NAME, status.state().name());
+ if (status.trace() != null)
+ struct.put(TRACE_KEY_NAME, status.trace());
+ struct.put(WORKER_ID_KEY_NAME, status.workerId());
+ struct.put(GENERATION_KEY_NAME, status.generation());
+ return converter.fromConnectData(topic, STATUS_SCHEMA_V0, struct);
+ }
+
+ private String parseConnectorStatusKey(String key) {
+ return key.substring(CONNECTOR_STATUS_PREFIX.length());
+ }
+
+ private ConnectorTaskId parseConnectorTaskId(String key) {
+ String[] parts = key.split("-");
+ if (parts.length < 4) return null;
+
+ try {
+ int taskNum = Integer.parseInt(parts[parts.length - 1]);
+ String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-");
+ return new ConnectorTaskId(connectorName, taskNum);
+ } catch (NumberFormatException e) {
+ log.warn("Invalid task status key {}", key);
+ return null;
+ }
+ }
+
+ private void readConnectorStatus(String key, byte[] value) {
+ String connector = parseConnectorStatusKey(key);
+ if (connector == null || connector.isEmpty()) {
+ log.warn("Discarding record with invalid connector status key {}", key);
+ return;
+ }
+
+ if (value == null) {
+ log.trace("Removing status for connector {}", connector);
+ remove(connector);
+ return;
+ }
+
+ ConnectorStatus status = parseConnectorStatus(connector, value);
+ if (status == null)
+ return;
+
+ synchronized (KafkaStatusBackingStore.this) {
+ log.trace("Received connector {} status update {}", connector, status);
+ CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+ entry.put(status);
+ }
+ }
+
+ private void readTaskStatus(String key, byte[] value) {
+ ConnectorTaskId id = parseConnectorTaskId(key);
+ if (id == null) {
+ log.warn("Discarding record with invalid task status key {}", key);
+ return;
+ }
+
+ if (value == null) {
+ log.trace("Removing task status for {}", id);
+ remove(id);
+ return;
+ }
+
+ TaskStatus status = parseTaskStatus(id, value);
+ if (status == null) {
+ log.warn("Failed to parse task status with key {}", key);
+ return;
+ }
+
+ synchronized (KafkaStatusBackingStore.this) {
+ log.trace("Received task {} status update {}", id, status);
+ CacheEntry<TaskStatus> entry = getOrAdd(id);
+ entry.put(status);
+ }
+ }
+
+ // visible for testing
+ void read(ConsumerRecord<String, byte[]> record) {
+ String key = record.key();
+ if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
+ readConnectorStatus(key, record.value());
+ } else if (key.startsWith(TASK_STATUS_PREFIX)) {
+ readTaskStatus(key, record.value());
+ } else {
+ log.warn("Discarding record with invalid key {}", key);
+ }
+ }
+
+ private static class CacheEntry<T extends AbstractStatus> {
+ private T value = null;
+ private int sequence = 0;
+ private boolean deleted = false;
+
+ public int increment() {
+ return ++sequence;
+ }
+
+ public void put(T value) {
+ this.value = value;
+ }
+
+ public T get() {
+ return value;
+ }
+
+ public void delete() {
+ this.deleted = true;
+ }
+
+ public boolean isDeleted() {
+ return deleted;
+ }
+
+ public boolean canWrite(T status) {
+ return value != null &&
+ (value.workerId().equals(status.workerId())
+ || value.generation() <= status.generation());
+ }
+
+ public boolean canWrite(T status, int sequence) {
+ return canWrite(status) && this.sequence == sequence;
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
new file mode 100644
index 0000000..96b235b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.Table;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+public class MemoryStatusBackingStore implements StatusBackingStore {
+ private final Table<String, Integer, TaskStatus> tasks;
+ private final Map<String, ConnectorStatus> connectors;
+
+ public MemoryStatusBackingStore() {
+ this.tasks = new Table<>();
+ this.connectors = new HashMap<>();
+ }
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+
+ }
+
+ @Override
+ public void start() {
+
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public synchronized void put(ConnectorStatus status) {
+ if (status.state() == ConnectorStatus.State.DESTROYED)
+ connectors.remove(status.id());
+ else
+ connectors.put(status.id(), status);
+ }
+
+ @Override
+ public synchronized void putSafe(ConnectorStatus status) {
+ put(status);
+ }
+
+ @Override
+ public synchronized void put(TaskStatus status) {
+ if (status.state() == TaskStatus.State.DESTROYED)
+ tasks.remove(status.id().connector(), status.id().task());
+ else
+ tasks.put(status.id().connector(), status.id().task(), status);
+ }
+
+ @Override
+ public synchronized void putSafe(TaskStatus status) {
+ put(status);
+ }
+
+ @Override
+ public synchronized TaskStatus get(ConnectorTaskId id) {
+ return tasks.get(id.connector(), id.task());
+ }
+
+ @Override
+ public synchronized ConnectorStatus get(String connector) {
+ return connectors.get(connector);
+ }
+
+ @Override
+ public synchronized Collection<TaskStatus> getAll(String connector) {
+ return new HashSet<>(tasks.row(connector).values());
+ }
+
+ @Override
+ public synchronized Set<String> connectors() {
+ return new HashSet<>(connectors.keySet());
+ }
+
+ @Override
+ public void flush() {
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
new file mode 100644
index 0000000..6464f89
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Collection;
+import java.util.Set;
+
+public interface StatusBackingStore extends Configurable {
+
+ /**
+ * Start dependent services (if needed)
+ */
+ void start();
+
+ /**
+ * Stop dependent services (if needed)
+ */
+ void stop();
+
+ /**
+ * Set the state of the connector to the given value.
+ * @param status the status of the connector
+ */
+ void put(ConnectorStatus status);
+
+ /**
+ * Safely set the state of the connector to the given value. What is
+ * considered "safe" depends on the implementation, but basically it
+ * means that the store can provide higher assurance that another worker
+ * hasn't concurrently written any conflicting data.
+ * @param status the status of the connector
+ */
+ void putSafe(ConnectorStatus status);
+
+ /**
+ * Set the state of the connector to the given value.
+ * @param status the status of the task
+ */
+ void put(TaskStatus status);
+
+ /**
+ * Safely set the state of the task to the given value. What is
+ * considered "safe" depends on the implementation, but basically it
+ * means that the store can provide higher assurance that another worker
+ * hasn't concurrently written any conflicting data.
+ * @param status the status of the task
+ */
+ void putSafe(TaskStatus status);
+
+ /**
+ * Get the current state of the task.
+ * @param id the id of the task
+ * @return the state or null if there is none
+ */
+ TaskStatus get(ConnectorTaskId id);
+
+ /**
+ * Get the current state of the connector.
+ * @param connector the connector name
+ * @return the state or null if there is none
+ */
+ ConnectorStatus get(String connector);
+
+ /**
+ * Get the states of all tasks for the given connector.
+ * @param connector the connector name
+ * @return a map from task ids to their respective status
+ */
+ Collection<TaskStatus> getAll(String connector);
+
+ /**
+ * Get all cached connectors.
+ * @return the set of connector names
+ */
+ Set<String> connectors();
+
+ /**
+ * Flush any pending writes
+ */
+ void flush();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index c82645c..5ab60cd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -99,8 +99,11 @@ public class KafkaBasedLog<K, V> {
* @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
* @param time Time interface
*/
- public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
- Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+ public KafkaBasedLog(String topic,
+ Map<String, Object> producerConfigs,
+ Map<String, Object> consumerConfigs,
+ Callback<ConsumerRecord<K, V>> consumedCallback,
+ Time time) {
this.topic = topic;
this.producerConfigs = producerConfigs;
this.consumerConfigs = consumerConfigs;
@@ -140,9 +143,9 @@ public class KafkaBasedLog<K, V> {
thread = new WorkThread();
thread.start();
- log.info("Finished reading KafakBasedLog for topic " + topic);
+ log.info("Finished reading KafkaBasedLog for topic " + topic);
- log.info("Started KafakBasedLog for topic " + topic);
+ log.info("Started KafkaBasedLog for topic " + topic);
}
public void stop() {
@@ -198,6 +201,13 @@ public class KafkaBasedLog<K, V> {
}
/**
+ * Flush the underlying producer to ensure that all pending writes have been sent.
+ */
+ public void flush() {
+ producer.flush();
+ }
+
+ /**
* Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
* @return the future associated with the operation
*/
@@ -219,12 +229,18 @@ public class KafkaBasedLog<K, V> {
private Producer<K, V> createProducer() {
// Always require producer acks to all to ensure durable writes
producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+
+ // Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
+ producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new KafkaProducer<>(producerConfigs);
}
private Consumer<K, V> createConsumer() {
// Always force reset to the beginning of the log since this class wants to consume all available log data
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Turn off autocommit since we always want to consume the full log
+ consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(consumerConfigs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
new file mode 100644
index 0000000..f36d3e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Table<R, C, V> {
+
+ private Map<R, Map<C, V>> table = new HashMap<>();
+
+ public V put(R row, C column, V value) {
+ Map<C, V> columns = table.get(row);
+ if (columns == null) {
+ columns = new HashMap<>();
+ table.put(row, columns);
+ }
+ return columns.put(column, value);
+ }
+
+ public V get(R row, C column) {
+ Map<C, V> columns = table.get(row);
+ if (columns == null)
+ return null;
+ return columns.get(column);
+ }
+
+ public Map<C, V> remove(R row) {
+ return table.remove(row);
+ }
+
+ public V remove(R row, C column) {
+ Map<C, V> columns = table.get(row);
+ if (columns == null)
+ return null;
+
+ V value = columns.remove(column);
+ if (columns.isEmpty())
+ table.remove(row);
+ return value;
+ }
+
+ public Map<C, V> row(R row) {
+ Map<C, V> columns = table.get(row);
+ if (columns == null)
+ return Collections.emptyMap();
+ return Collections.unmodifiableMap(columns);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
new file mode 100644
index 0000000..f17023c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AbstractHerderTest extends EasyMockSupport {
+
+ @Test
+ public void connectorStatus() {
+ String workerId = "workerId";
+ String connector = "connector";
+ int generation = 5;
+ ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+
+ StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(StatusBackingStore.class, String.class)
+ .withArgs(store, workerId)
+ .addMockedMethod("generation")
+ .createMock();
+
+ EasyMock.expect(herder.generation()).andStubReturn(generation);
+
+ EasyMock.expect(store.get(connector))
+ .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+
+ EasyMock.expect(store.getAll(connector))
+ .andReturn(Collections.singletonList(
+ new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
+
+ replayAll();
+
+
+ ConnectorStateInfo state = herder.connectorStatus(connector);
+
+ assertEquals(connector, state.name());
+ assertEquals("RUNNING", state.connector().state());
+ assertEquals(1, state.tasks().size());
+ assertEquals(workerId, state.connector().workerId());
+
+ ConnectorStateInfo.TaskState taskState = state.tasks().get(0);
+ assertEquals(0, taskState.id());
+ assertEquals("UNASSIGNED", taskState.state());
+ assertEquals(workerId, taskState.workerId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void taskStatus() {
+ ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+ String workerId = "workerId";
+
+ StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(StatusBackingStore.class, String.class)
+ .withArgs(store, workerId)
+ .addMockedMethod("generation")
+ .createMock();
+
+ EasyMock.expect(herder.generation()).andStubReturn(5);
+
+ final Capture<TaskStatus> statusCapture = EasyMock.newCapture();
+ store.putSafe(EasyMock.capture(statusCapture));
+ EasyMock.expectLastCall();
+
+ EasyMock.expect(store.get(taskId)).andAnswer(new IAnswer<TaskStatus>() {
+ @Override
+ public TaskStatus answer() throws Throwable {
+ return statusCapture.getValue();
+ }
+ });
+
+ replayAll();
+
+ herder.onFailure(taskId, new RuntimeException());
+
+ ConnectorStateInfo.TaskState taskState = herder.taskStatus(taskId);
+ assertEquals(workerId, taskState.workerId());
+ assertEquals("FAILED", taskState.state());
+ assertEquals(0, taskState.id());
+ assertNotNull(taskState.trace());
+
+ verifyAll();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 978e3a1..6721609 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -98,6 +98,8 @@ public class WorkerSinkTaskTest {
@Mock
private Converter valueConverter;
@Mock
+ private TaskStatus.Listener statusListener;
+ @Mock
private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@@ -116,7 +118,7 @@ public class WorkerSinkTaskTest {
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
- taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+ taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index e202209..77f1ed0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -97,11 +97,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
@Mock private Converter keyConverter;
- @Mock
- private Converter valueConverter;
+ @Mock private Converter valueConverter;
private WorkerSinkTask workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+ @Mock private TaskStatus.Listener statusListener;
private long recordsReturned;
@@ -120,7 +120,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
- taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+ taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index c6eb0c5..8e9eb72 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
@@ -90,6 +91,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Mock private OffsetStorageWriter offsetWriter;
private WorkerSourceTask workerTask;
@Mock private Future<RecordMetadata> sendFuture;
+ @MockStrict private TaskStatus.Listener statusListener;
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
@@ -113,7 +115,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private void createWorkerTask() {
- workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+ workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, new SystemTime());
}
@@ -125,6 +127,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
final CountDownLatch pollLatch = expectPolls(10);
// In this test, we don't flush, so nothing goes any further than the offset writer
@@ -133,6 +137,42 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(EMPTY_TASK_PROPS);
+ executor.submit(workerTask);
+ awaitPolls(pollLatch);
+ workerTask.stop();
+ assertEquals(true, workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFailureInPoll() throws Exception {
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
+
+ final CountDownLatch pollLatch = expectPolls(1);
+ RuntimeException exception = new RuntimeException();
+ EasyMock.expect(sourceTask.poll()).andThrow(exception);
+
+ statusListener.onFailure(taskId, exception);
+ EasyMock.expectLastCall();
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
PowerMock.replayAll();
workerTask.initialize(EMPTY_TASK_PROPS);
@@ -153,6 +193,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
// We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1);
@@ -164,6 +206,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(EMPTY_TASK_PROPS);
@@ -185,6 +230,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
// We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1);
@@ -194,6 +241,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(EMPTY_TASK_PROPS);
@@ -269,6 +319,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index f5213a6..20e3fe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.partialMockBuilder;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
@@ -33,22 +36,32 @@ public class WorkerTaskTest {
@Test
public void standardStartup() {
+ ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+ TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class)
- .withArgs(new ConnectorTaskId("foo", 0))
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+ .withArgs(taskId, statusListener)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
.createStrictMock();
workerTask.initialize(EMPTY_TASK_PROPS);
- EasyMock.expectLastCall();
+ expectLastCall();
workerTask.execute();
- EasyMock.expectLastCall();
+ expectLastCall();
+
+ statusListener.onStartup(taskId);
+ expectLastCall();
workerTask.close();
- EasyMock.expectLastCall();
+ expectLastCall();
+
+ statusListener.onShutdown(taskId);
+ expectLastCall();
replay(workerTask);
@@ -62,9 +75,13 @@ public class WorkerTaskTest {
@Test
public void stopBeforeStarting() {
+ ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+ TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class)
- .withArgs(new ConnectorTaskId("foo", 0))
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+ .withArgs(taskId, statusListener)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
@@ -88,5 +105,62 @@ public class WorkerTaskTest {
verify(workerTask);
}
+ @Test
+ public void cancelBeforeStopping() throws Exception {
+ ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+ TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
+ WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+ .withArgs(taskId, statusListener)
+ .addMockedMethod("initialize")
+ .addMockedMethod("execute")
+ .addMockedMethod("close")
+ .createStrictMock();
+
+ final CountDownLatch stopped = new CountDownLatch(1);
+ final Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ stopped.await();
+ } catch (Exception e) {
+ }
+ }
+ };
+
+ workerTask.initialize(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+
+ workerTask.execute();
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ thread.start();
+ return null;
+ }
+ });
+
+ statusListener.onStartup(taskId);
+ expectLastCall();
+
+ workerTask.close();
+ expectLastCall();
+
+ // there should be no call to onShutdown()
+
+ replay(workerTask);
+
+ workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.run();
+
+ workerTask.stop();
+ workerTask.cancel();
+ stopped.countDown();
+ thread.join();
+
+ verify(workerTask);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f33347a..0ca405e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
@@ -60,10 +60,13 @@ public class WorkerTest extends ThreadedTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+ private static final String WORKER_ID = "localhost:8083";
private WorkerConfig config;
private Worker worker;
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+ private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
+ private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
@Before
public void setup() {
@@ -80,17 +83,14 @@ public class WorkerTest extends ThreadedTest {
}
@Test
- public void testAddRemoveConnector() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ public void testStartAndStopConnector() throws Exception {
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -105,24 +105,29 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Remove
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
+ expectStopStorage();
+
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
try {
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
fail("Should have thrown exception when trying to add connector with same name.");
} catch (ConnectException e) {
// expected
@@ -137,16 +142,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddConnectorByAlias() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -161,22 +163,26 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Remove
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
- PowerMock.replayAll();
+ expectStopStorage();
+ PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
worker.stopConnector(CONNECTOR_ID);
@@ -189,16 +195,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddConnectorByShortAlias() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -213,21 +216,26 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Remove
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
+ expectStopStorage();
+
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
worker.stopConnector(CONNECTOR_ID);
@@ -241,14 +249,11 @@ public class WorkerTest extends ThreadedTest {
@Test(expected = ConnectException.class)
public void testStopInvalidConnector() {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
worker.stopConnector(CONNECTOR_ID);
@@ -256,16 +261,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testReconfigureConnectorTasks() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -280,6 +282,9 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Reconfigure
EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
Map<String, String> taskProps = new HashMap<>();
@@ -290,20 +295,22 @@ public class WorkerTest extends ThreadedTest {
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
+ expectStopStorage();
+
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
try {
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
fail("Should have thrown exception when trying to add connector with same name.");
} catch (ConnectException e) {
// expected
@@ -327,23 +334,21 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddRemoveTask() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
-
- ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ expectStartStorage();
// Create
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+ EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
EasyMock.expect(task.version()).andReturn("1.0");
PowerMock.expectNew(
- WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+ WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+ EasyMock.eq(task),
+ EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class),
@@ -356,6 +361,8 @@ public class WorkerTest extends ThreadedTest {
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.initialize(origProps);
EasyMock.expectLastCall();
+ workerTask.run();
+ EasyMock.expectLastCall();
// Remove
workerTask.stop();
@@ -363,17 +370,16 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
- offsetBackingStore.stop();
- EasyMock.expectLastCall();
+ expectStopStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
assertEquals(Collections.emptySet(), worker.taskIds());
- worker.addTask(taskId, new TaskConfig(origProps));
- assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
- worker.stopTask(taskId);
+ worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+ assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
+ worker.stopAndAwaitTask(TASK_ID);
assertEquals(Collections.emptySet(), worker.taskIds());
// Nothing should be left, so this should effectively be a nop
worker.stop();
@@ -383,36 +389,33 @@ public class WorkerTest extends ThreadedTest {
@Test(expected = ConnectException.class)
public void testStopInvalidTask() {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
- worker.stopTask(TASK_ID);
+ worker.stopAndAwaitTask(TASK_ID);
}
@Test
public void testCleanupTasksOnStop() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+ EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
EasyMock.expect(task.version()).andReturn("1.0");
PowerMock.expectNew(
- WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
+ WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+ EasyMock.eq(task),
+ EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class),
@@ -425,27 +428,41 @@ public class WorkerTest extends ThreadedTest {
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.initialize(origProps);
EasyMock.expectLastCall();
+ workerTask.run();
+ EasyMock.expectLastCall();
// Remove on Worker.stop()
workerTask.stop();
EasyMock.expectLastCall();
+
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
// Note that in this case we *do not* commit offsets since it's an unclean shutdown
EasyMock.expectLastCall();
- offsetBackingStore.stop();
- EasyMock.expectLastCall();
+ expectStopStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
- worker.addTask(TASK_ID, new TaskConfig(origProps));
+ worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
worker.stop();
PowerMock.verifyAll();
}
+ private void expectStartStorage() {
+ offsetBackingStore.configure(EasyMock.anyObject(Map.class));
+ EasyMock.expectLastCall();
+ offsetBackingStore.start();
+ EasyMock.expectLastCall();
+ }
+
+ private void expectStopStorage() {
+ offsetBackingStore.stop();
+ EasyMock.expectLastCall();
+ }
+
/* Name here needs to be unique as we are testing the aliasing mechanism */
private static class WorkerTestConnector extends Connector {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 76f9bc0..d439e96 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -31,6 +31,8 @@ import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
@@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue;
public class DistributedHerderTest {
private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
static {
+ HERDER_CONFIG.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "status-topic");
HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
@@ -122,7 +125,10 @@ public class DistributedHerderTest {
private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ private static final String WORKER_ID = "localhost:8083";
+
@Mock private KafkaConfigStorage configStorage;
+ @Mock private StatusBackingStore statusBackingStore;
@Mock private WorkerGroupMember member;
private MockTime time;
private DistributedHerder herder;
@@ -139,11 +145,12 @@ public class DistributedHerderTest {
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
time = new MockTime();
- herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
- new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time);
+ herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
+ new DistributedConfig(HERDER_CONFIG), WORKER_ID, worker, statusBackingStore, configStorage, member, MEMBER_URL, time);
connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
+ PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
}
@Test
@@ -152,10 +159,11 @@ public class DistributedHerderTest {
EasyMock.expect(member.memberId()).andStubReturn("member");
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -168,17 +176,55 @@ public class DistributedHerderTest {
}
@Test
+ public void testRebalance() {
+ // Join group and get assignment
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+ expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+ expectPostRebalanceCatchup(SNAPSHOT);
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR,
+ 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList());
+
+ // and the new assignment started
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick();
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testHaltCleansUpWorker() {
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
- worker.stopTask(TASK1);
+ worker.stopTasks(Collections.singleton(TASK1));
+ PowerMock.expectLastCall();
+ worker.awaitStopTasks(Collections.singleton(TASK1));
PowerMock.expectLastCall();
member.stop();
PowerMock.expectLastCall();
configStorage.stop();
PowerMock.expectLastCall();
+ statusBackingStore.stop();
+ PowerMock.expectLastCall();
PowerMock.replayAll();
@@ -242,7 +288,8 @@ public class DistributedHerderTest {
// Start with one connector
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
@@ -286,7 +333,8 @@ public class DistributedHerderTest {
// Performs rebalance and gets new assignment
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -312,7 +360,8 @@ public class DistributedHerderTest {
// join
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -325,7 +374,8 @@ public class DistributedHerderTest {
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -360,8 +410,9 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// Performs rebalance and gets new assignment
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
- ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0));
- worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject());
+ ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
+ Arrays.asList(TASK0));
+ worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -381,7 +432,8 @@ public class DistributedHerderTest {
// Join group and as leader fail to do assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
- ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+ ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+ Collections.<ConnectorTaskId>emptyList());
// Reading to end of log times out
TestFuture<Void> readToEndFuture = new TestFuture<>();
readToEndFuture.resolveOnGet(new TimeoutException());
@@ -393,10 +445,11 @@ public class DistributedHerderTest {
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -457,7 +510,8 @@ public class DistributedHerderTest {
EasyMock.expect(member.memberId()).andStubReturn("leader");
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
@@ -485,7 +539,8 @@ public class DistributedHerderTest {
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
- worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
@@ -534,13 +589,19 @@ public class DistributedHerderTest {
}
- private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+ private void expectRebalance(final long offset,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks) {
expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);
}
// Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
- private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks,
- final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+ private void expectRebalance(final Collection<String> revokedConnectors,
+ final List<ConnectorTaskId> revokedTasks,
+ final short error,
+ final long offset,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks) {
member.ensureActive();
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
@@ -549,10 +610,30 @@ public class DistributedHerderTest {
rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
- rebalanceListener.onAssigned(assignment);
+ rebalanceListener.onAssigned(assignment, 0);
return null;
}
});
+
+ if (revokedConnectors != null) {
+ for (String connector : revokedConnectors) {
+ worker.stopConnector(connector);
+ PowerMock.expectLastCall();
+ }
+ }
+
+ if (revokedTasks != null && !revokedTasks.isEmpty()) {
+ worker.stopTasks(revokedTasks);
+ PowerMock.expectLastCall();
+ worker.awaitStopTasks(revokedTasks);
+ PowerMock.expectLastCall();
+ }
+
+ if (revokedConnectors != null) {
+ statusBackingStore.flush();
+ PowerMock.expectLastCall();
+ }
+
member.wakeup();
PowerMock.expectLastCall();
}