You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/24 07:47:49 UTC
[2/3] kafka git commit: KAFKA-3093: Add Connect status tracking API
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
index 4b60131..9bd191e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
@@ -20,6 +20,10 @@ package org.apache.kafka.connect.storage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.Schema;
@@ -219,15 +223,14 @@ public class KafkaConfigStorage {
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(configs);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(configs);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 65bd9d0..dfb8c51 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -21,6 +21,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.util.Callback;
@@ -66,15 +68,14 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
Map<String, Object> producerProps = new HashMap<>();
producerProps.putAll(configs);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ producerProps.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.putAll(configs);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
new file mode 100644
index 0000000..948a325
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaStatusBackingStore.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * StatusBackingStore implementation which uses a compacted topic for storage
+ * of connector and task status information. When a state change is observed,
+ * the new state is written to the compacted topic. The new state will not be
+ * visible until it has been read back from the topic.
+ *
+ * In spite of their names, the putSafe() methods cannot guarantee the safety
+ * of the write (since Kafka itself cannot provide such guarantees currently),
+ * but they can avoid specific unsafe conditions. In particular, putSafe()
+ * allows writes in the following conditions:
+ *
+ * 1) It is (probably) safe to overwrite the state if there is no previous
+ * value.
+ * 2) It is (probably) safe to overwrite the state if the previous value was
+ * set by a worker with the same workerId.
+ * 3) It is (probably) safe to overwrite the previous state if the current
+ * generation is higher than the previous one.
+ *
+ * Basically all these conditions do is reduce the window for conflicts. They
+ * obviously cannot take into account in-flight requests.
+ *
+ */
+public class KafkaStatusBackingStore implements StatusBackingStore {
+    private static final Logger log = LoggerFactory.getLogger(KafkaStatusBackingStore.class);
+
+    /** Configuration key naming the topic that status records are written to. */
+    public static final String STATUS_TOPIC_CONFIG = "status.storage.topic";
+
+    // Record keys are "<prefix><connector>" for connectors and
+    // "<prefix><connector>-<task>" for tasks.
+    private static final String TASK_STATUS_PREFIX = "status-task-";
+    private static final String CONNECTOR_STATUS_PREFIX = "status-connector-";
+
+    // Field names of the serialized status value.
+    public static final String STATE_KEY_NAME = "state";
+    public static final String TRACE_KEY_NAME = "trace";
+    public static final String WORKER_ID_KEY_NAME = "worker_id";
+    public static final String GENERATION_KEY_NAME = "generation";
+
+    private static final Schema STATUS_SCHEMA_V0 = SchemaBuilder.struct()
+            .field(STATE_KEY_NAME, Schema.STRING_SCHEMA)
+            .field(TRACE_KEY_NAME, SchemaBuilder.string().optional().build())
+            .field(WORKER_ID_KEY_NAME, Schema.STRING_SCHEMA)
+            .field(GENERATION_KEY_NAME, Schema.INT32_SCHEMA)
+            .build();
+
+    private final Time time;
+    private final Converter converter;
+    // Both caches are only accessed while holding this store's monitor.
+    private final Table<String, Integer, CacheEntry<TaskStatus>> tasks;
+    private final Map<String, CacheEntry<ConnectorStatus>> connectors;
+
+    private String topic;
+    private KafkaBasedLog<String, byte[]> kafkaLog;
+    // Generation of the status most recently handed to send(); guarded by this store's monitor.
+    private int generation;
+
+    /**
+     * Create a store that still has to be configured via {@link #configure(Map)}.
+     * @param time the time source handed to the underlying log
+     * @param converter the converter used to (de)serialize status values
+     */
+    public KafkaStatusBackingStore(Time time, Converter converter) {
+        this.time = time;
+        this.converter = converter;
+        this.tasks = new Table<>();
+        this.connectors = new HashMap<>();
+    }
+
+    // visible for testing
+    KafkaStatusBackingStore(Time time, Converter converter, String topic, KafkaBasedLog<String, byte[]> kafkaLog) {
+        this(time, converter);
+        this.kafkaLog = kafkaLog;
+        this.topic = topic;
+    }
+
+    /**
+     * Read the status topic from the given configs and create the backing log.
+     * @param configs the configuration; must contain {@link #STATUS_TOPIC_CONFIG}
+     * @throws ConnectException if no status topic is configured
+     */
+    @Override
+    public void configure(Map<String, ?> configs) {
+        if (configs.get(STATUS_TOPIC_CONFIG) == null)
+            throw new ConnectException("Must specify topic for connector status.");
+        this.topic = (String) configs.get(STATUS_TOPIC_CONFIG);
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.putAll(configs);
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+        producerProps.put(ProducerConfig.RETRIES_CONFIG, 0); // we handle retries in this class
+
+        Map<String, Object> consumerProps = new HashMap<>();
+        consumerProps.putAll(configs);
+        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+
+        // Every consumed record is applied to the local cache.
+        Callback<ConsumerRecord<String, byte[]>> readCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+            @Override
+            public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+                read(record);
+            }
+        };
+        this.kafkaLog = new KafkaBasedLog<>(topic, producerProps, consumerProps, readCallback, time);
+    }
+
+    @Override
+    public void start() {
+        kafkaLog.start();
+
+        // read to the end on startup to ensure that api requests see the most recent states
+        kafkaLog.readToEnd();
+    }
+
+    @Override
+    public void stop() {
+        kafkaLog.stop();
+    }
+
+    @Override
+    public void put(final ConnectorStatus status) {
+        sendConnectorStatus(status, false);
+    }
+
+    @Override
+    public void putSafe(final ConnectorStatus status) {
+        sendConnectorStatus(status, true);
+    }
+
+    @Override
+    public void put(final TaskStatus status) {
+        sendTaskStatus(status, false);
+    }
+
+    @Override
+    public void putSafe(final TaskStatus status) {
+        sendTaskStatus(status, true);
+    }
+
+    @Override
+    public void flush() {
+        kafkaLog.flush();
+    }
+
+    /** Write a connector status under its connector key. */
+    private void sendConnectorStatus(final ConnectorStatus status, boolean safeWrite) {
+        String connector = status.id();
+        CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+        String key = CONNECTOR_STATUS_PREFIX + connector;
+        send(key, status, entry, safeWrite);
+    }
+
+    /** Write a task status under its task key. */
+    private void sendTaskStatus(final TaskStatus status, boolean safeWrite) {
+        ConnectorTaskId taskId = status.id();
+        CacheEntry<TaskStatus> entry = getOrAdd(taskId);
+        String key = TASK_STATUS_PREFIX + taskId.connector() + "-" + taskId.task();
+        send(key, status, entry, safeWrite);
+    }
+
+    /**
+     * Send a status record to the log, resending it on retriable failures.
+     * A DESTROYED status is written as a null value. A resend is abandoned
+     * once the entry has been deleted, the store has seen a status with a
+     * different generation, or (for safe writes) the entry may no longer be
+     * written with this sequence number.
+     *
+     * @param key the record key
+     * @param status the status to write
+     * @param entry the cache entry tracking this key
+     * @param safeWrite whether to skip the write when {@code entry.canWrite(status)} is false
+     */
+    private <V extends AbstractStatus> void send(final String key,
+                                                 final V status,
+                                                 final CacheEntry<V> entry,
+                                                 final boolean safeWrite) {
+        final int sequence;
+        synchronized (this) {
+            this.generation = status.generation();
+            if (safeWrite && !entry.canWrite(status))
+                return;
+            sequence = entry.increment();
+        }
+
+        final byte[] value = status.state() == ConnectorStatus.State.DESTROYED ? null : serialize(status);
+
+        kafkaLog.send(key, value, new org.apache.kafka.clients.producer.Callback() {
+            @Override
+            public void onCompletion(RecordMetadata metadata, Exception exception) {
+                if (exception != null) {
+                    if (exception instanceof RetriableException) {
+                        // Lock the store itself: a bare "this" here is the callback
+                        // object, which would not guard the generation or the entry.
+                        synchronized (KafkaStatusBackingStore.this) {
+                            if (entry.isDeleted()
+                                    || status.generation() != generation
+                                    || (safeWrite && !entry.canWrite(status, sequence)))
+                                return;
+                        }
+                        kafkaLog.send(key, value, this);
+                    } else {
+                        log.error("Failed to write status update", exception);
+                    }
+                }
+            }
+        });
+    }
+
+    /** Return the cache entry for the connector, creating an empty one if absent. */
+    private synchronized CacheEntry<ConnectorStatus> getOrAdd(String connector) {
+        CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+        if (entry == null) {
+            entry = new CacheEntry<>();
+            connectors.put(connector, entry);
+        }
+        return entry;
+    }
+
+    /** Drop the connector and all of its tasks from the cache, marking each entry deleted. */
+    private synchronized void remove(String connector) {
+        CacheEntry<ConnectorStatus> removed = connectors.remove(connector);
+        if (removed != null)
+            removed.delete();
+
+        Map<Integer, CacheEntry<TaskStatus>> tasks = this.tasks.remove(connector);
+        if (tasks != null) {
+            for (CacheEntry<TaskStatus> taskEntry : tasks.values())
+                taskEntry.delete();
+        }
+    }
+
+    /** Return the cache entry for the task, creating an empty one if absent. */
+    private synchronized CacheEntry<TaskStatus> getOrAdd(ConnectorTaskId task) {
+        CacheEntry<TaskStatus> entry = tasks.get(task.connector(), task.task());
+        if (entry == null) {
+            entry = new CacheEntry<>();
+            tasks.put(task.connector(), task.task(), entry);
+        }
+        return entry;
+    }
+
+    /** Drop a single task from the cache, marking its entry deleted. */
+    private synchronized void remove(ConnectorTaskId id) {
+        CacheEntry<TaskStatus> removed = tasks.remove(id.connector(), id.task());
+        if (removed != null)
+            removed.delete();
+    }
+
+    @Override
+    public synchronized TaskStatus get(ConnectorTaskId id) {
+        CacheEntry<TaskStatus> entry = tasks.get(id.connector(), id.task());
+        return entry == null ? null : entry.get();
+    }
+
+    @Override
+    public synchronized ConnectorStatus get(String connector) {
+        CacheEntry<ConnectorStatus> entry = connectors.get(connector);
+        return entry == null ? null : entry.get();
+    }
+
+    @Override
+    public synchronized Collection<TaskStatus> getAll(String connector) {
+        List<TaskStatus> res = new ArrayList<>();
+        for (CacheEntry<TaskStatus> statusEntry : tasks.row(connector).values()) {
+            // entries created by a pending write have no value until it is read back
+            TaskStatus status = statusEntry.get();
+            if (status != null)
+                res.add(status);
+        }
+        return res;
+    }
+
+    @Override
+    public synchronized Set<String> connectors() {
+        return new HashSet<>(connectors.keySet());
+    }
+
+    /**
+     * Deserialize a connector status value.
+     * @return the parsed status, or null if the value could not be parsed
+     */
+    private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
+        try {
+            SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+            if (!(schemaAndValue.value() instanceof Map)) {
+                log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
+                return null;
+            }
+
+            @SuppressWarnings("unchecked")
+            Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+            ConnectorStatus.State state = ConnectorStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+            String trace = (String) statusMap.get(TRACE_KEY_NAME);
+            String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+            int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+            return new ConnectorStatus(connector, state, trace, workerUrl, generation);
+        } catch (Exception e) {
+            log.error("Failed to deserialize connector status", e);
+            return null;
+        }
+    }
+
+    /**
+     * Deserialize a task status value.
+     * @return the parsed status, or null if the value could not be parsed
+     */
+    private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
+        try {
+            SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
+            if (!(schemaAndValue.value() instanceof Map)) {
+                log.error("Invalid task status type {}", schemaAndValue.value().getClass());
+                return null;
+            }
+            @SuppressWarnings("unchecked")
+            Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
+            TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
+            String trace = (String) statusMap.get(TRACE_KEY_NAME);
+            String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
+            int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
+            return new TaskStatus(taskId, state, workerUrl, generation, trace);
+        } catch (Exception e) {
+            log.error("Failed to deserialize task status", e);
+            return null;
+        }
+    }
+
+    /** Serialize a status using {@code STATUS_SCHEMA_V0}; the trace field is only set when present. */
+    private byte[] serialize(AbstractStatus status) {
+        Struct struct = new Struct(STATUS_SCHEMA_V0);
+        struct.put(STATE_KEY_NAME, status.state().name());
+        if (status.trace() != null)
+            struct.put(TRACE_KEY_NAME, status.trace());
+        struct.put(WORKER_ID_KEY_NAME, status.workerId());
+        struct.put(GENERATION_KEY_NAME, status.generation());
+        return converter.fromConnectData(topic, STATUS_SCHEMA_V0, struct);
+    }
+
+    /** Strip the connector prefix from a record key, leaving the connector name. */
+    private String parseConnectorStatusKey(String key) {
+        return key.substring(CONNECTOR_STATUS_PREFIX.length());
+    }
+
+    /**
+     * Parse a task key of the form "status-task-&lt;connector&gt;-&lt;task&gt;".
+     * @return the task id, or null if the key is malformed
+     */
+    private ConnectorTaskId parseConnectorTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 4) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            // the connector name may itself contain dashes, so rejoin the middle parts
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 2, parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            log.warn("Invalid task status key {}", key);
+            return null;
+        }
+    }
+
+    /** Apply a connector status record to the cache; a null value removes the connector. */
+    private void readConnectorStatus(String key, byte[] value) {
+        String connector = parseConnectorStatusKey(key);
+        if (connector == null || connector.isEmpty()) {
+            log.warn("Discarding record with invalid connector status key {}", key);
+            return;
+        }
+
+        if (value == null) {
+            log.trace("Removing status for connector {}", connector);
+            remove(connector);
+            return;
+        }
+
+        ConnectorStatus status = parseConnectorStatus(connector, value);
+        if (status == null)
+            return;
+
+        synchronized (KafkaStatusBackingStore.this) {
+            log.trace("Received connector {} status update {}", connector, status);
+            CacheEntry<ConnectorStatus> entry = getOrAdd(connector);
+            entry.put(status);
+        }
+    }
+
+    /** Apply a task status record to the cache; a null value removes the task. */
+    private void readTaskStatus(String key, byte[] value) {
+        ConnectorTaskId id = parseConnectorTaskId(key);
+        if (id == null) {
+            log.warn("Discarding record with invalid task status key {}", key);
+            return;
+        }
+
+        if (value == null) {
+            log.trace("Removing task status for {}", id);
+            remove(id);
+            return;
+        }
+
+        TaskStatus status = parseTaskStatus(id, value);
+        if (status == null) {
+            log.warn("Failed to parse task status with key {}", key);
+            return;
+        }
+
+        synchronized (KafkaStatusBackingStore.this) {
+            log.trace("Received task {} status update {}", id, status);
+            CacheEntry<TaskStatus> entry = getOrAdd(id);
+            entry.put(status);
+        }
+    }
+
+    // visible for testing
+    void read(ConsumerRecord<String, byte[]> record) {
+        String key = record.key();
+        if (key.startsWith(CONNECTOR_STATUS_PREFIX)) {
+            readConnectorStatus(key, record.value());
+        } else if (key.startsWith(TASK_STATUS_PREFIX)) {
+            readTaskStatus(key, record.value());
+        } else {
+            log.warn("Discarding record with invalid key {}", key);
+        }
+    }
+
+    /**
+     * Cached state for one connector or task: the last status read back from
+     * the log, a counter bumped for every write that is sent, and a flag set
+     * once the entry has been removed from the cache. Instances are not
+     * synchronized themselves; the enclosing store accesses them under its
+     * own monitor.
+     */
+    private static class CacheEntry<T extends AbstractStatus> {
+        private T value = null;
+        private int sequence = 0;
+        private boolean deleted = false;
+
+        /** Advance and return the write sequence number. */
+        public int increment() {
+            return ++sequence;
+        }
+
+        public void put(T value) {
+            this.value = value;
+        }
+
+        public T get() {
+            return value;
+        }
+
+        public void delete() {
+            this.deleted = true;
+        }
+
+        public boolean isDeleted() {
+            return deleted;
+        }
+
+        /**
+         * Check whether a safe write of the given status is allowed: there is
+         * no cached value yet, or the cached value was written by the same
+         * worker, or its generation is not newer than the given status.
+         */
+        public boolean canWrite(T status) {
+            return value == null
+                    || value.workerId().equals(status.workerId())
+                    || value.generation() <= status.generation();
+        }
+
+        /** As {@link #canWrite(AbstractStatus)}, and no later write has been sent for this entry. */
+        public boolean canWrite(T status, int sequence) {
+            return canWrite(status) && this.sequence == sequence;
+        }
+
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
new file mode 100644
index 0000000..96b235b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryStatusBackingStore.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.Table;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * StatusBackingStore that keeps connector and task statuses in local maps
+ * only. Configuration, start, stop and flush are no-ops.
+ */
+public class MemoryStatusBackingStore implements StatusBackingStore {
+    private final Table<String, Integer, TaskStatus> tasks;
+    private final Map<String, ConnectorStatus> connectors;
+
+    public MemoryStatusBackingStore() {
+        this.tasks = new Table<>();
+        this.connectors = new HashMap<>();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // nothing to configure
+    }
+
+    @Override
+    public void start() {
+        // nothing to start
+    }
+
+    @Override
+    public void stop() {
+        // nothing to stop
+    }
+
+    /** Store the connector status; a DESTROYED status removes the connector instead. */
+    @Override
+    public synchronized void put(ConnectorStatus status) {
+        if (status.state() != ConnectorStatus.State.DESTROYED) {
+            connectors.put(status.id(), status);
+            return;
+        }
+        connectors.remove(status.id());
+    }
+
+    /** Same as {@link #put(ConnectorStatus)}; no extra checks are made. */
+    @Override
+    public synchronized void putSafe(ConnectorStatus status) {
+        put(status);
+    }
+
+    /** Store the task status; a DESTROYED status removes the task instead. */
+    @Override
+    public synchronized void put(TaskStatus status) {
+        boolean destroyed = status.state() == TaskStatus.State.DESTROYED;
+        ConnectorTaskId id = status.id();
+        if (destroyed) {
+            tasks.remove(id.connector(), id.task());
+        } else {
+            tasks.put(id.connector(), id.task(), status);
+        }
+    }
+
+    /** Same as {@link #put(TaskStatus)}; no extra checks are made. */
+    @Override
+    public synchronized void putSafe(TaskStatus status) {
+        put(status);
+    }
+
+    @Override
+    public synchronized TaskStatus get(ConnectorTaskId id) {
+        String connector = id.connector();
+        return tasks.get(connector, id.task());
+    }
+
+    @Override
+    public synchronized ConnectorStatus get(String connector) {
+        return connectors.get(connector);
+    }
+
+    /** Return a copy of the statuses stored for the connector's tasks. */
+    @Override
+    public synchronized Collection<TaskStatus> getAll(String connector) {
+        Collection<TaskStatus> stored = tasks.row(connector).values();
+        return new HashSet<>(stored);
+    }
+
+    /** Return a copy of the names of all stored connectors. */
+    @Override
+    public synchronized Set<String> connectors() {
+        Set<String> names = connectors.keySet();
+        return new HashSet<>(names);
+    }
+
+    @Override
+    public void flush() {
+        // nothing to flush
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
new file mode 100644
index 0000000..6464f89
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/StatusBackingStore.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.connect.runtime.ConnectorStatus;
+import org.apache.kafka.connect.runtime.TaskStatus;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Storage for the status of connectors and their tasks.
+ */
+public interface StatusBackingStore extends Configurable {
+
+ /**
+ * Start dependent services (if needed)
+ */
+ void start();
+
+ /**
+ * Stop dependent services (if needed)
+ */
+ void stop();
+
+ /**
+ * Set the state of the connector to the given value.
+ * @param status the status of the connector
+ */
+ void put(ConnectorStatus status);
+
+ /**
+ * Safely set the state of the connector to the given value. What is
+ * considered "safe" depends on the implementation, but basically it
+ * means that the store can provide higher assurance that another worker
+ * hasn't concurrently written any conflicting data.
+ * @param status the status of the connector
+ */
+ void putSafe(ConnectorStatus status);
+
+ /**
+ * Set the state of the task to the given value.
+ * @param status the status of the task
+ */
+ void put(TaskStatus status);
+
+ /**
+ * Safely set the state of the task to the given value. What is
+ * considered "safe" depends on the implementation, but basically it
+ * means that the store can provide higher assurance that another worker
+ * hasn't concurrently written any conflicting data.
+ * @param status the status of the task
+ */
+ void putSafe(TaskStatus status);
+
+ /**
+ * Get the current state of the task.
+ * @param id the id of the task
+ * @return the state or null if there is none
+ */
+ TaskStatus get(ConnectorTaskId id);
+
+ /**
+ * Get the current state of the connector.
+ * @param connector the connector name
+ * @return the state or null if there is none
+ */
+ ConnectorStatus get(String connector);
+
+ /**
+ * Get the states of all tasks for the given connector.
+ * @param connector the connector name
+ * @return a collection of the statuses of the connector's tasks
+ */
+ Collection<TaskStatus> getAll(String connector);
+
+ /**
+ * Get all cached connectors.
+ * @return the set of connector names
+ */
+ Set<String> connectors();
+
+ /**
+ * Flush any pending writes
+ */
+ void flush();
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index c82645c..5ab60cd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -99,8 +99,11 @@ public class KafkaBasedLog<K, V> {
* @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
* @param time Time interface
*/
- public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
- Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+ public KafkaBasedLog(String topic,
+ Map<String, Object> producerConfigs,
+ Map<String, Object> consumerConfigs,
+ Callback<ConsumerRecord<K, V>> consumedCallback,
+ Time time) {
this.topic = topic;
this.producerConfigs = producerConfigs;
this.consumerConfigs = consumerConfigs;
@@ -140,9 +143,9 @@ public class KafkaBasedLog<K, V> {
thread = new WorkThread();
thread.start();
- log.info("Finished reading KafakBasedLog for topic " + topic);
+ log.info("Finished reading KafkaBasedLog for topic " + topic);
- log.info("Started KafakBasedLog for topic " + topic);
+ log.info("Started KafkaBasedLog for topic " + topic);
}
public void stop() {
@@ -198,6 +201,13 @@ public class KafkaBasedLog<K, V> {
}
/**
+ * Flush the underlying producer to ensure that all pending writes have been sent.
+ */
+ public void flush() {
+ producer.flush();
+ }
+
+ /**
* Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
* @return the future associated with the operation
*/
@@ -219,12 +229,18 @@ public class KafkaBasedLog<K, V> {
private Producer<K, V> createProducer() {
// Always require producer acks to all to ensure durable writes
producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+
+ // Don't allow more than one in-flight request to prevent reordering on retry (if enabled)
+ producerConfigs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
return new KafkaProducer<>(producerConfigs);
}
private Consumer<K, V> createConsumer() {
// Always force reset to the beginning of the log since this class wants to consume all available log data
consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+ // Turn off autocommit since we always want to consume the full log
+ consumerConfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new KafkaConsumer<>(consumerConfigs);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
new file mode 100644
index 0000000..f36d3e5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/Table.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class Table<R, C, V> { // two-level map: row key -> (column key -> value)
+
+ private Map<R, Map<C, V>> table = new HashMap<>();
+
+ public V put(R row, C column, V value) { // returns what columns.put returns: the previous cell value, or null
+ Map<C, V> columns = table.get(row);
+ if (columns == null) { // first write to this row: create its column map
+ columns = new HashMap<>();
+ table.put(row, columns);
+ }
+ return columns.put(column, value);
+ }
+
+ public V get(R row, C column) { // null when the row is absent or the row has no mapping for column
+ Map<C, V> columns = table.get(row);
+ if (columns == null)
+ return null;
+ return columns.get(column);
+ }
+
+ public Map<C, V> remove(R row) { // drops the whole row; returns its column map, or null if the row was absent
+ return table.remove(row);
+ }
+
+ public V remove(R row, C column) { // removes one cell; returns the removed value, or null if there was none
+ Map<C, V> columns = table.get(row);
+ if (columns == null)
+ return null;
+
+ V value = columns.remove(column);
+ if (columns.isEmpty()) // drop the row entry once it holds no columns
+ table.remove(row);
+ return value;
+ }
+
+ public Map<C, V> row(R row) { // unmodifiable view of the row's columns; empty map when the row is absent
+ Map<C, V> columns = table.get(row);
+ if (columns == null)
+ return Collections.emptyMap();
+ return Collections.unmodifiableMap(columns);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
new file mode 100644
index 0000000..f17023c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StatusBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.EasyMockSupport;
+import org.easymock.IAnswer;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class AbstractHerderTest extends EasyMockSupport {
+
+ @Test
+ public void connectorStatus() { // statuses returned by the mocked store should surface in the ConnectorStateInfo
+ String workerId = "workerId";
+ String connector = "connector";
+ int generation = 5;
+ ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
+
+ StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(StatusBackingStore.class, String.class)
+ .withArgs(store, workerId)
+ .addMockedMethod("generation") // generation() is the only method named for mocking here
+ .createMock();
+
+ EasyMock.expect(herder.generation()).andStubReturn(generation);
+
+ EasyMock.expect(store.get(connector)) // connector-level status: RUNNING
+ .andReturn(new ConnectorStatus(connector, AbstractStatus.State.RUNNING, workerId, generation));
+
+ EasyMock.expect(store.getAll(connector)) // a single task status: UNASSIGNED
+ .andReturn(Collections.singletonList(
+ new TaskStatus(taskId, AbstractStatus.State.UNASSIGNED, workerId, generation)));
+
+ replayAll();
+
+
+ ConnectorStateInfo state = herder.connectorStatus(connector);
+
+ assertEquals(connector, state.name());
+ assertEquals("RUNNING", state.connector().state());
+ assertEquals(1, state.tasks().size());
+ assertEquals(workerId, state.connector().workerId());
+
+ ConnectorStateInfo.TaskState taskState = state.tasks().get(0);
+ assertEquals(0, taskState.id());
+ assertEquals("UNASSIGNED", taskState.state());
+ assertEquals(workerId, taskState.workerId());
+
+ verifyAll();
+ }
+
+ @Test
+ public void taskStatus() { // expects onFailure() to store a status via putSafe, which taskStatus() then reads back
+ ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
+ String workerId = "workerId";
+
+ StatusBackingStore store = strictMock(StatusBackingStore.class);
+
+ AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
+ .withConstructor(StatusBackingStore.class, String.class)
+ .withArgs(store, workerId)
+ .addMockedMethod("generation")
+ .createMock();
+
+ EasyMock.expect(herder.generation()).andStubReturn(5);
+
+ final Capture<TaskStatus> statusCapture = EasyMock.newCapture();
+ store.putSafe(EasyMock.capture(statusCapture)); // capture the status handed to the store
+ EasyMock.expectLastCall();
+
+ EasyMock.expect(store.get(taskId)).andAnswer(new IAnswer<TaskStatus>() {
+ @Override
+ public TaskStatus answer() throws Throwable {
+ return statusCapture.getValue(); // hand back whatever putSafe captured
+ }
+ });
+
+ replayAll();
+
+ herder.onFailure(taskId, new RuntimeException());
+
+ ConnectorStateInfo.TaskState taskState = herder.taskStatus(taskId);
+ assertEquals(workerId, taskState.workerId());
+ assertEquals("FAILED", taskState.state());
+ assertEquals(0, taskState.id());
+ assertNotNull(taskState.trace());
+
+ verifyAll();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 978e3a1..6721609 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -98,6 +98,8 @@ public class WorkerSinkTaskTest {
@Mock
private Converter valueConverter;
@Mock
+ private TaskStatus.Listener statusListener;
+ @Mock
private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@@ -116,7 +118,7 @@ public class WorkerSinkTaskTest {
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
- taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+ taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index e202209..77f1ed0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -97,11 +97,11 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
private Capture<WorkerSinkTaskContext> sinkTaskContext = EasyMock.newCapture();
private WorkerConfig workerConfig;
@Mock private Converter keyConverter;
- @Mock
- private Converter valueConverter;
+ @Mock private Converter valueConverter;
private WorkerSinkTask workerTask;
@Mock private KafkaConsumer<byte[], byte[]> consumer;
private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
+ @Mock private TaskStatus.Listener statusListener;
private long recordsReturned;
@@ -120,7 +120,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
workerConfig = new StandaloneConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer"},
- taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+ taskId, sinkTask, statusListener, workerConfig, keyConverter, valueConverter, time);
recordsReturned = 0;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index c6eb0c5..8e9eb72 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.api.easymock.annotation.MockStrict;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
@@ -90,6 +91,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Mock private OffsetStorageWriter offsetWriter;
private WorkerSourceTask workerTask;
@Mock private Future<RecordMetadata> sendFuture;
+ @MockStrict private TaskStatus.Listener statusListener;
private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
@@ -113,7 +115,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
private void createWorkerTask() {
- workerTask = new WorkerSourceTask(taskId, sourceTask, keyConverter, valueConverter, producer,
+ workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, keyConverter, valueConverter, producer,
offsetReader, offsetWriter, config, new SystemTime());
}
@@ -125,6 +127,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
final CountDownLatch pollLatch = expectPolls(10);
// In this test, we don't flush, so nothing goes any further than the offset writer
@@ -133,6 +137,42 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.initialize(EMPTY_TASK_PROPS);
+ executor.submit(workerTask);
+ awaitPolls(pollLatch);
+ workerTask.stop();
+ assertEquals(true, workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFailureInPoll() throws Exception {
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
+
+ final CountDownLatch pollLatch = expectPolls(1);
+ RuntimeException exception = new RuntimeException();
+ EasyMock.expect(sourceTask.poll()).andThrow(exception);
+
+ statusListener.onFailure(taskId, exception);
+ EasyMock.expectLastCall();
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
PowerMock.replayAll();
workerTask.initialize(EMPTY_TASK_PROPS);
@@ -153,6 +193,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
// We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1);
@@ -164,6 +206,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(EMPTY_TASK_PROPS);
@@ -185,6 +230,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
EasyMock.expectLastCall();
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
// We'll wait for some data, then trigger a flush
final CountDownLatch pollLatch = expectPolls(1);
@@ -194,6 +241,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
EasyMock.expectLastCall();
expectOffsetFlush(true);
+ statusListener.onShutdown(taskId);
+ EasyMock.expectLastCall();
+
PowerMock.replayAll();
workerTask.initialize(EMPTY_TASK_PROPS);
@@ -269,6 +319,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
EasyMock.expectLastCall();
sourceTask.start(EMPTY_TASK_PROPS);
+ statusListener.onStartup(taskId);
+ EasyMock.expectLastCall();
EasyMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index f5213a6..20e3fe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -18,11 +18,14 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.easymock.EasyMock;
+import org.easymock.IAnswer;
import org.junit.Test;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.partialMockBuilder;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;
@@ -33,22 +36,32 @@ public class WorkerTaskTest {
@Test
public void standardStartup() {
+ ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+ TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class)
- .withArgs(new ConnectorTaskId("foo", 0))
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+ .withArgs(taskId, statusListener)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
.createStrictMock();
workerTask.initialize(EMPTY_TASK_PROPS);
- EasyMock.expectLastCall();
+ expectLastCall();
workerTask.execute();
- EasyMock.expectLastCall();
+ expectLastCall();
+
+ statusListener.onStartup(taskId);
+ expectLastCall();
workerTask.close();
- EasyMock.expectLastCall();
+ expectLastCall();
+
+ statusListener.onShutdown(taskId);
+ expectLastCall();
replay(workerTask);
@@ -62,9 +75,13 @@ public class WorkerTaskTest {
@Test
public void stopBeforeStarting() {
+ ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+ TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
- .withConstructor(ConnectorTaskId.class)
- .withArgs(new ConnectorTaskId("foo", 0))
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+ .withArgs(taskId, statusListener)
.addMockedMethod("initialize")
.addMockedMethod("execute")
.addMockedMethod("close")
@@ -88,5 +105,62 @@ public class WorkerTaskTest {
verify(workerTask);
}
+ @Test
+ public void cancelBeforeStopping() throws Exception { // runs the task, then calls stop() and cancel() while a helper thread is still parked
+ ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
+
+ TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
+
+ WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
+ .withConstructor(ConnectorTaskId.class, TaskStatus.Listener.class)
+ .withArgs(taskId, statusListener)
+ .addMockedMethod("initialize")
+ .addMockedMethod("execute")
+ .addMockedMethod("close")
+ .createStrictMock();
+
+ final CountDownLatch stopped = new CountDownLatch(1); // counted down at the end of the test, after stop() and cancel()
+ final Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ stopped.await();
+ } catch (Exception e) { // exception is swallowed; the thread simply ends
+ }
+ }
+ };
+
+ workerTask.initialize(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+
+ workerTask.execute();
+ expectLastCall().andAnswer(new IAnswer<Void>() {
+ @Override
+ public Void answer() throws Throwable {
+ thread.start(); // the mocked execute() only launches the waiting thread and returns
+ return null;
+ }
+ });
+
+ statusListener.onStartup(taskId);
+ expectLastCall();
+
+ workerTask.close();
+ expectLastCall();
+
+ // there should be no call to onShutdown()
+
+ replay(workerTask); // NOTE(review): statusListener is not replayed or verified in this method, so the "no onShutdown()" expectation looks unenforced — confirm
+
+ workerTask.initialize(EMPTY_TASK_PROPS);
+ workerTask.run();
+
+ workerTask.stop();
+ workerTask.cancel();
+ stopped.countDown(); // release the helper thread so join() below can return
+ thread.join();
+
+ verify(workerTask);
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index f33347a..0ca405e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -17,8 +17,8 @@
package org.apache.kafka.connect.runtime;
-import org.apache.kafka.common.utils.Time;
import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.Task;
@@ -60,10 +60,13 @@ public class WorkerTest extends ThreadedTest {
private static final String CONNECTOR_ID = "test-connector";
private static final ConnectorTaskId TASK_ID = new ConnectorTaskId("job", 0);
+ private static final String WORKER_ID = "localhost:8083";
private WorkerConfig config;
private Worker worker;
private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+ private TaskStatus.Listener taskStatusListener = PowerMock.createStrictMock(TaskStatus.Listener.class);
+ private ConnectorStatus.Listener connectorStatusListener = PowerMock.createStrictMock(ConnectorStatus.Listener.class);
@Before
public void setup() {
@@ -80,17 +83,14 @@ public class WorkerTest extends ThreadedTest {
}
@Test
- public void testAddRemoveConnector() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ public void testStartAndStopConnector() throws Exception {
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -105,24 +105,29 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Remove
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
+ expectStopStorage();
+
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
try {
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
fail("Should have thrown exception when trying to add connector with same name.");
} catch (ConnectException e) {
// expected
@@ -137,16 +142,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddConnectorByAlias() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -161,22 +163,26 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Remove
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
- PowerMock.replayAll();
+ expectStopStorage();
+ PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
worker.stopConnector(CONNECTOR_ID);
@@ -189,16 +195,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddConnectorByShortAlias() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -213,21 +216,26 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Remove
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
+ expectStopStorage();
+
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
worker.stopConnector(CONNECTOR_ID);
@@ -241,14 +249,11 @@ public class WorkerTest extends ThreadedTest {
@Test(expected = ConnectException.class)
public void testStopInvalidConnector() {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
worker.stopConnector(CONNECTOR_ID);
@@ -256,16 +261,13 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testReconfigureConnectorTasks() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
Connector connector = PowerMock.createMock(Connector.class);
ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateConnector");
PowerMock.expectPrivate(Worker.class, "instantiateConnector", new Object[]{WorkerTestConnector.class}).andReturn(connector);
EasyMock.expect(connector.version()).andReturn("1.0");
@@ -280,6 +282,9 @@ public class WorkerTest extends ThreadedTest {
connector.start(props);
EasyMock.expectLastCall();
+ connectorStatusListener.onStartup(CONNECTOR_ID);
+ EasyMock.expectLastCall();
+
// Reconfigure
EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(TestSourceTask.class);
Map<String, String> taskProps = new HashMap<>();
@@ -290,20 +295,22 @@ public class WorkerTest extends ThreadedTest {
connector.stop();
EasyMock.expectLastCall();
- offsetBackingStore.stop();
+ connectorStatusListener.onShutdown(CONNECTOR_ID);
EasyMock.expectLastCall();
+ expectStopStorage();
+
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
ConnectorConfig config = new ConnectorConfig(props);
assertEquals(Collections.emptySet(), worker.connectorNames());
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_ID)), worker.connectorNames());
try {
- worker.addConnector(config, ctx);
+ worker.startConnector(config, ctx, connectorStatusListener);
fail("Should have thrown exception when trying to add connector with same name.");
} catch (ConnectException e) {
// expected
@@ -327,23 +334,21 @@ public class WorkerTest extends ThreadedTest {
@Test
public void testAddRemoveTask() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
-
- ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ expectStartStorage();
// Create
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+ EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
EasyMock.expect(task.version()).andReturn("1.0");
PowerMock.expectNew(
- WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(task),
+ WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+ EasyMock.eq(task),
+ EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class),
@@ -356,6 +361,8 @@ public class WorkerTest extends ThreadedTest {
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.initialize(origProps);
EasyMock.expectLastCall();
+ workerTask.run();
+ EasyMock.expectLastCall();
// Remove
workerTask.stop();
@@ -363,17 +370,16 @@ public class WorkerTest extends ThreadedTest {
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
EasyMock.expectLastCall();
- offsetBackingStore.stop();
- EasyMock.expectLastCall();
+ expectStopStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
assertEquals(Collections.emptySet(), worker.taskIds());
- worker.addTask(taskId, new TaskConfig(origProps));
- assertEquals(new HashSet<>(Arrays.asList(taskId)), worker.taskIds());
- worker.stopTask(taskId);
+ worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
+ assertEquals(new HashSet<>(Arrays.asList(TASK_ID)), worker.taskIds());
+ worker.stopAndAwaitTask(TASK_ID);
assertEquals(Collections.emptySet(), worker.taskIds());
// Nothing should be left, so this should effectively be a nop
worker.stop();
@@ -383,36 +389,33 @@ public class WorkerTest extends ThreadedTest {
@Test(expected = ConnectException.class)
public void testStopInvalidTask() {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
- worker.stopTask(TASK_ID);
+ worker.stopAndAwaitTask(TASK_ID);
}
@Test
public void testCleanupTasksOnStop() throws Exception {
- offsetBackingStore.configure(EasyMock.anyObject(Map.class));
- EasyMock.expectLastCall();
- offsetBackingStore.start();
- EasyMock.expectLastCall();
+ expectStartStorage();
// Create
TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+ EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
- PowerMock.mockStatic(Worker.class);
+ PowerMock.mockStaticPartial(Worker.class, "instantiateTask");
PowerMock.expectPrivate(Worker.class, "instantiateTask", new Object[]{TestSourceTask.class}).andReturn(task);
EasyMock.expect(task.version()).andReturn("1.0");
PowerMock.expectNew(
- WorkerSourceTask.class, EasyMock.eq(TASK_ID), EasyMock.eq(task),
+ WorkerSourceTask.class, EasyMock.eq(TASK_ID),
+ EasyMock.eq(task),
+ EasyMock.anyObject(TaskStatus.Listener.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(Converter.class),
EasyMock.anyObject(KafkaProducer.class),
@@ -425,27 +428,41 @@ public class WorkerTest extends ThreadedTest {
origProps.put(TaskConfig.TASK_CLASS_CONFIG, TestSourceTask.class.getName());
workerTask.initialize(origProps);
EasyMock.expectLastCall();
+ workerTask.run();
+ EasyMock.expectLastCall();
// Remove on Worker.stop()
workerTask.stop();
EasyMock.expectLastCall();
+
EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
// Note that in this case we *do not* commit offsets since it's an unclean shutdown
EasyMock.expectLastCall();
- offsetBackingStore.stop();
- EasyMock.expectLastCall();
+ expectStopStorage();
PowerMock.replayAll();
- worker = new Worker(new MockTime(), config, offsetBackingStore);
+ worker = new Worker(WORKER_ID, new MockTime(), config, offsetBackingStore);
worker.start();
- worker.addTask(TASK_ID, new TaskConfig(origProps));
+ worker.startTask(TASK_ID, new TaskConfig(origProps), taskStatusListener);
worker.stop();
PowerMock.verifyAll();
}
+ private void expectStartStorage() { // Records mock expectations that offsetBackingStore is configured and started.
+ offsetBackingStore.configure(EasyMock.anyObject(Map.class)); // any Map argument is accepted
+ EasyMock.expectLastCall();
+ offsetBackingStore.start();
+ EasyMock.expectLastCall();
+ }
+
+ private void expectStopStorage() { // Records the mock expectation that offsetBackingStore.stop() is called.
+ offsetBackingStore.stop();
+ EasyMock.expectLastCall();
+ }
+
/* Name here needs to be unique as we are testing the aliasing mechanism */
private static class WorkerTestConnector extends Connector {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f7d019ed/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index 76f9bc0..d439e96 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -31,6 +31,8 @@ import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.KafkaConfigStorage;
+import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
+import org.apache.kafka.connect.storage.StatusBackingStore;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.apache.kafka.connect.util.FutureCallback;
@@ -66,6 +68,7 @@ import static org.junit.Assert.assertTrue;
public class DistributedHerderTest {
private static final Map<String, String> HERDER_CONFIG = new HashMap<>();
static {
+ HERDER_CONFIG.put(KafkaStatusBackingStore.STATUS_TOPIC_CONFIG, "status-topic");
HERDER_CONFIG.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
HERDER_CONFIG.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
HERDER_CONFIG.put(DistributedConfig.GROUP_ID_CONFIG, "test-connect-group");
@@ -122,7 +125,10 @@ public class DistributedHerderTest {
private static final ClusterConfigState SNAPSHOT_UPDATED_CONN1_CONFIG = new ClusterConfigState(1, Collections.singletonMap(CONN1, 3),
Collections.singletonMap(CONN1, CONN1_CONFIG_UPDATED), TASK_CONFIGS_MAP, Collections.<String>emptySet());
+ private static final String WORKER_ID = "localhost:8083";
+
@Mock private KafkaConfigStorage configStorage;
+ @Mock private StatusBackingStore statusBackingStore;
@Mock private WorkerGroupMember member;
private MockTime time;
private DistributedHerder herder;
@@ -139,11 +145,12 @@ public class DistributedHerderTest {
EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.FALSE);
time = new MockTime();
- herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff"},
- new DistributedConfig(HERDER_CONFIG), worker, configStorage, member, MEMBER_URL, time);
+ herder = PowerMock.createPartialMock(DistributedHerder.class, new String[]{"backoff", "updateDeletedConnectorStatus"},
+ new DistributedConfig(HERDER_CONFIG), WORKER_ID, worker, statusBackingStore, configStorage, member, MEMBER_URL, time);
connectorConfigCallback = Whitebox.invokeMethod(herder, "connectorConfigCallback");
taskConfigCallback = Whitebox.invokeMethod(herder, "taskConfigCallback");
rebalanceListener = Whitebox.invokeMethod(herder, "rebalanceListener");
+ PowerMock.expectPrivate(herder, "updateDeletedConnectorStatus").andVoid().anyTimes();
}
@Test
@@ -152,10 +159,11 @@ public class DistributedHerderTest {
EasyMock.expect(member.memberId()).andStubReturn("member");
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -168,17 +176,55 @@ public class DistributedHerderTest {
}
@Test
+ public void testRebalance() { // Two herder.tick() calls against two expectRebalance(...) setups: initial assignment, then revoke-and-reassign.
+ // First rebalance: join the group and get assigned CONN1 and TASK1 (assignment offset 1)
+ EasyMock.expect(member.memberId()).andStubReturn("member");
+ expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+ expectPostRebalanceCatchup(SNAPSHOT);
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder)); // the herder itself is passed as the third argument
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
+ PowerMock.expectLastCall();
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1), ConnectProtocol.Assignment.NO_ERROR, // second rebalance: CONN1 and TASK1 are revoked...
+ 1, Arrays.asList(CONN1), Arrays.<ConnectorTaskId>asList()); // ...and only CONN1 is assigned again, with an empty task list
+
+ // and the new assignment is started: connector only, so there is no startTask expectation here
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
+ PowerMock.expectLastCall();
+ EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
+ member.poll(EasyMock.anyInt());
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.tick(); // NOTE(review): presumably one tick per expectRebalance(...) setup above -- confirm against DistributedHerder.tick()
+ herder.tick();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testHaltCleansUpWorker() {
EasyMock.expect(worker.connectorNames()).andReturn(Collections.singleton(CONN1));
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
EasyMock.expect(worker.taskIds()).andReturn(Collections.singleton(TASK1));
- worker.stopTask(TASK1);
+ worker.stopTasks(Collections.singleton(TASK1));
+ PowerMock.expectLastCall();
+ worker.awaitStopTasks(Collections.singleton(TASK1));
PowerMock.expectLastCall();
member.stop();
PowerMock.expectLastCall();
configStorage.stop();
PowerMock.expectLastCall();
+ statusBackingStore.stop();
+ PowerMock.expectLastCall();
PowerMock.replayAll();
@@ -242,7 +288,8 @@ public class DistributedHerderTest {
// Start with one connector
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
@@ -286,7 +333,8 @@ public class DistributedHerderTest {
// Performs rebalance and gets new assignment
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -312,7 +360,8 @@ public class DistributedHerderTest {
// join
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -325,7 +374,8 @@ public class DistributedHerderTest {
EasyMock.expect(configStorage.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
member.poll(EasyMock.anyInt());
@@ -360,8 +410,9 @@ public class DistributedHerderTest {
PowerMock.expectLastCall();
// Performs rebalance and gets new assignment
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
- ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(), Arrays.asList(TASK0));
- worker.addTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject());
+ ConnectProtocol.Assignment.NO_ERROR, 1, Collections.<String>emptyList(),
+ Arrays.asList(TASK0));
+ worker.startTask(EasyMock.eq(TASK0), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -381,7 +432,8 @@ public class DistributedHerderTest {
// Join group and as leader fail to do assignment
EasyMock.expect(member.memberId()).andStubReturn("leader");
expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
- ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
+ ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+ Collections.<ConnectorTaskId>emptyList());
// Reading to end of log times out
TestFuture<Void> readToEndFuture = new TestFuture<>();
readToEndFuture.resolveOnGet(new TimeoutException());
@@ -393,10 +445,11 @@ public class DistributedHerderTest {
expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
- worker.addTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject());
+ worker.startTask(EasyMock.eq(TASK1), EasyMock.<TaskConfig>anyObject(), EasyMock.eq(herder));
PowerMock.expectLastCall();
member.poll(EasyMock.anyInt());
PowerMock.expectLastCall();
@@ -457,7 +510,8 @@ public class DistributedHerderTest {
EasyMock.expect(member.memberId()).andStubReturn("leader");
expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
expectPostRebalanceCatchup(SNAPSHOT);
- worker.addConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.<ConnectorConfig>anyObject(), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
@@ -485,7 +539,8 @@ public class DistributedHerderTest {
worker.stopConnector(CONN1);
PowerMock.expectLastCall();
Capture<ConnectorConfig> capturedUpdatedConfig = EasyMock.newCapture();
- worker.addConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject());
+ worker.startConnector(EasyMock.capture(capturedUpdatedConfig), EasyMock.<ConnectorContext>anyObject(),
+ EasyMock.eq(herder));
PowerMock.expectLastCall();
EasyMock.expect(worker.connectorTaskConfigs(CONN1, MAX_TASKS, null)).andReturn(TASK_CONFIGS);
@@ -534,13 +589,19 @@ public class DistributedHerderTest {
}
- private void expectRebalance(final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+ private void expectRebalance(final long offset, // Convenience overload: delegates with null revoked connectors/tasks and NO_ERROR.
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks) {
expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);
}
// Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
- private void expectRebalance(final Collection<String> revokedConnectors, final List<ConnectorTaskId> revokedTasks,
- final short error, final long offset, final List<String> assignedConnectors, final List<ConnectorTaskId> assignedTasks) {
+ private void expectRebalance(final Collection<String> revokedConnectors,
+ final List<ConnectorTaskId> revokedTasks,
+ final short error,
+ final long offset,
+ final List<String> assignedConnectors,
+ final List<ConnectorTaskId> assignedTasks) {
member.ensureActive();
PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
@Override
@@ -549,10 +610,30 @@ public class DistributedHerderTest {
rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
- rebalanceListener.onAssigned(assignment);
+ rebalanceListener.onAssigned(assignment, 0);
return null;
}
});
+
+ if (revokedConnectors != null) {
+ for (String connector : revokedConnectors) {
+ worker.stopConnector(connector);
+ PowerMock.expectLastCall();
+ }
+ }
+
+ if (revokedTasks != null && !revokedTasks.isEmpty()) {
+ worker.stopTasks(revokedTasks);
+ PowerMock.expectLastCall();
+ worker.awaitStopTasks(revokedTasks);
+ PowerMock.expectLastCall();
+ }
+
+ if (revokedConnectors != null) {
+ statusBackingStore.flush();
+ PowerMock.expectLastCall();
+ }
+
member.wakeup();
PowerMock.expectLastCall();
}