Posted to commits@kafka.apache.org by gw...@apache.org on 2015/10/13 19:23:43 UTC
[2/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
index a74b39c..b5af1fe 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaOffsetBackingStore.java
@@ -17,39 +17,22 @@
package org.apache.kafka.copycat.storage;
-import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.ConsumerWakeupException;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.copycat.errors.CopycatException;
import org.apache.kafka.copycat.util.Callback;
import org.apache.kafka.copycat.util.ConvertingFutureCallback;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -70,99 +53,42 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
public final static String OFFSET_STORAGE_TOPIC_CONFIG = "offset.storage.topic";
- private final static long CREATE_TOPIC_TIMEOUT_MS = 30000;
-
- private Time time;
- private Map<String, ?> configs;
- private String topic;
- private Consumer<byte[], byte[]> consumer;
- private Producer<byte[], byte[]> producer;
+ private KafkaBasedLog<byte[], byte[]> offsetLog;
private HashMap<ByteBuffer, ByteBuffer> data;
- private Thread thread;
- private boolean stopRequested;
- private Queue<Callback<Void>> readLogEndOffsetCallbacks;
-
- public KafkaOffsetBackingStore() {
- this(new SystemTime());
- }
-
- public KafkaOffsetBackingStore(Time time) {
- this.time = time;
- }
-
@Override
public void configure(Map<String, ?> configs) {
- this.configs = configs;
- topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
+ String topic = (String) configs.get(OFFSET_STORAGE_TOPIC_CONFIG);
if (topic == null)
throw new CopycatException("Offset storage topic must be specified");
data = new HashMap<>();
- stopRequested = false;
- readLogEndOffsetCallbacks = new ArrayDeque<>();
- }
-
- @Override
- public void start() {
- log.info("Starting KafkaOffsetBackingStore with topic " + topic);
-
- producer = createProducer();
- consumer = createConsumer();
- List<TopicPartition> partitions = new ArrayList<>();
-
- // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
- // we rely on topic auto-creation
- List<PartitionInfo> partitionInfos = null;
- long started = time.milliseconds();
- while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
- partitionInfos = consumer.partitionsFor(topic);
- Utils.sleep(Math.min(time.milliseconds() - started, 1000));
- }
- if (partitionInfos == null)
- throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
- " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
- " this is your first use of the topic it may have taken too long to create.");
- for (PartitionInfo partition : partitionInfos)
- partitions.add(new TopicPartition(partition.topic(), partition.partition()));
- consumer.assign(partitions);
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.putAll(configs);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- readToLogEnd();
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.putAll(configs);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- thread = new WorkThread();
- thread.start();
+ offsetLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
+ }
+ @Override
+ public void start() {
+ log.info("Starting KafkaOffsetBackingStore");
+ offsetLog.start();
log.info("Finished reading offsets topic and starting KafkaOffsetBackingStore");
}
@Override
public void stop() {
log.info("Stopping KafkaOffsetBackingStore");
-
- synchronized (this) {
- stopRequested = true;
- consumer.wakeup();
- }
-
- try {
- thread.join();
- } catch (InterruptedException e) {
- throw new CopycatException("Failed to stop KafkaOffsetBackingStore. Exiting without cleanly shutting " +
- "down it's producer and consumer.", e);
- }
-
- try {
- producer.close();
- } catch (KafkaException e) {
- log.error("Failed to close KafkaOffsetBackingStore producer", e);
- }
-
- try {
- consumer.close();
- } catch (KafkaException e) {
- log.error("Failed to close KafkaOffsetBackingStore consumer", e);
- }
+ offsetLog.stop();
+ log.info("Stopped KafkaOffsetBackingStore");
}
@Override
@@ -172,15 +98,16 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
@Override
public Map<ByteBuffer, ByteBuffer> convert(Void result) {
Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
- synchronized (KafkaOffsetBackingStore.this) {
- for (ByteBuffer key : keys)
- values.put(key, data.get(key));
- }
+ for (ByteBuffer key : keys)
+ values.put(key, data.get(key));
return values;
}
};
- readLogEndOffsetCallbacks.add(future);
- consumer.wakeup();
+ // This operation may be relatively (but not too) expensive since it always requires checking end offsets, even
+ // if we've already read up to the end. However, it also should not be common (offsets should only be read when
+ // resetting a task). Always requiring that we read to the end is simpler than trying to differentiate when it
+ // is safe not to (which should only be if we *know* we've maintained ownership since the last write).
+ offsetLog.readToEnd(future);
return future;
}
@@ -188,95 +115,26 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
public Future<Void> set(final Map<ByteBuffer, ByteBuffer> values, final Callback<Void> callback) {
SetCallbackFuture producerCallback = new SetCallbackFuture(values.size(), callback);
- for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
- producer.send(new ProducerRecord<>(topic, entry.getKey().array(), entry.getValue().array()), producerCallback);
- }
+ for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet())
+ offsetLog.send(entry.getKey().array(), entry.getValue().array(), producerCallback);
return producerCallback;
}
-
-
- private Producer<byte[], byte[]> createProducer() {
- Map<String, Object> producerProps = new HashMap<>();
- producerProps.putAll(configs);
- producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
- producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
- return new KafkaProducer<>(producerProps);
- }
-
- private Consumer<byte[], byte[]> createConsumer() {
- Map<String, Object> consumerConfig = new HashMap<>();
- consumerConfig.putAll(configs);
- consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
- consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
- consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
- return new KafkaConsumer<>(consumerConfig);
- }
-
- private void poll(long timeoutMs) {
- try {
- ConsumerRecords<byte[], byte[]> records = consumer.poll(timeoutMs);
- for (ConsumerRecord record : records) {
- ByteBuffer key = record.key() != null ? ByteBuffer.wrap((byte[]) record.key()) : null;
- ByteBuffer value = record.value() != null ? ByteBuffer.wrap((byte[]) record.value()) : null;
- data.put(key, value);
- }
- } catch (ConsumerWakeupException e) {
- // Expected on get() or stop(). The calling code should handle this
- throw e;
- } catch (KafkaException e) {
- log.error("Error polling: " + e);
- }
- }
-
- private void readToLogEnd() {
- log.trace("Reading to end of offset log");
-
- Set<TopicPartition> assignment = consumer.assignment();
-
- // This approach to getting the current end offset is hacky until we have an API for looking these up directly
- Map<TopicPartition, Long> offsets = new HashMap<>();
- for (TopicPartition tp : assignment) {
- long offset = consumer.position(tp);
- offsets.put(tp, offset);
- consumer.seekToEnd(tp);
- }
-
- Map<TopicPartition, Long> endOffsets = new HashMap<>();
- try {
- poll(0);
- } finally {
- // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
- // the consumers position is reset or it'll get into an inconsistent state.
- for (TopicPartition tp : assignment) {
- long startOffset = offsets.get(tp);
- long endOffset = consumer.position(tp);
- if (endOffset > startOffset) {
- endOffsets.put(tp, endOffset);
- consumer.seek(tp, startOffset);
- }
- log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
- }
+ private final Callback<ConsumerRecord<byte[], byte[]>> consumedCallback = new Callback<ConsumerRecord<byte[], byte[]>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
+ ByteBuffer key = record.key() != null ? ByteBuffer.wrap(record.key()) : null;
+ ByteBuffer value = record.value() != null ? ByteBuffer.wrap(record.value()) : null;
+ data.put(key, value);
}
+ };
- while (!endOffsets.isEmpty()) {
- poll(Integer.MAX_VALUE);
-
- Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<TopicPartition, Long> entry = it.next();
- if (consumer.position(entry.getKey()) >= entry.getValue())
- it.remove();
- else
- break;
- }
- }
+ private KafkaBasedLog<byte[], byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+ Map<String, Object> consumerProps, Callback<ConsumerRecord<byte[], byte[]>> consumedCallback) {
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
}
-
private static class SetCallbackFuture implements org.apache.kafka.clients.producer.Callback, Future<Void> {
private int numLeft;
private boolean completed = false;
@@ -349,45 +207,5 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
}
- private class WorkThread extends Thread {
- @Override
- public void run() {
- try {
- while (true) {
- int numCallbacks;
- synchronized (KafkaOffsetBackingStore.this) {
- if (stopRequested)
- break;
- numCallbacks = readLogEndOffsetCallbacks.size();
- }
-
- if (numCallbacks > 0) {
- try {
- readToLogEnd();
- } catch (ConsumerWakeupException e) {
- // Either received another get() call and need to retry reading to end of log or stop() was
- // called. Both are handled by restarting this loop.
- continue;
- }
- }
- synchronized (KafkaOffsetBackingStore.this) {
- for (int i = 0; i < numCallbacks; i++) {
- Callback<Void> cb = readLogEndOffsetCallbacks.poll();
- cb.onCompletion(null, null);
- }
- }
-
- try {
- poll(Integer.MAX_VALUE);
- } catch (ConsumerWakeupException e) {
- // See previous comment, both possible causes of this wakeup are handled by starting this loop again
- continue;
- }
- }
- } catch (Throwable t) {
- log.error("Unexpected exception in KafkaOffsetBackingStore's work thread", t);
- }
- }
- }
}
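
For context, here is a minimal usage sketch of the refactored store; the caller class, topic name, and broker address below are hypothetical and not part of this commit. It illustrates the semantics described above: set() resolves once all writes are acknowledged, and get() goes through offsetLog.readToEnd() so its result reflects every prior write.

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore;
    import org.apache.kafka.copycat.util.Callback;

    public class OffsetStoreUsageSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, "copycat-offsets");
            props.put("bootstrap.servers", "broker1:9092");

            KafkaOffsetBackingStore store = new KafkaOffsetBackingStore();
            store.configure(props);   // builds byte[]-serializing producer/consumer props and the KafkaBasedLog
            store.start();            // starts the log, which reads to the end of the topic before returning

            ByteBuffer key = ByteBuffer.wrap("task-0".getBytes());
            ByteBuffer value = ByteBuffer.wrap("offset-data".getBytes());

            // Blocks until every record in the batch has been acknowledged by the producer
            store.set(Collections.singletonMap(key, value), new Callback<Void>() {
                @Override
                public void onCompletion(Throwable error, Void result) { /* write acked */ }
            }).get();

            // get() triggers a readToEnd() internally, so the result reflects all prior writes
            Map<ByteBuffer, ByteBuffer> read = store.get(Collections.singletonList(key), null).get();
            System.out.println(read.get(key));

            store.stop();
        }
    }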
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
index dbb3d0d..84229a5 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -92,7 +92,7 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
continue;
}
Map<String, T> origKey = serializedToOriginal.get(rawEntry.getKey());
- SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue().array());
+ SchemaAndValue deserializedSchemaAndValue = valueConverter.toCopycatData(namespace, rawEntry.getValue() != null ? rawEntry.getValue().array() : null);
Object deserializedValue = deserializedSchemaAndValue.value();
OffsetUtils.validateFormat(deserializedValue);
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
index bd3a87b..8d78a57 100644
--- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetUtils.java
@@ -25,6 +25,9 @@ import java.util.Map;
public class OffsetUtils {
public static void validateFormat(Object offsetData) {
+ if (offsetData == null)
+ return;
+
if (!(offsetData instanceof Map))
throw new DataException("Offsets must be specified as a Map");
validateFormat((Map<Object, Object>) offsetData);
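
In effect, a null offset (e.g. one that was wiped by writing a null value for its key) is now considered valid, while non-null values are validated exactly as before. A minimal sketch, assuming the hypothetical wrapper class below:

    import java.util.Collections;
    import org.apache.kafka.copycat.storage.OffsetUtils;

    public class OffsetFormatSketch {
        public static void main(String[] args) {
            OffsetUtils.validateFormat(null);                                      // now a no-op instead of failing
            OffsetUtils.validateFormat(Collections.singletonMap("position", 42L)); // Maps are still validated as before
            // OffsetUtils.validateFormat("not-a-map");                            // would still throw DataException
        }
    }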
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
new file mode 100644
index 0000000..5e860d9
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/KafkaBasedLog.java
@@ -0,0 +1,331 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerWakeupException;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * KafkaBasedLog provides a generic implementation of a shared, compacted log of records stored in Kafka that all
+ * clients need to consume and, at times, agree on how far they have read, i.e. that they have read to the end of the log.
+ * </p>
+ * <p>
+ * This functionality is useful for storing different types of data that all clients may need to agree on --
+ * offsets or config for example. This class runs a consumer in a background thread to continuously tail the target
+ * topic, accepts write requests which it writes to the topic using an internal producer, and provides some helpful
+ * utilities like checking the current log end offset and waiting until the current end of the log is reached.
+ * </p>
+ * <p>
+ * To support different use cases, this class works with either single- or multi-partition topics.
+ * </p>
+ * <p>
+ * Since this class is generic, it delegates the details of data storage via a callback that is invoked for each
+ * record that is consumed from the topic. The invocation of callbacks is guaranteed to be serialized -- if the
+ * calling class keeps track of state based on the log and only writes to it when consume callbacks are invoked
+ * and only reads it in {@link #readToEnd(Callback)} callbacks then no additional synchronization will be required.
+ * </p>
+ */
+public class KafkaBasedLog<K, V> {
+ private static final Logger log = LoggerFactory.getLogger(KafkaBasedLog.class);
+ private static final long CREATE_TOPIC_TIMEOUT_MS = 30000;
+
+ private Time time;
+ private final String topic;
+ private final Map<String, Object> producerConfigs;
+ private final Map<String, Object> consumerConfigs;
+ private final Callback<ConsumerRecord<K, V>> consumedCallback;
+ private Consumer<K, V> consumer;
+ private Producer<K, V> producer;
+
+ private Thread thread;
+ private boolean stopRequested;
+ private Queue<Callback<Void>> readLogEndOffsetCallbacks;
+
+ /**
+ * Create a new KafkaBasedLog object. This does not start reading the log and writing is not permitted until
+ * {@link #start()} is invoked.
+ *
+ * @param topic the topic to treat as a log
+ * @param producerConfigs configuration options to use when creating the internal producer. At a minimum this must
+ * contain compatible serializer settings for the generic types used on this class. Some
+ * settings, such as the number of acks, will be overridden to ensure correct behavior of this
+ * class.
+ * @param consumerConfigs configuration options to use when creating the internal consumer. At a minimum this must
+ * contain compatible serializer settings for the generic types used on this class. Some
+ * settings, such as the auto offset reset policy, will be overridden to ensure correct
+ * behavior of this class.
+ * @param consumedCallback callback to invoke for each {@link ConsumerRecord} consumed when tailing the log
+ * @param time Time interface
+ */
+ public KafkaBasedLog(String topic, Map<String, Object> producerConfigs, Map<String, Object> consumerConfigs,
+ Callback<ConsumerRecord<K, V>> consumedCallback, Time time) {
+ this.topic = topic;
+ this.producerConfigs = producerConfigs;
+ this.consumerConfigs = consumerConfigs;
+ this.consumedCallback = consumedCallback;
+ this.stopRequested = false;
+ this.readLogEndOffsetCallbacks = new ArrayDeque<>();
+ this.time = time;
+ }
+
+ public void start() {
+ log.info("Starting KafkaBasedLog with topic " + topic);
+
+ producer = createProducer();
+ consumer = createConsumer();
+
+ List<TopicPartition> partitions = new ArrayList<>();
+
+ // Until we have admin utilities we can use to check for the existence of this topic and create it if it is missing,
+ // we rely on topic auto-creation
+ List<PartitionInfo> partitionInfos = null;
+ long started = time.milliseconds();
+ while (partitionInfos == null && time.milliseconds() - started < CREATE_TOPIC_TIMEOUT_MS) {
+ partitionInfos = consumer.partitionsFor(topic);
+ Utils.sleep(Math.min(time.milliseconds() - started, 1000));
+ }
+ if (partitionInfos == null)
+ throw new CopycatException("Could not look up partition metadata for offset backing store topic in" +
+ " allotted period. This could indicate a connectivity issue, unavailable topic partitions, or if" +
+ " this is your first use of the topic it may have taken too long to create.");
+
+ for (PartitionInfo partition : partitionInfos)
+ partitions.add(new TopicPartition(partition.topic(), partition.partition()));
+ consumer.assign(partitions);
+
+ readToLogEnd();
+
+ thread = new WorkThread();
+ thread.start();
+
+ log.info("Finished reading KafakBasedLog for topic " + topic);
+
+ log.info("Started KafakBasedLog for topic " + topic);
+ }
+
+ public void stop() {
+ log.info("Stopping KafkaBasedLog for topic " + topic);
+
+ synchronized (this) {
+ stopRequested = true;
+ }
+ consumer.wakeup();
+
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ throw new CopycatException("Failed to stop KafkaBasedLog. Exiting without cleanly shutting " +
+ "down it's producer and consumer.", e);
+ }
+
+ try {
+ producer.close();
+ } catch (KafkaException e) {
+ log.error("Failed to stop KafkaBasedLog producer", e);
+ }
+
+ try {
+ consumer.close();
+ } catch (KafkaException e) {
+ log.error("Failed to stop KafkaBasedLog consumer", e);
+ }
+
+ log.info("Stopped KafkaBasedLog for topic " + topic);
+ }
+
+ /**
+ * Flushes any outstanding writes and then reads to the current end of the log and invokes the specified callback.
+ * Note that this checks the current end offsets, reads to them, and invokes the callback regardless of whether
+ * additional records have been written to the log. If the caller needs to ensure they have truly reached the end
+ * of the log, they must ensure there are no other writers during this period.
+ *
+ * This waits until the end of all partitions has been reached.
+ *
+ * This method is asynchronous. If you need a synchronous version, pass an instance of
+ * {@link org.apache.kafka.copycat.util.FutureCallback} as the {@code callback} parameter and block on it.
+ *
+ * @param callback the callback to invoke once the end of the log has been reached.
+ */
+ public void readToEnd(Callback<Void> callback) {
+ producer.flush();
+ synchronized (this) {
+ readLogEndOffsetCallbacks.add(callback);
+ }
+ consumer.wakeup();
+ }
+
+ /**
+ * Same as {@link #readToEnd(Callback)} but provides a {@link Future} instead of using a callback.
+ * @return the future associated with the operation
+ */
+ public Future<Void> readToEnd() {
+ FutureCallback<Void> future = new FutureCallback<>(null);
+ readToEnd(future);
+ return future;
+ }
+
+ public void send(K key, V value) {
+ send(key, value, null);
+ }
+
+ public void send(K key, V value, org.apache.kafka.clients.producer.Callback callback) {
+ producer.send(new ProducerRecord<>(topic, key, value), callback);
+ }
+
+
+ private Producer<K, V> createProducer() {
+ // Always require producer acks to all to ensure durable writes
+ producerConfigs.put(ProducerConfig.ACKS_CONFIG, "all");
+ return new KafkaProducer<>(producerConfigs);
+ }
+
+ private Consumer<K, V> createConsumer() {
+ // Always force reset to the beginning of the log since this class wants to consume all available log data
+ consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ return new KafkaConsumer<>(consumerConfigs);
+ }
+
+ private void poll(long timeoutMs) {
+ try {
+ ConsumerRecords<K, V> records = consumer.poll(timeoutMs);
+ for (ConsumerRecord<K, V> record : records)
+ consumedCallback.onCompletion(null, record);
+ } catch (ConsumerWakeupException e) {
+ // Expected on get() or stop(). The calling code should handle this
+ throw e;
+ } catch (KafkaException e) {
+ log.error("Error polling: " + e);
+ }
+ }
+
+ private void readToLogEnd() {
+ log.trace("Reading to end of offset log");
+
+ Set<TopicPartition> assignment = consumer.assignment();
+
+ // This approach to getting the current end offset is hacky until we have an API for looking these up directly
+ Map<TopicPartition, Long> offsets = new HashMap<>();
+ for (TopicPartition tp : assignment) {
+ long offset = consumer.position(tp);
+ offsets.put(tp, offset);
+ consumer.seekToEnd(tp);
+ }
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ try {
+ poll(0);
+ } finally {
+ // If there is an exception, even a possibly expected one like ConsumerWakeupException, we need to make sure
+ // the consumer's position is reset or it'll get into an inconsistent state.
+ for (TopicPartition tp : assignment) {
+ long startOffset = offsets.get(tp);
+ long endOffset = consumer.position(tp);
+ if (endOffset > startOffset) {
+ endOffsets.put(tp, endOffset);
+ consumer.seek(tp, startOffset);
+ }
+ log.trace("Reading to end of log for {}: starting offset {} to ending offset {}", tp, startOffset, endOffset);
+ }
+ }
+
+ while (!endOffsets.isEmpty()) {
+ poll(Integer.MAX_VALUE);
+
+ Iterator<Map.Entry<TopicPartition, Long>> it = endOffsets.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<TopicPartition, Long> entry = it.next();
+ if (consumer.position(entry.getKey()) >= entry.getValue())
+ it.remove();
+ else
+ break;
+ }
+ }
+ }
+
+
+ private class WorkThread extends Thread {
+ @Override
+ public void run() {
+ try {
+ while (true) {
+ int numCallbacks;
+ synchronized (KafkaBasedLog.this) {
+ if (stopRequested)
+ break;
+ numCallbacks = readLogEndOffsetCallbacks.size();
+ }
+
+ if (numCallbacks > 0) {
+ try {
+ readToLogEnd();
+ } catch (ConsumerWakeupException e) {
+ // Either received another get() call and need to retry reading to end of log or stop() was
+ // called. Both are handled by restarting this loop.
+ continue;
+ }
+ }
+
+ synchronized (KafkaBasedLog.this) {
+ // Only invoke exactly the number of callbacks we found before triggering the read to log end
+ // since it is possible for another write + readToEnd to sneak in in the meantime
+ for (int i = 0; i < numCallbacks; i++) {
+ Callback<Void> cb = readLogEndOffsetCallbacks.poll();
+ cb.onCompletion(null, null);
+ }
+ }
+
+ try {
+ poll(Integer.MAX_VALUE);
+ } catch (ConsumerWakeupException e) {
+ // See previous comment, both possible causes of this wakeup are handled by starting this loop again
+ continue;
+ }
+ }
+ } catch (Throwable t) {
+ log.error("Unexpected exception in KafkaBasedLog's work thread", t);
+ }
+ }
+ }
+}
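
A minimal sketch of driving the class above, including the synchronous pattern mentioned in the readToEnd() Javadoc; the topic name, broker address, and wrapper class are hypothetical placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.utils.SystemTime;
    import org.apache.kafka.copycat.util.Callback;
    import org.apache.kafka.copycat.util.FutureCallback;
    import org.apache.kafka.copycat.util.KafkaBasedLog;

    public class KafkaBasedLogSketch {
        public static void main(String[] args) throws Exception {
            Map<String, Object> producerProps = new HashMap<>();
            producerProps.put("bootstrap.servers", "broker1:9092");
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            Map<String, Object> consumerProps = new HashMap<>();
            consumerProps.put("bootstrap.servers", "broker1:9092");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            // Per the class Javadoc, this callback is the only place log-derived state should be written
            Callback<ConsumerRecord<byte[], byte[]>> consumed = new Callback<ConsumerRecord<byte[], byte[]>>() {
                @Override
                public void onCompletion(Throwable error, ConsumerRecord<byte[], byte[]> record) {
                    // apply record.key()/record.value() to in-memory state here
                }
            };

            KafkaBasedLog<byte[], byte[]> basedLog =
                    new KafkaBasedLog<>("some-log-topic", producerProps, consumerProps, consumed, new SystemTime());
            basedLog.start();                     // assigns all partitions and reads to the log end

            basedLog.send("key".getBytes(), "value".getBytes());

            // Synchronous read-to-end: pass a FutureCallback and block on it
            FutureCallback<Void> cb = new FutureCallback<>(null);
            basedLog.readToEnd(cb);
            cb.get();                             // returns once the tail of the log has been consumed

            basedLog.stop();
        }
    }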
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
index e4d1d8e..d33f846 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -97,10 +97,10 @@ public class WorkerSinkTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.key.converter.schemas.enable", "false");
- workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+ workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+ workerProps.setProperty("internal.value.converter.schemas.enable", "false");
workerConfig = new WorkerConfig(workerProps);
workerTask = PowerMock.createPartialMock(
WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
index 3ff3a62..13d5228 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -92,10 +92,10 @@ public class WorkerSourceTaskTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.key.converter.schemas.enable", "false");
- workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+ workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+ workerProps.setProperty("internal.value.converter.schemas.enable", "false");
config = new WorkerConfig(workerProps);
producerCallbacks = EasyMock.newCapture();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
index e75d2f9..4a30992 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -57,10 +57,10 @@ public class WorkerTest extends ThreadedTest {
Properties workerProps = new Properties();
workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
- workerProps.setProperty("offset.key.converter.schemas.enable", "false");
- workerProps.setProperty("offset.value.converter.schemas.enable", "false");
+ workerProps.setProperty("internal.key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("internal.value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("internal.key.converter.schemas.enable", "false");
+ workerProps.setProperty("internal.value.converter.schemas.enable", "false");
config = new WorkerConfig(workerProps);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
new file mode 100644
index 0000000..0463b85
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerderTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.distributed;
+
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceConnector;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.KafkaConfigStorage;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({DistributedHerder.class})
+@PowerMockIgnore("javax.management.*")
+public class DistributedHerderTest {
+ private static final List<String> CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2", "sink-test3");
+ private static final List<String> SOURCE_CONNECTOR_NAMES = Arrays.asList("source-test1", "source-test2");
+ private static final List<String> SINK_CONNECTOR_NAMES = Arrays.asList("sink-test3");
+ private static final String TOPICS_LIST_STR = "topic1,topic2";
+
+ private static final Map<String, String> CONFIG_STORAGE_CONFIG = Collections.singletonMap(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "config-topic");
+
+ @Mock private KafkaConfigStorage configStorage;
+ private DistributedHerder herder;
+ @Mock private Worker worker;
+ @Mock private Callback<String> createCallback;
+
+ private Map<String, Map<String, String>> connectorProps;
+ private Map<String, Class<? extends Connector>> connectorClasses;
+ private Map<String, Class<? extends Task>> connectorTaskClasses;
+ private Map<String, Connector> connectors;
+ private Properties taskProps;
+
+ @Before
+ public void setUp() {
+ worker = PowerMock.createMock(Worker.class);
+ herder = new DistributedHerder(worker, configStorage);
+
+ connectorProps = new HashMap<>();
+ connectorClasses = new HashMap<>();
+ connectorTaskClasses = new HashMap<>();
+ connectors = new HashMap<>();
+ for (String connectorName : CONNECTOR_NAMES) {
+ Class<? extends Connector> connectorClass = connectorName.contains("source") ? BogusSourceConnector.class : BogusSinkConnector.class;
+ Class<? extends Task> taskClass = connectorName.contains("source") ? BogusSourceTask.class : BogusSinkTask.class;
+ Connector connector = connectorName.contains("source") ? PowerMock.createMock(BogusSourceConnector.class) : PowerMock.createMock(BogusSinkConnector.class);
+
+ Map<String, String> props = new HashMap<>();
+ props.put(ConnectorConfig.NAME_CONFIG, connectorName);
+ props.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+ props.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connectorClass.getName());
+
+ connectorProps.put(connectorName, props);
+ connectorClasses.put(connectorName, connectorClass);
+ connectorTaskClasses.put(connectorName, taskClass);
+ connectors.put(connectorName, connector);
+ }
+
+ PowerMock.mockStatic(DistributedHerder.class);
+
+ // These can be anything since connectors can pass along whatever they want.
+ taskProps = new Properties();
+ taskProps.setProperty("foo", "bar");
+ }
+
+ @Test
+ public void testCreateSourceConnector() throws Exception {
+ String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+ expectConfigStorageConfigureStart();
+ expectEmptyRestore();
+ expectAdd(connectorName);
+ PowerMock.replayAll();
+
+ herder.configure(CONFIG_STORAGE_CONFIG);
+ herder.start();
+ herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateSinkConnector() throws Exception {
+ String connectorName = SINK_CONNECTOR_NAMES.get(0);
+
+ expectConfigStorageConfigureStart();
+ expectEmptyRestore();
+ expectAdd(connectorName);
+ PowerMock.replayAll();
+
+ herder.configure(CONFIG_STORAGE_CONFIG);
+ herder.start();
+ herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testDestroyConnector() throws Exception {
+ String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+ expectConfigStorageConfigureStart();
+ expectEmptyRestore();
+ expectAdd(connectorName);
+ expectDestroy(connectorName);
+ PowerMock.replayAll();
+
+ herder.configure(CONFIG_STORAGE_CONFIG);
+ herder.start();
+ herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+ FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+
+ }
+ });
+ herder.deleteConnector(CONNECTOR_NAMES.get(0), futureCb);
+ futureCb.get(1000L, TimeUnit.MILLISECONDS);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCreateAndStop() throws Exception {
+ String connectorName = SOURCE_CONNECTOR_NAMES.get(0);
+
+ expectConfigStorageConfigureStart();
+ expectEmptyRestore();
+ expectAdd(connectorName);
+ PowerMock.replayAll();
+
+ herder.configure(CONFIG_STORAGE_CONFIG);
+ herder.start();
+ herder.addConnector(connectorProps.get(connectorName), createCallback);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testRestoreAndStop() throws Exception {
+ String restoreConnectorName1 = SOURCE_CONNECTOR_NAMES.get(0);
+ String restoreConnectorName2 = SINK_CONNECTOR_NAMES.get(0);
+ String additionalConnectorName = SOURCE_CONNECTOR_NAMES.get(1);
+
+ expectConfigStorageConfigureStart();
+ expectRestore(Arrays.asList(restoreConnectorName1, restoreConnectorName2));
+ expectAdd(additionalConnectorName);
+ // Stopping the herder should correctly stop all restored and new connectors
+ expectStop(restoreConnectorName1);
+ expectStop(restoreConnectorName2);
+ expectStop(additionalConnectorName);
+ configStorage.stop();
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ herder.configure(CONFIG_STORAGE_CONFIG);
+ herder.start();
+ herder.addConnector(connectorProps.get(additionalConnectorName), createCallback);
+ herder.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ private void expectConfigStorageConfigureStart() {
+ configStorage.configure(CONFIG_STORAGE_CONFIG);
+ PowerMock.expectLastCall();
+ configStorage.start();
+ PowerMock.expectLastCall();
+ }
+
+ private void expectAdd(String connectorName) throws Exception {
+ configStorage.putConnectorConfig(connectorName, connectorProps.get(connectorName));
+ PowerMock.expectLastCall();
+ expectInstantiateConnector(connectorName, true);
+ }
+
+ private void expectEmptyRestore() throws Exception {
+ expectRestore(Collections.<String>emptyList());
+ }
+
+ private void expectRestore(List<String> connectorNames) throws Exception {
+ Map<String, Integer> rootConfig = new HashMap<>();
+ Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+ for (String connName : connectorNames) {
+ rootConfig.put(connName, 0);
+ connectorConfigs.put(connName, connectorProps.get(connName));
+ }
+ EasyMock.expect(configStorage.snapshot())
+ .andReturn(new ClusterConfigState(1, rootConfig, connectorConfigs, Collections.EMPTY_MAP, Collections.EMPTY_SET));
+
+ // Restore never uses a callback
+ for (String connectorName : connectorNames)
+ expectInstantiateConnector(connectorName, false);
+ }
+
+ private void expectInstantiateConnector(String connectorName, boolean expectCallback) throws Exception {
+ PowerMock.expectPrivate(DistributedHerder.class, "instantiateConnector", connectorClasses.get(connectorName).getName())
+ .andReturn(connectors.get(connectorName));
+ if (expectCallback) {
+ createCallback.onCompletion(null, connectorName);
+ PowerMock.expectLastCall();
+ }
+
+ Connector connector = connectors.get(connectorName);
+ connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
+ PowerMock.expectLastCall();
+ connector.start(new Properties());
+ PowerMock.expectLastCall();
+
+ // Just return the connector properties for the individual task we generate by default
+ EasyMock.<Class<? extends Task>>expect(connector.taskClass()).andReturn(connectorTaskClasses.get(connectorName));
+
+ EasyMock.expect(connector.taskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+ .andReturn(Arrays.asList(taskProps));
+ // And we should instantiate the tasks. For a sink task, we should see added properties for
+ // the input topic partitions
+ Properties generatedTaskProps = new Properties();
+ generatedTaskProps.putAll(taskProps);
+ if (connectorName.contains("sink"))
+ generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+ ConnectorTaskId taskId = new ConnectorTaskId(connectorName, 0);
+ worker.addTask(taskId, connectorTaskClasses.get(connectorName).getName(), generatedTaskProps);
+ PowerMock.expectLastCall();
+ }
+
+ private void expectStop(String connectorName) {
+ worker.stopTask(new ConnectorTaskId(connectorName, 0));
+ EasyMock.expectLastCall();
+ Connector connector = connectors.get(connectorName);
+ connector.stop();
+ EasyMock.expectLastCall();
+ }
+
+ private void expectDestroy(String connectorName) {
+ expectStop(connectorName);
+ configStorage.putConnectorConfig(connectorName, null);
+ PowerMock.expectLastCall();
+ }
+
+ // We need to use a real class here due to some issue with mocking java.lang.Class
+ private abstract class BogusSourceConnector extends SourceConnector {
+ }
+
+ private abstract class BogusSourceTask extends SourceTask {
+ }
+
+ private abstract class BogusSinkConnector extends SinkConnector {
+ }
+
+ private abstract class BogusSinkTask extends SinkTask {
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
index 477893b..606b94d 100644
--- a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -20,6 +20,7 @@ package org.apache.kafka.copycat.runtime.standalone;
import org.apache.kafka.copycat.connector.Connector;
import org.apache.kafka.copycat.connector.Task;
import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.HerderConnectorContext;
import org.apache.kafka.copycat.runtime.Worker;
import org.apache.kafka.copycat.sink.SinkConnector;
import org.apache.kafka.copycat.sink.SinkTask;
@@ -39,6 +40,8 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
@@ -54,7 +57,7 @@ public class StandaloneHerderTest {
private Connector connector;
@Mock protected Callback<String> createCallback;
- private Properties connectorProps;
+ private Map<String, String> connectorProps;
private Properties taskProps;
@Before
@@ -62,9 +65,9 @@ public class StandaloneHerderTest {
worker = PowerMock.createMock(Worker.class);
herder = new StandaloneHerder(worker);
- connectorProps = new Properties();
- connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
- connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+ connectorProps = new HashMap<>();
+ connectorProps.put(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+ connectorProps.put(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
PowerMock.mockStatic(StandaloneHerder.class);
// These can be anything since connectors can pass along whatever they want.
@@ -74,8 +77,8 @@ public class StandaloneHerderTest {
@Test
public void testCreateSourceConnector() throws Exception {
- connector = PowerMock.createMock(BogusSourceClass.class);
- expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+ connector = PowerMock.createMock(BogusSourceConnector.class);
+ expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
PowerMock.replayAll();
herder.addConnector(connectorProps, createCallback);
@@ -85,8 +88,8 @@ public class StandaloneHerderTest {
@Test
public void testCreateSinkConnector() throws Exception {
- connector = PowerMock.createMock(BogusSinkClass.class);
- expectAdd(BogusSinkClass.class, BogusSinkTask.class, true);
+ connector = PowerMock.createMock(BogusSinkConnector.class);
+ expectAdd(BogusSinkConnector.class, BogusSinkTask.class, true);
PowerMock.replayAll();
@@ -97,8 +100,8 @@ public class StandaloneHerderTest {
@Test
public void testDestroyConnector() throws Exception {
- connector = PowerMock.createMock(BogusSourceClass.class);
- expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+ connector = PowerMock.createMock(BogusSourceConnector.class);
+ expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
expectDestroy();
PowerMock.replayAll();
@@ -114,32 +117,31 @@ public class StandaloneHerderTest {
PowerMock.verifyAll();
}
+ @Test
+ public void testCreateAndStop() throws Exception {
+ connector = PowerMock.createMock(BogusSourceConnector.class);
+ expectAdd(BogusSourceConnector.class, BogusSourceTask.class, false);
+ expectStop();
+ PowerMock.replayAll();
- private void expectAdd(Class<? extends Connector> connClass,
- Class<? extends Task> taskClass,
- boolean sink) throws Exception {
- expectCreate(connClass, taskClass, sink, true);
- }
+ herder.addConnector(connectorProps, createCallback);
+ herder.stop();
- private void expectRestore(Class<? extends Connector> connClass,
- Class<? extends Task> taskClass) throws Exception {
- // Restore never uses a callback. These tests always use sources
- expectCreate(connClass, taskClass, false, false);
+ PowerMock.verifyAll();
}
- private void expectCreate(Class<? extends Connector> connClass,
- Class<? extends Task> taskClass,
- boolean sink, boolean expectCallback) throws Exception {
- connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+ private void expectAdd(Class<? extends Connector> connClass,
+ Class<? extends Task> taskClass,
+ boolean sink) throws Exception {
+ connectorProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
.andReturn(connector);
- if (expectCallback) {
- createCallback.onCompletion(null, CONNECTOR_NAME);
- PowerMock.expectLastCall();
- }
- connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
+ createCallback.onCompletion(null, CONNECTOR_NAME);
+ PowerMock.expectLastCall();
+
+ connector.initialize(EasyMock.anyObject(HerderConnectorContext.class));
PowerMock.expectLastCall();
connector.start(new Properties());
PowerMock.expectLastCall();
@@ -171,13 +173,13 @@ public class StandaloneHerderTest {
}
// We need to use a real class here due to some issue with mocking java.lang.Class
- private abstract class BogusSourceClass extends SourceConnector {
+ private abstract class BogusSourceConnector extends SourceConnector {
}
private abstract class BogusSourceTask extends SourceTask {
}
- private abstract class BogusSinkClass extends SinkConnector {
+ private abstract class BogusSinkConnector extends SinkConnector {
}
private abstract class BogusSinkTask extends SourceTask {
http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
new file mode 100644
index 0000000..b02b752
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/KafkaConfigStorageTest.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.copycat.data.Field;
+import org.apache.kafka.copycat.data.Schema;
+import org.apache.kafka.copycat.data.SchemaAndValue;
+import org.apache.kafka.copycat.data.Struct;
+import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.KafkaBasedLog;
+import org.apache.kafka.copycat.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConfigStorage.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaConfigStorageTest {
+ private static final String TOPIC = "copycat-configs";
+ private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+
+ static {
+ DEFAULT_CONFIG_STORAGE_PROPS.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, TOPIC);
+ DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+ }
+
+ private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+ private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
+ private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
+
+ // Need a) connector with multiple tasks and b) multiple connectors
+ private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
+ new ConnectorTaskId("connector1", 0),
+ new ConnectorTaskId("connector1", 1),
+ new ConnectorTaskId("connector2", 0)
+ );
+ private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
+
+ // Need some placeholders -- the contents don't matter here, just that they are restored properly
+ private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+ Collections.singletonMap("config-key-one", "config-value-one"),
+ Collections.singletonMap("config-key-two", "config-value-two"),
+ Collections.singletonMap("config-key-three", "config-value-three")
+ );
+ private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
+ new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+ new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
+ new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
+ );
+ private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
+ new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+ new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
+ );
+
+ private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
+ = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
+
+ // The exact format doesn't matter here since both conversions are mocked
+ private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
+ "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
+ "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
+ "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
+ );
+
+ @Mock
+ private Converter converter;
+ @Mock
+ private Callback<String> connectorReconfiguredCallback;
+ @Mock
+ private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback;
+ @Mock
+ private KafkaBasedLog<String, byte[]> storeLog;
+ private KafkaConfigStorage configStorage;
+
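+ // Arguments captured from the createKafkaBasedLog() call; the consumed callback is used to feed records back into the store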
+ private Capture<String> capturedTopic = EasyMock.newCapture();
+ private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+ private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+ private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
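+ // Offset assigned to the next record fed through the consumed callback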
+ private long logOffset = 0;
+
+ @Before
+ public void setUp() {
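+ // Partially mock the store so createKafkaBasedLog returns the mocked log rather than building real Kafka clients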
+ configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"},
+ converter, connectorReconfiguredCallback, tasksReconfiguredCallback);
+ }
+
+ @Test
+ public void testStartStop() throws Exception {
+ expectConfigure();
+ expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ assertEquals(TOPIC, capturedTopic.getValue());
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+ assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+ assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+ configStorage.start();
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPutConnectorConfig() throws Exception {
+ expectConfigure();
+ expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+ expectConvertWriteAndRead(
+ CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
+ connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0));
+ EasyMock.expectLastCall();
+
+ expectConvertWriteAndRead(
+ CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ "properties", SAMPLE_CONFIGS.get(1));
+ connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
+ EasyMock.expectLastCall();
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.start();
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+ // Writing should block until it is written and read back from Kafka
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+ configState = configStorage.snapshot();
+ assertEquals(0, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+ // Second should also block and all configs should still be available
+ configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
+ configState = configStorage.snapshot();
+ assertEquals(1, configState.offset());
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPutTaskConfigs() throws Exception {
+ expectConfigure();
+ expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+ // Task configs should read to end, write to the log, read to end, write root, then read to end again
+ expectReadToEnd(new LinkedHashMap<String, byte[]>());
+ expectConvertWriteRead(
+ TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
+ expectConvertWriteRead(
+ TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+ "properties", SAMPLE_CONFIGS.get(1));
+ expectReadToEnd(new LinkedHashMap<String, byte[]>());
+ expectConvertWriteRead(
+ COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+ "tasks", 2); // Starts with 0 tasks, after update has 2
+ // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+ tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+ EasyMock.expectLastCall();
+
+ // Records to be read by consumer as it reads to the end of the log
+ LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+ serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+ serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+ serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+ expectReadToEnd(serializedConfigs);
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.start();
+
+ // Bootstrap as if we had already added the connector, but no tasks had been added yet
+ whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+ // Null before writing
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(-1, configState.offset());
+ assertNull(configState.taskConfig(TASK_IDS.get(0)));
+ assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+ // Writing task configs should block until all the writes have been performed and the root record update
+ // has completed
+ Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+ taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
+ taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+ configStorage.putTaskConfigs(taskConfigs);
+
+ // Validate root config by listing all connectors and tasks
+ configState = configStorage.snapshot();
+ assertEquals(2, configState.offset());
+ String connectorName = CONNECTOR_IDS.get(0);
+ assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+ assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testRestore() throws Exception {
+ // Restoring data should notify only of the latest values after loading is complete. This also validates
+ // that inconsistent state is ignored.
+
+ expectConfigure();
+ // Overwrite each type at least once to ensure we see the latest data after loading
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+ new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+ new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+ new ConsumerRecord<>(TOPIC, 0, 3, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+ new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+ // Connector after root update should make it through, task update shouldn't
+ new ConsumerRecord<>(TOPIC, 0, 5, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+ new ConsumerRecord<>(TOPIC, 0, 6, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+ deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+ deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
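+ // End of the existing log: 7 records at offsets 0-6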
+ logOffset = 7;
+ expectStart(existingRecords, deserialized);
+
+ // Shouldn't see any callbacks since this is during startup
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.start();
+
+ // Should see a single connector and its config should be the last one seen anywhere in the log
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(6, configState.offset()); // Should always be last read, even if uncommitted
+ assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+ // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+ assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+ // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
+ assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+ // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
+ // Test a case where a failure and compaction have left us in an inconsistent state when reading the log.
+ // We start out by loading an initial configuration where we started to write a task update and failed before
+ // writing the commit, and then compaction cleaned up the earlier record.
+
+ expectConfigure();
+ List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+ // This is the record that has been compacted:
+ //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+ new ConsumerRecord<>(TOPIC, 0, 2, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+ new ConsumerRecord<>(TOPIC, 0, 4, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+ new ConsumerRecord<>(TOPIC, 0, 5, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+ LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+ deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+ deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+ deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
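+ // End of the existing log; note the offset gaps left by compaction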
+ logOffset = 6;
+ expectStart(existingRecords, deserialized);
+
+ // One failed attempt to write new task configs: it reads to the end of the log but fails validation before writing
+ expectReadToEnd(new LinkedHashMap<String, byte[]>());
+
+ // Successful attempt to write new task config
+ expectReadToEnd(new LinkedHashMap<String, byte[]>());
+ expectConvertWriteRead(
+ TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+ "properties", SAMPLE_CONFIGS.get(0));
+ expectReadToEnd(new LinkedHashMap<String, byte[]>());
+ expectConvertWriteRead(
+ COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+ "tasks", 1); // Updated to just 1 task
+ // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+ tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0)));
+ EasyMock.expectLastCall();
+ // Records to be read by consumer as it reads to the end of the log
+ LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+ serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+ serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+ expectReadToEnd(serializedConfigs);
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ configStorage.configure(DEFAULT_CONFIG_STORAGE_PROPS);
+ configStorage.start();
+ // After reading the log, the store should be in an inconsistent state
+ ClusterConfigState configState = configStorage.snapshot();
+ assertEquals(5, configState.offset()); // Should always be last read, not last committed
+ assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+ // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
+ assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+ // No task configs should be visible while the connector is in an inconsistent state
+ assertNull(configState.taskConfig(TASK_IDS.get(0)));
+ assertNull(configState.taskConfig(TASK_IDS.get(1)));
+ assertEquals(new HashSet<>(Arrays.asList(CONNECTOR_IDS.get(0))), configState.inconsistentConnectors());
+
+ // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
+ try {
+ configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
+ fail("Should have failed due to incomplete task set.");
+ } catch (KafkaException e) {
+ // expected
+ }
+
+ // Next, issue a write that contains everything needed; it should be accepted. Note that in this case
+ // we are going to shrink the number of tasks to 1
+ configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+ // Validate updated config
+ configState = configStorage.snapshot();
+ // The offset is only two ahead of the last snapshot because the failed call never writes to the topic.
+ // Only the successful call writes 1 task config + 1 commit record.
+ assertEquals(7, configState.offset());
+ assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+ assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+ assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+ assertEquals(new HashSet<>(Collections.EMPTY_LIST), configState.inconsistentConnectors());
+
+ configStorage.stop();
+
+ PowerMock.verifyAll();
+ }
+
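+ // Expect configure() to build the log via the private createKafkaBasedLog method, capturing its arguments and returning the mock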
+ private void expectConfigure() throws Exception {
+ PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
+ EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+ EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+ .andReturn(storeLog);
+ }
+
+ // Expect the log to start and replay any preexisting records through the consumed callback. If non-empty,
+ // deserializations must be a LinkedHashMap so the expected converter calls line up with record order.
+ private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
+ final Map<byte[], Struct> deserializations) throws Exception {
+ storeLog.start();
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
+ capturedConsumedCallback.getValue().onCompletion(null, rec);
+ return null;
+ }
+ });
+ for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
+ // Note null schema because default settings for internal serialization are schema-less
+ EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
+ .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
+ }
+ }
+
+ private void expectStop() {
+ storeLog.stop();
+ PowerMock.expectLastCall();
+ }
+
+ // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed
+ // back from the log. Validate that the data captured during conversion matches the specified data (by checking
+ // a single field's value)
+ private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+ final String dataFieldName, final Object dataFieldValue) {
+ final Capture<Struct> capturedRecord = EasyMock.newCapture();
+ EasyMock.expect(converter.fromCopycatData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+ .andReturn(serialized);
+ storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+ PowerMock.expectLastCall();
+ EasyMock.expect(converter.toCopycatData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
+ .andAnswer(new IAnswer<SchemaAndValue>() {
+ @Override
+ public SchemaAndValue answer() throws Throwable {
+ assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+ // Note null schema because default settings for internal serialization are schema-less
+ return new SchemaAndValue(null, structToMap(capturedRecord.getValue()));
+ }
+ });
+ }
+
+ // The map must preserve insertion order so records are replayed back in log order
+ private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
+ EasyMock.expect(storeLog.readToEnd())
+ .andAnswer(new IAnswer<Future<Void>>() {
+ @Override
+ public Future<Void> answer() throws Throwable {
+ TestFuture<Void> future = new TestFuture<Void>();
+ for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, entry.getKey(), entry.getValue()));
+ future.resolveOnGet((Void) null);
+ return future;
+ }
+ });
+ }
+
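+ // Convenience wrapper: expect a convert & write, then a read-to-end that returns just that record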
+ private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+ final String dataFieldName, final Object dataFieldValue) {
+ expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
+ LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+ recordsToRead.put(configKey, serialized);
+ expectReadToEnd(recordsToRead);
+ }
+
+ // Manually insert a connector into config storage, updating the task configs, connector config, and root config
+ private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+ Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
+ for (int i = 0; i < taskConfigs.size(); i++)
+ storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+
+ Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
+ connectorConfigs.put(connectorName, connectorConfig);
+
+ Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
+ }
+
+ // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
+ private Map<String, Object> structToMap(Struct struct) {
+ HashMap<String, Object> result = new HashMap<>();
+ for (Field field : struct.schema().fields())
+ result.put(field.name(), struct.get(field));
+ return result;
+ }
+
+}