Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/15 01:01:07 UTC
[3/7] kafka git commit: KAFKA-2366; Initial patch for Copycat
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
new file mode 100644
index 0000000..7f8b7c2
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.util.Callback;
+
+import java.util.Properties;
+
+/**
+ * <p>
+ * The herder interface tracks and manages workers and connectors. It is the main interface for external components
+ * to make changes to the state of the cluster. For example, in distributed mode, an implementation of this interface
+ * knows how to accept a connector configuration, route it to the current leader worker for the cluster if necessary
+ * so the config can be written to persistent storage, and then ensure the new connector is correctly instantiated on
+ * one of the workers.
+ * </p>
+ * <p>
+ * Implementations must support all the actions that can be taken on the cluster (add/remove connectors, pause/resume
+ * tasks, get state of connectors and tasks, etc.). The non-Java interfaces to the cluster (REST API and CLI) are thin
+ * wrappers of the functionality provided by this interface.
+ * </p>
+ * <p>
+ * In standalone mode, the implementation of this interface is trivial because no coordination is needed. In that case,
+ * the implementation mainly delegates directly to other components. For example, when creating a new connector in
+ * standalone mode, there is no need to persist the config, and the connector and its tasks must run in the same
+ * process, so the standalone herder implementation can immediately instantiate and start the connector and its
+ * tasks.
+ * </p>
+ */
+public interface Herder {
+
+ void start();
+
+ void stop();
+
+ /**
+ * Submit a connector job to the cluster. This works from any node by forwarding the request to
+ * the leader herder if necessary.
+ *
+ * @param connectorProps user-specified properties for this job
+ * @param callback callback to invoke when the request completes
+ */
+ void addConnector(Properties connectorProps, Callback<String> callback);
+
+ /**
+ * Delete a connector job by name.
+ *
+ * @param name name of the connector job to shut down and delete
+ * @param callback callback to invoke when the request completes
+ */
+ void deleteConnector(String name, Callback<Void> callback);
+}
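The callback-based API is easiest to see with a concrete call. A minimal sketch of submitting a connector, assuming a Herder instance in scope and assuming ConnectorConfig's NAME_CONFIG, CONNECTOR_CLASS_CONFIG, and TASKS_MAX_CONFIG map to the keys shown (the connector class name is purely illustrative):

    Properties connectorProps = new Properties();
    connectorProps.setProperty("name", "local-file-source");
    connectorProps.setProperty("connector.class", "org.apache.kafka.copycat.file.FileStreamSourceConnector");
    connectorProps.setProperty("tasks.max", "1");

    herder.addConnector(connectorProps, new Callback<String>() {
        @Override
        public void onCompletion(Throwable error, String connectorName) {
            if (error != null)
                System.err.println("Failed to create connector: " + error);
            else
                System.out.println("Created connector " + connectorName);
        }
    });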
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
new file mode 100644
index 0000000..f47c984
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SinkTaskContextImpl.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+
+class SinkTaskContextImpl extends SinkTaskContext {
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
new file mode 100644
index 0000000..953cfa5
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/SourceTaskOffsetCommitter.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ * Manages offset commit scheduling and execution for SourceTasks.
+ * </p>
+ * <p>
+ * Unlike sink tasks, which manage their offset commits directly in the main poll() thread since they drive the
+ * event loop and effectively control the timeouts, source tasks are at the whim of the connector and cannot be
+ * guaranteed to wake up on the necessary schedule. Instead, this class tracks all the active tasks and their
+ * commit schedules, and ensures commits are invoked in a timely fashion.
+ * </p>
+ * <p>
+ * The current implementation uses a single shared thread (a ScheduledExecutorService) to schedule and execute
+ * the commits for all tasks.
+ * </p>
+ */
+class SourceTaskOffsetCommitter {
+ private static final Logger log = LoggerFactory.getLogger(SourceTaskOffsetCommitter.class);
+
+ private Time time;
+ private WorkerConfig config;
+ private ScheduledExecutorService commitExecutorService = null;
+ private HashMap<ConnectorTaskId, ScheduledFuture<?>> committers = new HashMap<>();
+
+ SourceTaskOffsetCommitter(Time time, WorkerConfig config) {
+ this.time = time;
+ this.config = config;
+ commitExecutorService = Executors.newSingleThreadScheduledExecutor();
+ }
+
+ public void close(long timeoutMs) {
+ commitExecutorService.shutdown();
+ try {
+ if (!commitExecutorService.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+ log.error("Graceful shutdown of offset commitOffsets thread timed out.");
+ }
+ } catch (InterruptedException e) {
+ // ignore and allow close() to return immediately
+ }
+ }
+
+ public void schedule(final ConnectorTaskId id, final WorkerSourceTask workerTask) {
+ long commitIntervalMs = config.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ ScheduledFuture<?> commitFuture = commitExecutorService.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ commit(workerTask);
+ }
+ }, commitIntervalMs, commitIntervalMs, TimeUnit.MILLISECONDS);
+ committers.put(id, commitFuture);
+ }
+
+ public void remove(ConnectorTaskId id) {
+ ScheduledFuture<?> commitFuture = committers.remove(id);
+ commitFuture.cancel(false);
+ }
+
+ public void commit(WorkerSourceTask workerTask) {
+ try {
+ log.debug("Committing offsets for {}", workerTask);
+ boolean success = workerTask.commitOffsets();
+ if (!success) {
+ log.error("Failed to commit offsets for {}", workerTask);
+ }
+ } catch (Throwable t) {
+ // We're very careful about exceptions here since any uncaught exceptions in the commit
+ // thread would cause the fixed interval schedule on the ExecutorService to stop running
+ // for that task
+ log.error("Unhandled exception when committing {}: ", workerTask, t);
+ }
+ }
+
+}
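The broad catch of Throwable in commit() is load-bearing: a ScheduledExecutorService silently cancels a fixed-delay task once any execution throws. A self-contained sketch of that JDK behavior (not part of the patch):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class FixedDelayDemo {
        public static void main(String[] args) throws InterruptedException {
            ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
            executor.scheduleWithFixedDelay(new Runnable() {
                int runs = 0;
                @Override
                public void run() {
                    System.out.println("run " + (++runs));
                    if (runs == 3)
                        throw new RuntimeException("boom");  // suppresses every later run
                }
            }, 0, 100, TimeUnit.MILLISECONDS);
            Thread.sleep(1000);  // prints "run 1" through "run 3" and nothing more
            executor.shutdown();
        }
    }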
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
new file mode 100644
index 0000000..55d0784
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java
@@ -0,0 +1,236 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * <p>
+ * Worker runs a (dynamic) set of tasks in a set of threads, doing the work of actually moving
+ * data to/from Kafka.
+ * </p>
+ * <p>
+ * Since each task has a dedicated thread, this is mainly just a container for them.
+ * </p>
+ */
+public class Worker<K, V> {
+ private static final Logger log = LoggerFactory.getLogger(Worker.class);
+
+ private Time time;
+ private WorkerConfig config;
+ private Converter<K> keyConverter;
+ private Converter<V> valueConverter;
+ private OffsetBackingStore offsetBackingStore;
+ private Serializer<K> offsetKeySerializer;
+ private Serializer<V> offsetValueSerializer;
+ private Deserializer<K> offsetKeyDeserializer;
+ private Deserializer<V> offsetValueDeserializer;
+ private HashMap<ConnectorTaskId, WorkerTask> tasks = new HashMap<>();
+ private KafkaProducer<K, V> producer;
+ private SourceTaskOffsetCommitter sourceTaskOffsetCommitter;
+
+ public Worker(WorkerConfig config) {
+ this(new SystemTime(), config, null, null, null, null, null);
+ }
+
+ @SuppressWarnings("unchecked")
+ public Worker(Time time, WorkerConfig config, OffsetBackingStore offsetBackingStore,
+ Serializer offsetKeySerializer, Serializer offsetValueSerializer,
+ Deserializer offsetKeyDeserializer, Deserializer offsetValueDeserializer) {
+ this.time = time;
+ this.config = config;
+ this.keyConverter = config.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
+ this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
+
+ if (offsetBackingStore != null) {
+ this.offsetBackingStore = offsetBackingStore;
+ } else {
+ this.offsetBackingStore = new FileOffsetBackingStore();
+ this.offsetBackingStore.configure(config.originals());
+ }
+
+ if (offsetKeySerializer != null) {
+ this.offsetKeySerializer = offsetKeySerializer;
+ } else {
+ this.offsetKeySerializer = config.getConfiguredInstance(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ this.offsetKeySerializer.configure(config.originals(), true);
+ }
+
+ if (offsetValueSerializer != null) {
+ this.offsetValueSerializer = offsetValueSerializer;
+ } else {
+ this.offsetValueSerializer = config.getConfiguredInstance(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
+ this.offsetValueSerializer.configure(config.originals(), false);
+ }
+
+ if (offsetKeyDeserializer != null) {
+ this.offsetKeyDeserializer = offsetKeyDeserializer;
+ } else {
+ this.offsetKeyDeserializer = config.getConfiguredInstance(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ this.offsetKeyDeserializer.configure(config.originals(), true);
+ }
+
+ if (offsetValueDeserializer != null) {
+ this.offsetValueDeserializer = offsetValueDeserializer;
+ } else {
+ this.offsetValueDeserializer = config.getConfiguredInstance(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Deserializer.class);
+ this.offsetValueDeserializer.configure(config.originals(), false);
+ }
+ }
+
+ public void start() {
+ log.info("Worker starting");
+
+ Properties unusedConfigs = config.getUnusedProperties();
+
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(config.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName());
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, config.getClass(WorkerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName());
+ for (String propName : unusedConfigs.stringPropertyNames()) {
+ producerProps.put(propName, unusedConfigs.getProperty(propName));
+ }
+ producer = new KafkaProducer<>(producerProps);
+
+ offsetBackingStore.start();
+ sourceTaskOffsetCommitter = new SourceTaskOffsetCommitter(time, config);
+
+ log.info("Worker started");
+ }
+
+ public void stop() {
+ log.info("Worker stopping");
+
+ long started = time.milliseconds();
+ long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
+
+ for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+ WorkerTask task = entry.getValue();
+ log.warn("Shutting down task {} uncleanly; herder should have shut down "
+ + "tasks before the Worker is stopped.", task);
+ try {
+ task.stop();
+ } catch (CopycatException e) {
+ log.error("Error while shutting down task " + task, e);
+ }
+ }
+
+ for (Map.Entry<ConnectorTaskId, WorkerTask> entry : tasks.entrySet()) {
+ WorkerTask task = entry.getValue();
+ log.debug("Waiting for task {} to finish shutting down", task);
+ if (!task.awaitStop(Math.max(limit - time.milliseconds(), 0)))
+ log.error("Graceful shutdown of task {} failed.", task);
+ task.close();
+ }
+
+ long timeoutMs = limit - time.milliseconds();
+ sourceTaskOffsetCommitter.close(timeoutMs);
+
+ offsetBackingStore.stop();
+
+ log.info("Worker stopped");
+ }
+
+ /**
+ * Add a new task.
+ * @param id Globally unique ID for this task.
+ * @param taskClassName name of the {@link org.apache.kafka.copycat.connector.Task}
+ * class to instantiate. Must be a subclass of either
+ * {@link org.apache.kafka.copycat.source.SourceTask} or
+ * {@link org.apache.kafka.copycat.sink.SinkTask}.
+ * @param props configuration options for the task
+ */
+ public void addTask(ConnectorTaskId id, String taskClassName, Properties props) {
+ if (tasks.containsKey(id)) {
+ String msg = "Task already exists in this worker; the herder should not have requested "
+ + "that this task be created again: " + id;
+ log.error(msg);
+ throw new CopycatException(msg);
+ }
+
+ final Task task = instantiateTask(taskClassName);
+
+ // Decide which type of worker task we need based on the type of task.
+ final WorkerTask workerTask;
+ if (task instanceof SourceTask) {
+ SourceTask sourceTask = (SourceTask) task;
+ OffsetStorageReader offsetReader = new OffsetStorageReaderImpl<>(offsetBackingStore, id.getConnector(),
+ keyConverter, valueConverter, offsetKeyDeserializer, offsetValueDeserializer);
+ OffsetStorageWriter<K, V> offsetWriter = new OffsetStorageWriter<>(offsetBackingStore, id.getConnector(),
+ keyConverter, valueConverter, offsetKeySerializer, offsetValueSerializer);
+ workerTask = new WorkerSourceTask<>(id, sourceTask, keyConverter, valueConverter, producer,
+ offsetReader, offsetWriter, config, time);
+ } else if (task instanceof SinkTask) {
+ workerTask = new WorkerSinkTask<>(id, (SinkTask) task, config, keyConverter, valueConverter, time);
+ } else {
+ log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
+ throw new CopycatException("Tasks must be a subclass of either SourceTask or SinkTask");
+ }
+
+ // Start the task before modifying any state; any exceptions are caught higher up the
+ // call chain and there's no cleanup to do here
+ workerTask.start(props);
+ tasks.put(id, workerTask);
+ }
+
+ private static Task instantiateTask(String taskClassName) {
+ try {
+ return Utils.newInstance(Class.forName(taskClassName).asSubclass(Task.class));
+ } catch (ClassNotFoundException e) {
+ throw new CopycatException("Task class not found", e);
+ }
+ }
+
+ public void stopTask(ConnectorTaskId id) {
+ WorkerTask task = getTask(id);
+ task.stop();
+ if (!task.awaitStop(config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG)))
+ log.error("Graceful stop of task {} failed.", task);
+ task.close();
+ tasks.remove(id);
+ }
+
+ private WorkerTask getTask(ConnectorTaskId id) {
+ WorkerTask task = tasks.get(id);
+ if (task == null) {
+ log.error("Task not found: " + id);
+ throw new CopycatException("Task not found: " + id);
+ }
+ return task;
+ }
+
+}
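Tying the Worker lifecycle together: a sketch of how a CLI driver might bootstrap a Worker. The WorkerConfig(Properties) constructor is an assumption (conventional for Kafka config classes); "offset.storage.file.filename" is OFFSET_STORAGE_FILE_FILENAME_CONFIG from FileOffsetBackingStore later in this diff:

    Properties workerProps = new Properties();
    workerProps.setProperty("bootstrap.servers", "localhost:9092");
    workerProps.setProperty("offset.storage.file.filename", "/tmp/copycat.offsets");

    WorkerConfig config = new WorkerConfig(workerProps);  // assumed constructor
    Worker worker = new Worker(config);                   // SystemTime plus default stores
    worker.start();
    // ... a herder calls worker.addTask(id, taskClassName, taskProps) as needed ...
    worker.stop();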
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
new file mode 100644
index 0000000..4eaf756
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTask.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * WorkerTask that uses a SinkTask to export data from Kafka.
+ */
+class WorkerSinkTask<K, V> implements WorkerTask {
+ private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
+
+ private final ConnectorTaskId id;
+ private final SinkTask task;
+ private final WorkerConfig workerConfig;
+ private final Time time;
+ private final Converter<K> keyConverter;
+ private final Converter<V> valueConverter;
+ private WorkerSinkTaskThread workThread;
+ private KafkaConsumer<K, V> consumer;
+ private final SinkTaskContext context;
+
+ public WorkerSinkTask(ConnectorTaskId id, SinkTask task, WorkerConfig workerConfig,
+ Converter<K> keyConverter, Converter<V> valueConverter, Time time) {
+ this.id = id;
+ this.task = task;
+ this.workerConfig = workerConfig;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ context = new SinkTaskContextImpl();
+ this.time = time;
+ }
+
+ @Override
+ public void start(Properties props) {
+ task.initialize(context);
+ task.start(props);
+ consumer = createConsumer(props);
+ workThread = createWorkerThread();
+ workThread.start();
+ }
+
+ @Override
+ public void stop() {
+ // Offset commit is handled upon exit in work thread
+ task.stop();
+ if (workThread != null)
+ workThread.startGracefulShutdown();
+ consumer.wakeup();
+ }
+
+ @Override
+ public boolean awaitStop(long timeoutMs) {
+ if (workThread != null) {
+ try {
+ boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+ if (!success)
+ workThread.forceShutdown();
+ return success;
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
+ // passed in
+ if (consumer != null)
+ consumer.close();
+ }
+
+ /** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
+ public void poll(long timeoutMs) {
+ try {
+ log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
+ ConsumerRecords<K, V> msgs = consumer.poll(timeoutMs);
+ log.trace("{} polling returned {} messages", id, msgs.count());
+ deliverMessages(msgs);
+ } catch (ConsumerWakeupException we) {
+ log.trace("{} consumer woken up", id);
+ }
+ }
+
+ /**
+ * Starts an offset commit by flushing outstanding messages from the task and then committing
+ * the consumed offsets. This should only be invoked by the WorkerSinkTaskThread.
+ **/
+ public void commitOffsets(long now, boolean sync, final int seqno, boolean flush) {
+ HashMap<TopicPartition, Long> offsets = new HashMap<>();
+ for (TopicPartition tp : consumer.subscriptions()) {
+ offsets.put(tp, consumer.position(tp));
+ }
+ // The only case in which we don't flush the task is during shutdown: the task has already
+ // been stopped and all data should have already been flushed
+ if (flush) {
+ try {
+ task.flush(offsets);
+ } catch (Throwable t) {
+ log.error("Commit of {} offsets failed due to exception while flushing: {}", this, t);
+ workThread.onCommitCompleted(t, seqno);
+ return;
+ }
+ }
+
+ ConsumerCommitCallback cb = new ConsumerCommitCallback() {
+ @Override
+ public void onComplete(Map<TopicPartition, Long> offsets, Exception error) {
+ workThread.onCommitCompleted(error, seqno);
+ }
+ };
+ consumer.commit(offsets, sync ? CommitType.SYNC : CommitType.ASYNC, cb);
+ }
+
+ public Time getTime() {
+ return time;
+ }
+
+ public WorkerConfig getWorkerConfig() {
+ return workerConfig;
+ }
+
+ private KafkaConsumer<K, V> createConsumer(Properties taskProps) {
+ String topicsStr = taskProps.getProperty(SinkTask.TOPICS_CONFIG);
+ if (topicsStr == null || topicsStr.isEmpty())
+ throw new CopycatException("Sink tasks require a list of topics.");
+ String[] topics = topicsStr.split(",");
+
+ // Include any unknown worker configs so consumer configs can be set globally on the worker
+ // and passed through to the task
+ Properties props = workerConfig.getUnusedProperties();
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "copycat-" + id.toString());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
+ Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
+ props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ workerConfig.getClass(WorkerConfig.KEY_DESERIALIZER_CLASS_CONFIG).getName());
+ props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ workerConfig.getClass(WorkerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).getName());
+
+ KafkaConsumer<K, V> newConsumer;
+ try {
+ newConsumer = new KafkaConsumer<>(props);
+ } catch (Throwable t) {
+ throw new CopycatException("Failed to create consumer", t);
+ }
+
+ log.debug("Task {} subscribing to topics {}", id, topics);
+ newConsumer.subscribe(topics);
+
+ // Seek to any user-provided offsets. This is useful if offsets are tracked in the downstream system (e.g., to
+ // enable exactly once delivery to that system).
+ //
+ // To do this correctly, we need to first make sure we have been assigned partitions, which poll() will guarantee.
+ // We ask for offsets after this poll to make sure any offsets committed before the rebalance are picked up correctly.
+ newConsumer.poll(0);
+ Map<TopicPartition, Long> offsets = context.getOffsets();
+ for (TopicPartition tp : newConsumer.subscriptions()) {
+ Long offset = offsets.get(tp);
+ if (offset != null)
+ newConsumer.seek(tp, offset);
+ }
+ return newConsumer;
+ }
+
+ private WorkerSinkTaskThread createWorkerThread() {
+ return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + id, time, workerConfig);
+ }
+
+ private void deliverMessages(ConsumerRecords<K, V> msgs) {
+ // Finally, deliver this batch to the sink
+ if (msgs.count() > 0) {
+ List<SinkRecord> records = new ArrayList<>();
+ for (ConsumerRecord<K, V> msg : msgs) {
+ log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
+ records.add(
+ new SinkRecord(msg.topic(), msg.partition(),
+ keyConverter.toCopycatData(msg.key()),
+ valueConverter.toCopycatData(msg.value()),
+ msg.offset())
+ );
+ }
+
+ try {
+ task.put(records);
+ } catch (CopycatException e) {
+ log.error("Exception from SinkTask {}: ", id, e);
+ } catch (Throwable t) {
+ log.error("Unexpected exception from SinkTask {}: ", id, t);
+ }
+ }
+ }
+}
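The calls WorkerSinkTask makes into the task (initialize, start, put, flush, stop) trace out the SinkTask contract. A minimal hypothetical task on the other side of that contract, with method signatures inferred from the call sites above rather than taken from the SinkTask source:

    import java.util.Collection;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.copycat.sink.SinkRecord;
    import org.apache.kafka.copycat.sink.SinkTask;

    public class StdoutSinkTask extends SinkTask {
        @Override
        public void start(Properties props) {
            // read task-specific settings; the framework injects SinkTask.TOPICS_CONFIG
        }

        @Override
        public void put(Collection<SinkRecord> records) {
            for (SinkRecord record : records)
                System.out.println(record.getValue());  // "export" each record to stdout
        }

        @Override
        public void flush(Map<TopicPartition, Long> offsets) {
            // nothing is buffered here, so nothing to do before offsets are committed
        }

        @Override
        public void stop() {
        }
    }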
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
new file mode 100644
index 0000000..b946343
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskThread.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Worker thread for a WorkerSinkTask. These classes are very tightly coupled, but separated to
+ * simplify testing.
+ */
+class WorkerSinkTaskThread extends ShutdownableThread {
+ private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
+
+ private final WorkerSinkTask task;
+ private long nextCommit;
+ private boolean committing;
+ private int commitSeqno;
+ private long commitStarted;
+ private int commitFailures;
+
+ public WorkerSinkTaskThread(WorkerSinkTask task, String name, Time time,
+ WorkerConfig workerConfig) {
+ super(name);
+ this.task = task;
+ this.nextCommit = time.milliseconds() +
+ workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ this.committing = false;
+ this.commitSeqno = 0;
+ this.commitStarted = -1;
+ this.commitFailures = 0;
+ }
+
+ @Override
+ public void execute() {
+ while (getRunning()) {
+ iteration();
+ }
+ // Make sure any uncommitted data has been committed
+ task.commitOffsets(task.getTime().milliseconds(), true, -1, false);
+ }
+
+ public void iteration() {
+ long now = task.getTime().milliseconds();
+
+ // Maybe commit
+ if (!committing && now >= nextCommit) {
+ synchronized (this) {
+ committing = true;
+ commitSeqno += 1;
+ commitStarted = now;
+ }
+ task.commitOffsets(now, false, commitSeqno, true);
+ nextCommit += task.getWorkerConfig().getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
+ }
+
+ // Check for timed out commits
+ long commitTimeout = commitStarted + task.getWorkerConfig().getLong(
+ WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+ if (committing && now >= commitTimeout) {
+ log.warn("Commit of {} offsets timed out", this);
+ commitFailures++;
+ committing = false;
+ }
+
+ // And process messages
+ long timeoutMs = Math.max(nextCommit - now, 0);
+ task.poll(timeoutMs);
+ }
+
+ public void onCommitCompleted(Throwable error, long seqno) {
+ synchronized (this) {
+ if (commitSeqno != seqno) {
+ log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}",
+ this,
+ seqno, commitSeqno);
+ } else {
+ if (error != null) {
+ log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
+ commitFailures++;
+ } else {
+ log.debug("Finished {} offset commit successfully in {} ms",
+ this, task.getTime().milliseconds() - commitStarted);
+ commitFailures = 0;
+ }
+ committing = false;
+ }
+ }
+ }
+
+ public int getCommitFailures() {
+ return commitFailures;
+ }
+}
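onCommitCompleted() relies on a sequence-number guard: each commit attempt is tagged with an increasing seqno, and a completion callback may only clear the committing flag when its seqno still matches, so a straggler callback from a commit that already timed out cannot clobber the state of a newer commit. The pattern in isolation (not part of the patch):

    class AsyncGuard {
        private int currentSeqno = 0;
        private boolean inFlight = false;

        synchronized int begin() {
            inFlight = true;
            return ++currentSeqno;   // tag this request
        }

        synchronized void complete(int seqno) {
            if (seqno != currentSeqno)
                return;              // stale callback from an abandoned request; ignore it
            inFlight = false;        // only the latest request may clear the flag
        }
    }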
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
new file mode 100644
index 0000000..c80adb4
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerSourceTask.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.ShutdownableThread;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * WorkerTask that uses a SourceTask to ingest data into Kafka.
+ */
+class WorkerSourceTask<K, V> implements WorkerTask {
+ private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
+
+ private ConnectorTaskId id;
+ private SourceTask task;
+ private final Converter<K> keyConverter;
+ private final Converter<V> valueConverter;
+ private KafkaProducer<K, V> producer;
+ private WorkerSourceTaskThread workThread;
+ private OffsetStorageReader offsetReader;
+ private OffsetStorageWriter<K, V> offsetWriter;
+ private final WorkerConfig workerConfig;
+ private final Time time;
+
+ // Use IdentityHashMap to ensure correctness with duplicate records. This is a map rather
+ // than a set because the JDK provides no IdentityHashSet.
+ private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
+ outstandingMessages;
+ // A second buffer is used while an offset flush is running
+ private IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>>
+ outstandingMessagesBacklog;
+ private boolean flushing;
+
+ public WorkerSourceTask(ConnectorTaskId id, SourceTask task,
+ Converter<K> keyConverter, Converter<V> valueConverter,
+ KafkaProducer<K, V> producer,
+ OffsetStorageReader offsetReader, OffsetStorageWriter<K, V> offsetWriter,
+ WorkerConfig workerConfig, Time time) {
+ this.id = id;
+ this.task = task;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ this.producer = producer;
+ this.offsetReader = offsetReader;
+ this.offsetWriter = offsetWriter;
+ this.workerConfig = workerConfig;
+ this.time = time;
+
+ this.outstandingMessages = new IdentityHashMap<>();
+ this.outstandingMessagesBacklog = new IdentityHashMap<>();
+ this.flushing = false;
+ }
+
+ @Override
+ public void start(Properties props) {
+ task.initialize(new SourceTaskContext(offsetReader));
+ task.start(props);
+ workThread = new WorkerSourceTaskThread("WorkerSourceTask-" + id);
+ workThread.start();
+ }
+
+ @Override
+ public void stop() {
+ task.stop();
+ commitOffsets();
+ if (workThread != null)
+ workThread.startGracefulShutdown();
+ }
+
+ @Override
+ public boolean awaitStop(long timeoutMs) {
+ if (workThread != null) {
+ try {
+ boolean success = workThread.awaitShutdown(timeoutMs, TimeUnit.MILLISECONDS);
+ if (!success)
+ workThread.forceShutdown();
+ return success;
+ } catch (InterruptedException e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do
+ }
+
+ /**
+ * Send a batch of records. This is atomic up to the point of handing the messages to the
+ * producer and recording them in our set of outstanding messages, so either all or none will be sent.
+ * @param records batch of records to send
+ */
+ private synchronized void sendRecords(List<SourceRecord> records) {
+ for (SourceRecord record : records) {
+ final ProducerRecord<K, V> producerRecord
+ = new ProducerRecord<>(record.getTopic(), record.getKafkaPartition(),
+ keyConverter.fromCopycatData(record.getKey()),
+ valueConverter.fromCopycatData(record.getValue()));
+ log.trace("Appending record with key {}, value {}", record.getKey(), record.getValue());
+ if (!flushing) {
+ outstandingMessages.put(producerRecord, producerRecord);
+ } else {
+ outstandingMessagesBacklog.put(producerRecord, producerRecord);
+ }
+ producer.send(
+ producerRecord,
+ new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
+ if (e != null) {
+ log.error("Failed to send record: ", e);
+ } else {
+ log.trace("Wrote record successfully: topic {} partition {} offset {}",
+ recordMetadata.topic(), recordMetadata.partition(),
+ recordMetadata.offset());
+ }
+ recordSent(producerRecord);
+ }
+ });
+ // Offsets are converted & serialized in the OffsetWriter
+ offsetWriter.setOffset(record.getSourcePartition(), record.getSourceOffset());
+ }
+ }
+
+ private synchronized void recordSent(final ProducerRecord<K, V> record) {
+ ProducerRecord<K, V> removed = outstandingMessages.remove(record);
+ // While flushing, we may also see callbacks for items in the backlog
+ if (removed == null && flushing)
+ removed = outstandingMessagesBacklog.remove(record);
+ // But if neither one had it, something is very wrong
+ if (removed == null) {
+ log.error("Saw callback for record that was not present in the outstanding message set: "
+ + "{}", record);
+ } else if (flushing && outstandingMessages.isEmpty()) {
+ // flush thread may be waiting on the outstanding messages to clear
+ this.notifyAll();
+ }
+ }
+
+ public boolean commitOffsets() {
+ long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
+
+ long started = time.milliseconds();
+ long timeout = started + commitTimeoutMs;
+
+ synchronized (this) {
+ // First we need to make sure we snapshot everything in exactly the current state. This
+ // means both the current set of messages we're still waiting to finish, stored in this
+ // class, which setting flushing = true will handle by storing any new values into a new
+ // buffer; and the current set of user-specified offsets, stored in the
+ // OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
+ flushing = true;
+ boolean flushStarted = offsetWriter.beginFlush();
+ // Still wait for any producer records to flush, even if there aren't any offsets to write
+ // to persistent storage
+
+ // Next we need to wait for all outstanding messages to finish sending
+ while (!outstandingMessages.isEmpty()) {
+ try {
+ long timeoutMs = timeout - time.milliseconds();
+ if (timeoutMs <= 0) {
+ log.error(
+ "Failed to flush {}, timed out while waiting for producer to flush outstanding "
+ + "messages", this.toString());
+ finishFailedFlush();
+ return false;
+ }
+ this.wait(timeoutMs);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+
+ if (!flushStarted) {
+ // There was nothing in the offsets to process, but we still waited for the data in the
+ // buffer to flush. This is useful since it can feed into metrics, e.g. flush time, which
+ // remain meaningful even if the connector doesn't record any offsets.
+ finishSuccessfulFlush();
+ log.debug("Finished {} offset commitOffsets successfully in {} ms",
+ this, time.milliseconds() - started);
+ return true;
+ }
+ }
+
+ // Now we can actually flush the offsets to user storage.
+ Future<Void> flushFuture = offsetWriter.doFlush(new org.apache.kafka.copycat.util.Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+ if (error != null) {
+ log.error("Failed to flush {} offsets to storage: ", this, error);
+ } else {
+ log.trace("Finished flushing {} offsets to storage", this);
+ }
+ }
+ });
+ // Very rare case: offsets were unserializable and we finished immediately, unable to store
+ // any data
+ if (flushFuture == null) {
+ finishFailedFlush();
+ return false;
+ }
+ try {
+ flushFuture.get(Math.max(timeout - time.milliseconds(), 0), TimeUnit.MILLISECONDS);
+ // There's a small race here where we can get the callback just as this times out (and log
+ // success), but then catch the exception below and cancel everything. This won't cause any
+ // errors, is only wasteful in this minor edge case, and the worst result is that the log
+ // could look a little confusing.
+ } catch (InterruptedException e) {
+ log.warn("Flush of {} offsets interrupted, cancelling", this);
+ finishFailedFlush();
+ return false;
+ } catch (ExecutionException e) {
+ log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
+ finishFailedFlush();
+ return false;
+ } catch (TimeoutException e) {
+ log.error("Timed out waiting to flush {} offsets to storage", this);
+ finishFailedFlush();
+ return false;
+ }
+
+ finishSuccessfulFlush();
+ log.debug("Finished {} commitOffsets successfully in {} ms",
+ this, time.milliseconds() - started);
+ return true;
+ }
+
+ private synchronized void finishFailedFlush() {
+ offsetWriter.cancelFlush();
+ outstandingMessages.putAll(outstandingMessagesBacklog);
+ outstandingMessagesBacklog.clear();
+ flushing = false;
+ }
+
+ private void finishSuccessfulFlush() {
+ // If we were successful, we can just swap instead of replacing items back into the original map
+ IdentityHashMap<ProducerRecord<K, V>, ProducerRecord<K, V>> temp = outstandingMessages;
+ outstandingMessages = outstandingMessagesBacklog;
+ outstandingMessagesBacklog = temp;
+ flushing = false;
+ }
+
+
+ private class WorkerSourceTaskThread extends ShutdownableThread {
+ public WorkerSourceTaskThread(String name) {
+ super(name);
+ }
+
+ @Override
+ public void execute() {
+ try {
+ while (getRunning()) {
+ List<SourceRecord> records = task.poll();
+ if (records == null)
+ continue;
+ sendRecords(records);
+ }
+ } catch (InterruptedException e) {
+ // Ignore and allow to exit.
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "WorkerSourceTask{" +
+ "id=" + id +
+ '}';
+ }
+}
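The outstandingMessages / outstandingMessagesBacklog pair is a double-buffer snapshot: once flushing is set, the current map is frozen for draining while new sends accumulate in the backlog, and a successful flush simply swaps the two maps. The same idea in reduced form (not part of the patch):

    import java.util.HashSet;
    import java.util.Set;

    class DoubleBuffer<T> {
        private Set<T> active = new HashSet<>();
        private Set<T> backlog = new HashSet<>();
        private boolean flushing = false;

        synchronized void add(T item) {
            // during a flush, new items must not pollute the snapshot being drained
            (flushing ? backlog : active).add(item);
        }

        synchronized void beginFlush() {
            flushing = true;   // 'active' is now a frozen snapshot to drain
        }

        synchronized void endFlushSuccess() {
            Set<T> tmp = active;   // the drained snapshot becomes the new backlog
            active = backlog;
            backlog = tmp;
            flushing = false;
        }
    }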
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
new file mode 100644
index 0000000..af225bb
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/WorkerTask.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import java.util.Properties;
+
+/**
+ * Handles processing for an individual task. This interface only provides the basic methods
+ * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with
+ * Kafka to create a data flow.
+ */
+interface WorkerTask {
+ /**
+ * Start the task.
+ * @param props initial configuration
+ */
+ void start(Properties props);
+
+ /**
+ * Stop this task from processing messages. This method does not block; it only triggers
+ * shutdown. Use {@link #awaitStop} to block until completion.
+ */
+ void stop();
+
+ /**
+ * Wait for this task to finish stopping.
+ *
+ * @param timeoutMs maximum time to wait for the task to stop, in milliseconds
+ * @return true if successful, false if the timeout was reached
+ */
+ boolean awaitStop(long timeoutMs);
+
+ /**
+ * Close this task. This is different from {@link #stop} and {@link #awaitStop} in that the
+ * stop methods ensure processing has stopped but may leave resources allocated. This method
+ * should clean up all resources.
+ */
+ void close();
+}
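The stop / awaitStop / close split lets a caller trigger shutdown on many tasks before blocking on any of them, exactly as Worker.stop() does above. For a single task the intended sequence is:

    task.stop();                 // trigger shutdown; returns immediately
    if (!task.awaitStop(5000))   // block up to 5 seconds for processing to halt
        System.err.println("Graceful stop of task timed out");
    task.close();                // always release resources afterwards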
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
new file mode 100644
index 0000000..0e14015
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.copycat.connector.ConnectorContext;
+
+/**
+ * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks
+ * in a single process.
+ */
+class StandaloneConnectorContext implements ConnectorContext {
+
+ private StandaloneHerder herder;
+ private String connectorName;
+
+ public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) {
+ this.herder = herder;
+ this.connectorName = connectorName;
+ }
+
+ @Override
+ public void requestTaskReconfiguration() {
+ // This is trivial to forward since there is only one herder and it's in memory in this
+ // process
+ herder.requestTaskReconfiguration(connectorName);
+ }
+}
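A connector uses this context to tell the herder that its task configuration is stale, for example after discovering a new input. A hypothetical use inside a Connector implementation:

    // Somewhere in a hypothetical Connector, after noticing new inputs:
    context.requestTaskReconfiguration();
    // The standalone herder responds by calling the connector's getTaskConfigs()
    // again and restarting its tasks (see StandaloneHerder.updateConnectorTasks below).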
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
new file mode 100644
index 0000000..2ed9183
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Herder;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+
+/**
+ * Single-process, in-memory "herder". Useful for a standalone Copycat process.
+ */
+public class StandaloneHerder implements Herder {
+ private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class);
+
+ private Worker worker;
+ private HashMap<String, ConnectorState> connectors = new HashMap<>();
+
+ public StandaloneHerder(Worker worker) {
+ this.worker = worker;
+ }
+
+ public synchronized void start() {
+ log.info("Herder starting");
+ log.info("Herder started");
+ }
+
+ public synchronized void stop() {
+ log.info("Herder stopping");
+
+ // There's no coordination/hand-off to do here since this is all standalone. Instead, we
+ // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shut down
+ // all the tasks.
+ for (Map.Entry<String, ConnectorState> entry : connectors.entrySet()) {
+ ConnectorState state = entry.getValue();
+ stopConnector(state);
+ }
+ connectors.clear();
+
+ log.info("Herder stopped");
+ }
+
+ @Override
+ public synchronized void addConnector(Properties connectorProps,
+ Callback<String> callback) {
+ try {
+ ConnectorState connState = createConnector(connectorProps);
+ if (callback != null)
+ callback.onCompletion(null, connState.name);
+ // This should always be a new job; create its tasks from scratch
+ createConnectorTasks(connState);
+ } catch (CopycatException e) {
+ if (callback != null)
+ callback.onCompletion(e, null);
+ }
+ }
+
+ @Override
+ public synchronized void deleteConnector(String name, Callback<Void> callback) {
+ try {
+ destroyConnector(name);
+ if (callback != null)
+ callback.onCompletion(null, null);
+ } catch (CopycatException e) {
+ if (callback != null)
+ callback.onCompletion(e, null);
+ }
+ }
+
+ // Creates and configures the connector. Does not set up any tasks.
+ private ConnectorState createConnector(Properties connectorProps) {
+ ConnectorConfig connConfig = new ConnectorConfig(connectorProps);
+ String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
+ String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
+ log.info("Creating connector {} of type {}", connName, className);
+ int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG);
+ List<String> topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only
+ Properties configs = connConfig.getUnusedProperties();
+
+ if (connectors.containsKey(connName)) {
+ log.error("Ignoring request to create connector due to conflicting connector name");
+ throw new CopycatException("Connector with name " + connName + " already exists");
+ }
+
+ final Connector connector;
+ try {
+ connector = instantiateConnector(className);
+ } catch (Throwable t) {
+ // Catches normal exceptions due to instantiation errors as well as any runtime errors that
+ // may be caused by user code
+ throw new CopycatException("Failed to create connector instance", t);
+ }
+ connector.initialize(new StandaloneConnectorContext(this, connName));
+ try {
+ connector.start(configs);
+ } catch (CopycatException e) {
+ throw new CopycatException("Connector threw an exception while starting", e);
+ }
+ ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics);
+ connectors.put(connName, state);
+
+ log.info("Finished creating connector {}", connName);
+
+ return state;
+ }
+
+ private static Connector instantiateConnector(String className) {
+ try {
+ return Utils.newInstance(className, Connector.class);
+ } catch (ClassNotFoundException e) {
+ throw new CopycatException("Couldn't instantiate connector class", e);
+ }
+ }
+
+ private void destroyConnector(String connName) {
+ log.info("Destroying connector {}", connName);
+ ConnectorState state = connectors.get(connName);
+ if (state == null) {
+ log.error("Failed to destroy connector {} because it does not exist", connName);
+ throw new CopycatException("Connector does not exist");
+ }
+
+ stopConnector(state);
+ connectors.remove(state.name);
+
+ log.info("Finished destroying connector {}", connName);
+ }
+
+ // Stops a connector's tasks, then the connector
+ private void stopConnector(ConnectorState state) {
+ removeConnectorTasks(state);
+ try {
+ state.connector.stop();
+ } catch (CopycatException e) {
+ log.error("Error shutting down connector {}: ", state.connector, e);
+ }
+ }
+
+ private void createConnectorTasks(ConnectorState state) {
+ String taskClassName = state.connector.getTaskClass().getName();
+
+ log.info("Creating tasks for connector {} of type {}", state.name, taskClassName);
+
+ List<Properties> taskConfigs = state.connector.getTaskConfigs(state.maxTasks);
+
+ // Generate the final configs, including framework provided settings
+ Map<ConnectorTaskId, Properties> taskProps = new HashMap<>();
+ for (int i = 0; i < taskConfigs.size(); i++) {
+ ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+ Properties config = taskConfigs.get(i);
+ // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics
+ // is automatically provided to tasks since it is required by the framework, but this logic
+ // probably belongs elsewhere in the framework.
+ String subscriptionTopics = Utils.join(state.inputTopics, ",");
+ if (state.connector instanceof SinkConnector) {
+ // Make sure we don't modify the original since the connector may reuse it internally
+ Properties configForSink = new Properties();
+ configForSink.putAll(config);
+ configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics);
+ config = configForSink;
+ }
+ taskProps.put(taskId, config);
+ }
+
+ // And initiate the tasks
+ for (int i = 0; i < taskConfigs.size(); i++) {
+ ConnectorTaskId taskId = new ConnectorTaskId(state.name, i);
+ Properties config = taskProps.get(taskId);
+ try {
+ worker.addTask(taskId, taskClassName, config);
+ // We only need to store the task IDs so we can clean up.
+ state.tasks.add(taskId);
+ } catch (Throwable e) {
+ log.error("Failed to add task {}: ", taskId, e);
+ // Swallow this so we can continue updating the rest of the tasks
+ // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task
+ // that died after starting successfully.
+ }
+ }
+ }
+
+ private void removeConnectorTasks(ConnectorState state) {
+ Iterator<ConnectorTaskId> taskIter = state.tasks.iterator();
+ while (taskIter.hasNext()) {
+ ConnectorTaskId taskId = taskIter.next();
+ try {
+ worker.stopTask(taskId);
+ taskIter.remove();
+ } catch (CopycatException e) {
+ log.error("Failed to stop task {}: ", taskId, e);
+ // Swallow this so we can continue stopping the rest of the tasks
+ // FIXME: Forcibly kill the task?
+ }
+ }
+ }
+
+ private void updateConnectorTasks(ConnectorState state) {
+ removeConnectorTasks(state);
+ createConnectorTasks(state);
+ }
+
+ /**
+ * Requests reconfiguration of the connector's tasks. This should only be triggered by
+ * {@link StandaloneConnectorContext}.
+ *
+ * @param connName name of the connector that should be reconfigured
+ */
+ public synchronized void requestTaskReconfiguration(String connName) {
+ ConnectorState state = connectors.get(connName);
+ if (state == null) {
+ log.error("Task that requested reconfiguration does not exist: {}", connName);
+ return;
+ }
+ updateConnectorTasks(state);
+ }
+
+
+ private static class ConnectorState {
+ public final String name;
+ public final Connector connector;
+ public final int maxTasks;
+ public final List<String> inputTopics;
+ public final Set<ConnectorTaskId> tasks;
+
+ public ConnectorState(String name, Connector connector, int maxTasks,
+ List<String> inputTopics) {
+ this.name = name;
+ this.connector = connector;
+ this.maxTasks = maxTasks;
+ this.inputTopics = inputTopics;
+ this.tasks = new HashSet<>();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
new file mode 100644
index 0000000..dfa9e78
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/FileOffsetBackingStore.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves
+ * similarly to a real backing store, operations are executed asynchronously on a background thread.
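+ * <p>
+ * A minimal usage sketch (the file path below is illustrative, not a default):
+ * </p>
+ * <pre>
+ * Map&lt;String, String&gt; props = new HashMap&lt;&gt;();
+ * props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, "/tmp/copycat.offsets");
+ * FileOffsetBackingStore store = new FileOffsetBackingStore();
+ * store.configure(props);
+ * store.start();
+ * // ... reads and writes go through the OffsetBackingStore get/set API ...
+ * store.stop();
+ * </pre>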
+ */
+public class FileOffsetBackingStore extends MemoryOffsetBackingStore {
+ private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class);
+
+ public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename";
+ private File file;
+
+ public FileOffsetBackingStore() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> props) {
+ super.configure(props);
+ String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG);
+ if (filename == null)
+ throw new CopycatException("Missing required configuration " + OFFSET_STORAGE_FILE_FILENAME_CONFIG);
+ file = new File(filename);
+ }
+
+ @Override
+ public synchronized void start() {
+ super.start();
+ log.info("Starting FileOffsetBackingStore with file {}", file);
+ load();
+ }
+
+ @Override
+ public synchronized void stop() {
+ super.stop();
+ // Nothing to do since this doesn't maintain any outstanding connections/data
+ log.info("Stopped FileOffsetBackingStore");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void load() {
+ try (ObjectInputStream is = new ObjectInputStream(new FileInputStream(file))) {
+ Object obj = is.readObject();
+ if (!(obj instanceof HashMap))
+ throw new CopycatException("Expected HashMap but found " + obj.getClass());
+ HashMap<String, Map<byte[], byte[]>> raw = (HashMap<String, Map<byte[], byte[]>>) obj;
+ data = new HashMap<>();
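+ // The serialized form uses byte[] keys/values so the map can be Java-serialized;
+ // convert back to the ByteBuffers used in memory. The raw map is only iterated
+ // here (byte[] has identity-based hashCode, so probing it by key would not work).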
+ for (Map.Entry<String, Map<byte[], byte[]>> entry : raw.entrySet()) {
+ HashMap<ByteBuffer, ByteBuffer> converted = new HashMap<>();
+ for (Map.Entry<byte[], byte[]> mapEntry : entry.getValue().entrySet()) {
+ ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null;
+ ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null;
+ converted.put(key, value);
+ }
+ data.put(entry.getKey(), converted);
+ }
+ } catch (FileNotFoundException | EOFException e) {
+ // FileNotFoundException: ignore, the store may be new and the file not yet created.
+ // EOFException: ignore, the file existed but was empty or truncated, so there is nothing to load.
+ } catch (IOException | ClassNotFoundException e) {
+ throw new CopycatException(e);
+ }
+ }
+
+ @Override
+ protected void save() {
+ try (ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file))) {
+ HashMap<String, Map<byte[], byte[]>> raw = new HashMap<>();
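+ // Mirror of load(): convert the in-memory ByteBuffers to byte[] so the map can be
+ // Java-serialized to the backing file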
+ for (Map.Entry<String, Map<ByteBuffer, ByteBuffer>> entry : data.entrySet()) {
+ HashMap<byte[], byte[]> converted = new HashMap<>();
+ for (Map.Entry<ByteBuffer, ByteBuffer> mapEntry : entry.getValue().entrySet()) {
+ byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null;
+ byte[] value = (mapEntry.getValue() != null) ? mapEntry.getValue().array() : null;
+ converted.put(key, value);
+ }
+ raw.put(entry.getKey(), converted);
+ }
+ os.writeObject(raw);
+ } catch (IOException e) {
+ throw new CopycatException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
new file mode 100644
index 0000000..6ffba58
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/MemoryOffsetBackingStore.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * Implementation of OffsetBackingStore that doesn't actually persist any data. To ensure this
+ * behaves similarly to a real backing store, operations are executed asynchronously on a
+ * background thread.
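+ * <p>
+ * A sketch of the asynchronous contract (namespace, key, and value are illustrative):
+ * </p>
+ * <pre>
+ * OffsetBackingStore store = new MemoryOffsetBackingStore();
+ * ByteBuffer key = ByteBuffer.wrap("partition-1".getBytes());
+ * ByteBuffer value = ByteBuffer.wrap("42".getBytes());
+ * store.set("my-namespace", Collections.singletonMap(key, value), null).get();
+ * Map&lt;ByteBuffer, ByteBuffer&gt; read = store.get("my-namespace", Arrays.asList(key), null).get();
+ * </pre>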
+ */
+public class MemoryOffsetBackingStore implements OffsetBackingStore {
+ private static final Logger log = LoggerFactory.getLogger(MemoryOffsetBackingStore.class);
+
+ protected HashMap<String, Map<ByteBuffer, ByteBuffer>> data = new HashMap<>();
+ protected ExecutorService executor = Executors.newSingleThreadExecutor();
+
+ public MemoryOffsetBackingStore() {
+
+ }
+
+ @Override
+ public void configure(Map<String, ?> props) {
+ }
+
+ @Override
+ public synchronized void start() {
+ }
+
+ @Override
+ public synchronized void stop() {
+ // Shut down the background thread; otherwise the non-daemon executor thread can
+ // keep the process alive after the store is no longer in use
+ executor.shutdown();
+ }
+
+ @Override
+ public Future<Map<ByteBuffer, ByteBuffer>> get(
+ final String namespace, final Collection<ByteBuffer> keys,
+ final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+ return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
+ @Override
+ public Map<ByteBuffer, ByteBuffer> call() throws Exception {
+ Map<ByteBuffer, ByteBuffer> result = new HashMap<>();
+ synchronized (MemoryOffsetBackingStore.this) {
+ Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
+ if (namespaceData == null)
+ return result;
+ for (ByteBuffer key : keys) {
+ result.put(key, namespaceData.get(key));
+ }
+ }
+ if (callback != null)
+ callback.onCompletion(null, result);
+ return result;
+ }
+ });
+
+ }
+
+ @Override
+ public Future<Void> set(final String namespace, final Map<ByteBuffer, ByteBuffer> values,
+ final Callback<Void> callback) {
+ return executor.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ synchronized (MemoryOffsetBackingStore.this) {
+ Map<ByteBuffer, ByteBuffer> namespaceData = data.get(namespace);
+ if (namespaceData == null) {
+ namespaceData = new HashMap<>();
+ data.put(namespace, namespaceData);
+ }
+ for (Map.Entry<ByteBuffer, ByteBuffer> entry : values.entrySet()) {
+ namespaceData.put(entry.getKey(), entry.getValue());
+ }
+ save();
+ }
+ if (callback != null)
+ callback.onCompletion(null, null);
+ return null;
+ }
+ });
+ }
+
+ // Hook to allow subclasses to persist data
+ protected void save() {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
new file mode 100644
index 0000000..e8cb2ae
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetBackingStore.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.copycat.util.Callback;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetBackingStore is an interface for storage backends that store key-value data. The backing
+ * store doesn't need to handle serialization or deserialization. It only needs to support
+ * reading/writing bytes. Since it is expected these operations will require network
+ * operations, only bulk operations are supported.
+ * </p>
+ * <p>
+ * Since OffsetBackingStore is a shared resource that may be used by many OffsetStorage instances
+ * that are associated with individual tasks, all operations include a namespace which should be
+ * used to isolate different key spaces.
+ * </p>
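+ * <p>
+ * For example (namespaces here are illustrative), two connectors can safely use the same
+ * key bytes because their entries live under different namespaces:
+ * </p>
+ * <pre>
+ * store.set("connector-a", offsets, null);
+ * store.set("connector-b", offsets, null); // does not collide with connector-a's entries
+ * </pre>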
+ */
+public interface OffsetBackingStore extends Configurable {
+
+ /**
+ * Start this offset store.
+ */
+ public void start();
+
+ /**
+ * Stop the backing store. Implementations should attempt to shutdown gracefully, but not block
+ * indefinitely.
+ */
+ public void stop();
+
+ /**
+ * Get the values for the specified keys
+ * @param namespace prefix for the keys in this request
+ * @param keys list of keys to look up
+ * @param callback callback to invoke on completion
+ * @return future for the resulting map from key to value
+ */
+ public Future<Map<ByteBuffer, ByteBuffer>> get(
+ String namespace, Collection<ByteBuffer> keys,
+ Callback<Map<ByteBuffer, ByteBuffer>> callback);
+
+ /**
+ * Set the specified keys and values.
+ * @param namespace prefix for the keys in this request
+ * @param values map from key to value
+ * @param callback callback to invoke on completion
+ * @return void future for the operation
+ */
+ public Future<Void> set(String namespace, Map<ByteBuffer, ByteBuffer> values,
+ Callback<Void> callback);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
new file mode 100644
index 0000000..7a050dc
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageReaderImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter, which is implemented
+ * directly, OffsetStorageReader is a separate interface only because it needs to be
+ * included in the public API package.
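+ * <p>
+ * A task would typically reach this functionality through its context rather than
+ * constructing this class itself, e.g. (accessor and key names are illustrative):
+ * </p>
+ * <pre>
+ * Object lastOffset = context.offsetStorageReader().getOffset(myPartitionKey);
+ * </pre>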
+ */
+public class OffsetStorageReaderImpl<K, V> implements OffsetStorageReader {
+ private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
+
+ private final OffsetBackingStore backingStore;
+ private final String namespace;
+ private final Converter<K> keyConverter;
+ private final Converter<V> valueConverter;
+ private final Serializer<K> keySerializer;
+ private final Deserializer<V> valueDeserializer;
+
+ public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
+ Converter<K> keyConverter, Converter<V> valueConverter,
+ Serializer<K> keySerializer, Deserializer<V> valueDeserializer) {
+ this.backingStore = backingStore;
+ this.namespace = namespace;
+ this.keyConverter = keyConverter;
+ this.valueConverter = valueConverter;
+ this.keySerializer = keySerializer;
+ this.valueDeserializer = valueDeserializer;
+ }
+
+ @Override
+ public Object getOffset(Object partition) {
+ return getOffsets(Arrays.asList(partition)).get(partition);
+ }
+
+ @Override
+ public Map<Object, Object> getOffsets(Collection<Object> partitions) {
+ // Serialize keys so backing store can work with them
+ Map<ByteBuffer, Object> serializedToOriginal = new HashMap<>(partitions.size());
+ for (Object key : partitions) {
+ try {
+ byte[] keySerialized = keySerializer.serialize(namespace, keyConverter.fromCopycatData(key));
+ ByteBuffer keyBuffer = (keySerialized != null) ? ByteBuffer.wrap(keySerialized) : null;
+ serializedToOriginal.put(keyBuffer, key);
+ } catch (Throwable t) {
+ log.error("CRITICAL: Failed to serialize partition key when getting offsets for task with "
+ + "namespace {}. No value for this data will be returned, which may break the "
+ + "task or cause it to skip some data.", namespace, t);
+ }
+ }
+
+ // Get serialized key -> serialized value from backing store
+ Map<ByteBuffer, ByteBuffer> raw;
+ try {
+ raw = backingStore.get(namespace, serializedToOriginal.keySet(), null).get();
+ } catch (Exception e) {
+ log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
+ throw new CopycatException("Failed to fetch offsets.", e);
+ }
+
+ // Deserialize all the values and map back to the original keys
+ Map<Object, Object> result = new HashMap<>(partitions.size());
+ for (Map.Entry<ByteBuffer, ByteBuffer> rawEntry : raw.entrySet()) {
+ try {
+ // Since null could be a valid key, explicitly check whether map contains the key
+ if (!serializedToOriginal.containsKey(rawEntry.getKey())) {
+ log.error("Should be able to map {} back to a requested partition-offset key, backing "
+ + "store may have returned invalid data", rawEntry.getKey());
+ continue;
+ }
+ Object origKey = serializedToOriginal.get(rawEntry.getKey());
+ // The backing store returns null for missing keys; avoid calling array() on a null value
+ byte[] rawValue = (rawEntry.getValue() != null) ? rawEntry.getValue().array() : null;
+ Object deserializedValue = valueConverter.toCopycatData(
+ valueDeserializer.deserialize(namespace, rawValue));
+
+ result.put(origKey, deserializedValue);
+ } catch (Throwable t) {
+ log.error("CRITICAL: Failed to deserialize offset data when getting offsets for task with"
+ + " namespace {}. No value for this data will be returned, which may break the "
+ + "task or cause it to skip some data. This could either be due to an error in "
+ + "the connector implementation or incompatible schema.", namespace, t);
+ }
+ }
+
+ return result;
+ }
+}