You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/08/15 01:01:06 UTC
[2/7] kafka git commit: KAFKA-2366; Initial patch for Copycat
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
new file mode 100644
index 0000000..c6e829c
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/OffsetStorageWriter.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.util.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * <p>
+ * OffsetStorageWriter is a buffered writer that wraps the simple OffsetBackingStore interface.
+ * It maintains a copy of the key-value data in memory and buffers writes. It allows you to take
+ * a snapshot, which can then be asynchronously flushed to the backing store while new writes
+ * continue to be processed. This allows Copycat to process offset commits in the background
+ * while continuing to process messages.
+ * </p>
+ * <p>
+ * Copycat uses an OffsetStorage implementation to save state about the current progress of
+ * source (import to Kafka) jobs, which may have many input partitions and "offsets" may not be as
+ * simple as they are for Kafka partitions or files. Offset storage is not required for sink jobs
+ * because they can use Kafka's native offset storage (or the sink data store can handle offset
+ * storage to achieve exactly once semantics).
+ * </p>
+ * <p>
+ * Both partitions and offsets are generic data objects. This allows different connectors to use
+ * whatever representation they need, even arbitrarily complex records. These are translated
+ * internally into the serialized form the OffsetBackingStore uses.
+ * </p>
+ * <p>
+ * Note that this only provides write functionality. This is intentional to ensure stale data is
+ * never read. Offset data should only be read during startup or reconfiguration of a task. By
+ * always serving those requests by reading the values from the backing store, we ensure we never
+ * accidentally use stale data. (One example of how this can occur: a task is processing input
+ * partition A, writing offsets; reconfiguration causes partition A to be reassigned elsewhere;
+ * reconfiguration causes partition A to be reassigned to this node, but now the offset data is out
+ * of date). Since these offsets are created and managed by the connector itself, there's no way
+ * for the offset management layer to know which keys are "owned" by which tasks at any given
+ * time.
+ * </p>
+ * <p>
+ * This class is not thread-safe. It should only be accessed from a Task's processing thread.
+ * </p>
+ */
+public class OffsetStorageWriter<K, V> {
+    private static final Logger log = LoggerFactory.getLogger(OffsetStorageWriter.class);
+
+    private final OffsetBackingStore backingStore;
+    private final Converter<K> keyConverter;
+    private final Converter<V> valueConverter;
+    private final Serializer<K> keySerializer;
+    private final Serializer<V> valueSerializer;
+    private final String namespace;
+    // Offset data in Copycat format, buffered until the next flush. Guarded by "this".
+    private Map<Object, Object> data = new HashMap<>();
+
+    // Snapshot that is currently being flushed, or null when no flush is in progress.
+    // Guarded by "this".
+    private Map<Object, Object> toFlush = null;
+    // Unique ID for each flush request to handle callbacks after timeouts. Guarded by "this".
+    private long currentFlushId = 0;
+
+    /**
+     * Create a writer that buffers offsets and flushes them to the given backing store.
+     *
+     * @param backingStore the store that serialized offsets are written to
+     * @param namespace the namespace passed to the backing store and to the serializers
+     * @param keyConverter converts partition objects from Copycat data before serialization
+     * @param valueConverter converts offset objects from Copycat data before serialization
+     * @param keySerializer serializes converted partitions to bytes
+     * @param valueSerializer serializes converted offsets to bytes
+     */
+    public OffsetStorageWriter(OffsetBackingStore backingStore,
+                               String namespace, Converter<K> keyConverter, Converter<V> valueConverter,
+                               Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+        this.backingStore = backingStore;
+        this.namespace = namespace;
+        this.keyConverter = keyConverter;
+        this.valueConverter = valueConverter;
+        this.keySerializer = keySerializer;
+        this.valueSerializer = valueSerializer;
+    }
+
+    /**
+     * Set an offset for a partition using Copycat data values. A later call for the same
+     * partition replaces the buffered offset.
+     *
+     * @param partition the partition to store an offset for
+     * @param offset the offset
+     */
+    public synchronized void setOffset(Object partition, Object offset) {
+        data.put(partition, offset);
+    }
+
+    // True while a snapshot taken by beginFlush() has been neither completed nor cancelled.
+    // Callers must hold the monitor of "this".
+    private boolean flushing() {
+        return toFlush != null;
+    }
+
+    /**
+     * Performs the first step of a flush operation, snapshotting the current state. This does not
+     * actually initiate the flush with the underlying storage.
+     *
+     * @return true if a flush was initiated, false if no data was available
+     * @throws CopycatException if a previous flush is still in progress
+     */
+    public synchronized boolean beginFlush() {
+        if (flushing()) {
+            log.error("Invalid call to OffsetStorageWriter flush() while already flushing, the "
+                    + "framework should not allow this");
+            throw new CopycatException("OffsetStorageWriter is already flushing");
+        }
+
+        if (data.isEmpty())
+            return false;
+
+        // Move the buffered offsets out of the way so new writes land in a fresh map.
+        toFlush = data;
+        data = new HashMap<>();
+        return true;
+    }
+
+    /**
+     * Serialize the snapshot taken by {@link #beginFlush} and asynchronously write it to the
+     * backing store. This does not block on the write itself.
+     * <p>
+     * If the snapshot cannot be serialized, the error is logged, the callback (if any) is invoked
+     * with that error and null is returned. In that case the writer is still considered to be
+     * flushing until {@link #cancelFlush} is called.
+     * </p>
+     *
+     * @param callback invoked with the outcome of the flush; may be null. It is not invoked for
+     *                 writes that complete after the flush has been cancelled.
+     * @return a Future for the write to the backing store, or null if serialization failed
+     */
+    public Future<Void> doFlush(final Callback<Void> callback) {
+        final long flushId;
+        Map<ByteBuffer, ByteBuffer> offsetsSerialized = null;
+        Throwable serializationError = null;
+
+        // The flush ID and the snapshot are modified under the monitor by cancelFlush() and by
+        // completion callbacks running on other threads, so they must be read under it as well.
+        synchronized (this) {
+            flushId = currentFlushId;
+            try {
+                offsetsSerialized = serializeSnapshot();
+            } catch (Throwable t) {
+                serializationError = t;
+            }
+        }
+
+        if (serializationError != null) {
+            // Must handle errors properly here or the writer will be left mid-flush forever and be
+            // unable to make progress. The throwable is passed as the last argument so the cause
+            // ends up in the log.
+            log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+                    + "offsets under namespace {}. This likely won't recover unless the "
+                    + "unserializable partition or offset information is overwritten.", namespace,
+                    serializationError);
+            // Report outside the monitor; the callback is optional, as in the success path below.
+            if (callback != null)
+                callback.onCompletion(serializationError, null);
+            return null;
+        }
+
+        // And submit the data
+        log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
+        return backingStore.set(namespace, offsetsSerialized, new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+                boolean isCurrent = handleFinishWrite(flushId, error, result);
+                if (isCurrent && callback != null)
+                    callback.onCompletion(error, result);
+            }
+        });
+    }
+
+    // Converts and serializes every entry of the current snapshot. Callers must hold the monitor
+    // of "this". Any failure (including a missing snapshot) propagates to the caller.
+    private Map<ByteBuffer, ByteBuffer> serializeSnapshot() {
+        Map<ByteBuffer, ByteBuffer> serialized = new HashMap<>();
+        for (Map.Entry<Object, Object> entry : toFlush.entrySet()) {
+            byte[] key = keySerializer.serialize(namespace, keyConverter.fromCopycatData(entry.getKey()));
+            ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
+            byte[] value = valueSerializer.serialize(namespace, valueConverter.fromCopycatData(entry.getValue()));
+            ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
+            serialized.put(keyBuffer, valueBuffer);
+        }
+        return serialized;
+    }
+
+    /**
+     * Cancel a flush that has been initiated by {@link #beginFlush}. This should not be called if
+     * {@link #doFlush} has already been invoked. It should be used if an operation performed
+     * between beginFlush and doFlush failed. Calling it when no flush is in progress is a no-op.
+     */
+    public synchronized void cancelFlush() {
+        // Verify we're still flushing data to handle a race between cancelFlush() calls from up the
+        // call stack and callbacks from the write request to underlying storage
+        if (flushing()) {
+            // Just recombine the data and place it back in the primary storage. Offsets written
+            // since the snapshot was taken are newer, so they overwrite the snapshot's entries.
+            toFlush.putAll(data);
+            data = toFlush;
+            // Invalidate the cancelled flush so a late completion callback is ignored.
+            currentFlushId++;
+            toFlush = null;
+        }
+    }
+
+    /**
+     * Handle completion of a write. Returns true if this callback is for the current flush
+     * operation, false if it's for an old operation that should now be ignored.
+     */
+    private synchronized boolean handleFinishWrite(long flushId, Throwable error, Void result) {
+        // Callbacks need to be handled carefully since the flush operation may have already timed
+        // out and been cancelled.
+        if (flushId != currentFlushId)
+            return false;
+
+        if (error != null) {
+            // Failed write: put the snapshot back so the offsets are retried by the next flush.
+            cancelFlush();
+        } else {
+            currentFlushId++;
+            toFlush = null;
+        }
+        return true;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
new file mode 100644
index 0000000..5cf1423
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/Callback.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+/**
+ * Generic interface for callbacks that are invoked when an operation completes, whether it
+ * succeeded or failed.
+ *
+ * @param <V> the type of the result handed to the callback
+ */
+public interface Callback<V> {
+ /**
+ * Invoked upon completion of the operation.
+ *
+ * @param error the error that caused the operation to fail, or null if no error occurred
+ * @param result the return value, or null if the operation failed
+ */
+ void onCompletion(Throwable error, V result);
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
new file mode 100644
index 0000000..44a9e41
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ConnectorTaskId.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import java.io.Serializable;
+
+/**
+ * Unique ID for a single task. It includes a unique connector ID and a task ID that is unique within
+ * the connector.
+ */
+public class ConnectorTaskId implements Serializable {
+    private final String connector;
+    private final int task;
+
+    /**
+     * @param job the ID of the connector this task belongs to
+     * @param task the ID of the task within that connector
+     */
+    public ConnectorTaskId(String job, int task) {
+        this.connector = job;
+        this.task = task;
+    }
+
+    /** Returns the connector ID given at construction. */
+    public String getConnector() {
+        return connector;
+    }
+
+    /** Returns the task ID given at construction. */
+    public int getTask() {
+        return task;
+    }
+
+    /**
+     * Two IDs are equal when they are of exactly the same class and have the same task number
+     * and the same connector (both null, or equal strings).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (o == this)
+            return true;
+        if (o == null || o.getClass() != getClass())
+            return false;
+
+        ConnectorTaskId other = (ConnectorTaskId) o;
+        if (other.task != task)
+            return false;
+        if (connector == null)
+            return other.connector == null;
+        return connector.equals(other.connector);
+    }
+
+    /** Combines the connector's hash (0 when the connector is null) with the task number. */
+    @Override
+    public int hashCode() {
+        int connectorHash = (connector == null) ? 0 : connector.hashCode();
+        return 31 * connectorHash + task;
+    }
+
+    /** Formats the ID as the connector, a dash and the task number. */
+    @Override
+    public String toString() {
+        return new StringBuilder().append(connector).append('-').append(task).toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
new file mode 100644
index 0000000..278fdd3
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/FutureCallback.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import java.util.concurrent.*;
+
+/**
+ * Adapter that is both a {@link Callback} and a {@link Future}: the operation reports its outcome
+ * through {@link #onCompletion}, and callers can wait for that outcome through the Future
+ * methods. Cancellation is not supported.
+ *
+ * @param <T> the type of the operation's result
+ */
+public class FutureCallback<T> implements Callback<T>, Future<T> {
+
+    private final Callback<T> underlying;
+    private final CountDownLatch finishedLatch;
+    // Written before the latch is released and only read after it has been observed as released.
+    private T result = null;
+    private Throwable exception = null;
+
+    /**
+     * @param underlying callback to forward the outcome to before the future completes, or null
+     *                   if the outcome only needs to be exposed through the Future methods
+     */
+    public FutureCallback(Callback<T> underlying) {
+        this.underlying = underlying;
+        this.finishedLatch = new CountDownLatch(1);
+    }
+
+    /**
+     * Records the outcome, forwards it to the underlying callback and then completes the future.
+     * The future is completed even if the underlying callback throws; the exception it threw
+     * still propagates to the caller of this method.
+     *
+     * @param error the error that caused the operation to fail, or null if no error occurred
+     * @param result the return value, or null if the operation failed
+     */
+    @Override
+    public void onCompletion(Throwable error, T result) {
+        this.exception = error;
+        this.result = result;
+        try {
+            if (underlying != null)
+                underlying.onCompletion(error, result);
+        } finally {
+            // Always release waiters, otherwise a misbehaving callback would block get() forever.
+            finishedLatch.countDown();
+        }
+    }
+
+    /** Cancellation is not supported; always returns false. */
+    @Override
+    public boolean cancel(boolean b) {
+        return false;
+    }
+
+    /** Always returns false because cancellation is not supported. */
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    /** Returns true once {@link #onCompletion} has finished reporting the outcome. */
+    @Override
+    public boolean isDone() {
+        return finishedLatch.getCount() == 0;
+    }
+
+    /**
+     * Waits for the operation to complete.
+     *
+     * @return the result passed to {@link #onCompletion}
+     * @throws InterruptedException if the waiting thread is interrupted
+     * @throws ExecutionException if the operation completed with an error; the error is the cause
+     */
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+        finishedLatch.await();
+        return getResult();
+    }
+
+    /**
+     * Waits up to the given time for the operation to complete.
+     *
+     * @param l the maximum time to wait
+     * @param timeUnit the unit of the timeout argument
+     * @return the result passed to {@link #onCompletion}
+     * @throws InterruptedException if the waiting thread is interrupted
+     * @throws ExecutionException if the operation completed with an error; the error is the cause
+     * @throws TimeoutException if the operation did not complete within the timeout
+     */
+    @Override
+    public T get(long l, TimeUnit timeUnit)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        // await() returns false when the timeout elapsed; there is no result to hand out then.
+        if (!finishedLatch.await(l, timeUnit))
+            throw new TimeoutException("Timed out waiting for the operation to complete");
+        return getResult();
+    }
+
+    // Translates the recorded outcome into the Future contract: throw on error, else return.
+    private T getResult() throws ExecutionException {
+        if (exception != null) {
+            throw new ExecutionException(exception);
+        }
+        return result;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
new file mode 100644
index 0000000..3e23f29
--- /dev/null
+++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/util/ShutdownableThread.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * <p>
+ * Thread class with support for triggering graceful and forcible shutdown. In graceful shutdown,
+ * a flag is set, which the thread should detect and try to exit gracefully from. In forcible
+ * shutdown, the thread is interrupted. These can be combined to give a thread a chance to exit
+ * gracefully, but then force it to exit if it takes too long.
+ * </p>
+ * <p>
+ * Implementations should override the {@link #execute} method and check {@link #getRunning} to
+ * determine whether they should try to gracefully exit.
+ * </p>
+ */
+public abstract class ShutdownableThread extends Thread {
+ private static final Logger log = LoggerFactory.getLogger(ShutdownableThread.class);
+
+ private AtomicBoolean isRunning = new AtomicBoolean(true);
+ private CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+ /**
+ * An UncaughtExceptionHandler to register on every instance of this class. This is useful for
+ * testing, where AssertionExceptions in the thread may not cause the test to fail. Since one
+ * instance is used for all threads, it must be thread-safe.
+ */
+ volatile public static UncaughtExceptionHandler funcaughtExceptionHandler = null;
+
+ public ShutdownableThread(String name) {
+ // The default is daemon=true so that these threads will not prevent shutdown. We use this
+ // default because threads that are running user code that may not clean up properly, even
+ // when we attempt to forcibly shut them down.
+ this(name, true);
+ }
+
+ public ShutdownableThread(String name, boolean daemon) {
+ super(name);
+ this.setDaemon(daemon);
+ if (funcaughtExceptionHandler != null)
+ this.setUncaughtExceptionHandler(funcaughtExceptionHandler);
+ }
+
+ /**
+ * Implementations should override this method with the main body for the thread.
+ */
+ public abstract void execute();
+
+ /**
+ * Returns true if the thread hasn't exited yet and none of the shutdown methods have been
+ * invoked
+ */
+ public boolean getRunning() {
+ return isRunning.get();
+ }
+
+ @Override
+ public void run() {
+ try {
+ execute();
+ } catch (Error | RuntimeException e) {
+ log.error("Thread {} exiting with uncaught exception: ", getName(), e);
+ throw e;
+ } finally {
+ shutdownLatch.countDown();
+ }
+ }
+
+ /**
+ * Shutdown the thread, first trying to shut down gracefully using the specified timeout, then
+ * forcibly interrupting the thread.
+ * @param gracefulTimeout the maximum time to wait for a graceful exit
+ * @param unit the time unit of the timeout argument
+ */
+ public void shutdown(long gracefulTimeout, TimeUnit unit)
+ throws InterruptedException {
+ boolean success = gracefulShutdown(gracefulTimeout, unit);
+ if (!success)
+ forceShutdown();
+ }
+
+ /**
+ * Attempt graceful shutdown
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return true if successful, false if the timeout elapsed
+ */
+ public boolean gracefulShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+ startGracefulShutdown();
+ return awaitShutdown(timeout, unit);
+ }
+
+ /**
+ * Start shutting down this thread gracefully, but do not block waiting for it to exit.
+ */
+ public void startGracefulShutdown() {
+ log.info("Starting graceful shutdown of thread {}", getName());
+ isRunning.set(false);
+ }
+
+ /**
+ * Awaits shutdown of this thread, waiting up to the timeout.
+ * @param timeout the maximum time to wait
+ * @param unit the time unit of the timeout argument
+ * @return true if successful, false if the timeout elapsed
+ * @throws InterruptedException
+ */
+ public boolean awaitShutdown(long timeout, TimeUnit unit) throws InterruptedException {
+ return shutdownLatch.await(timeout, unit);
+ }
+
+ /**
+ * Immediately tries to force the thread to shut down by interrupting it. This does not try to
+ * wait for the thread to truly exit because forcible shutdown is not always possible. By
+ * default, threads are marked as daemon threads so they will not prevent the process from
+ * exiting.
+ */
+ public void forceShutdown() throws InterruptedException {
+ log.info("Forcing shutdown of thread {}", getName());
+ isRunning.set(false);
+ interrupt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
new file mode 100644
index 0000000..0c6f950
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSinkTaskTest.java
@@ -0,0 +1,367 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.sink.SinkRecord;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.sink.SinkTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.*;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(WorkerSinkTask.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerSinkTaskTest extends ThreadedTest {
+
+ // These are fixed to keep this code simpler. In this example we assume byte[] raw values
+ // with mix of integer/string in Copycat
+ private static final String TOPIC = "test";
+ private static final int PARTITION = 12;
+ private static final long FIRST_OFFSET = 45;
+ private static final int KEY = 12;
+ private static final String VALUE = "VALUE";
+ private static final byte[] RAW_KEY = "key".getBytes();
+ private static final byte[] RAW_VALUE = "value".getBytes();
+
+ private static final TopicPartition TOPIC_PARTITION = new TopicPartition(TOPIC, PARTITION);
+
+ private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private Time time;
+ @Mock private SinkTask sinkTask;
+ private WorkerConfig workerConfig;
+ @Mock private Converter<byte[]> keyConverter;
+ @Mock
+ private Converter<byte[]> valueConverter;
+ private WorkerSinkTask<Integer, String> workerTask;
+ @Mock private KafkaConsumer<byte[], byte[]> consumer;
+ private WorkerSinkTaskThread workerThread;
+
+ private long recordsReturned;
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void setup() {
+ super.setup();
+ time = new MockTime();
+ Properties workerProps = new Properties();
+ workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+ workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+ workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ workerConfig = new WorkerConfig(workerProps);
+ workerTask = PowerMock.createPartialMock(
+ WorkerSinkTask.class, new String[]{"createConsumer", "createWorkerThread"},
+ taskId, sinkTask, workerConfig, keyConverter, valueConverter, time);
+
+ recordsReturned = 0;
+ }
+
+ @Test
+ public void testPollsInBackground() throws Exception {
+ Properties taskProps = new Properties();
+
+ expectInitializeTask(taskProps);
+ Capture<Collection<SinkRecord>> capturedRecords = expectPolls(1L);
+ expectStopTask(10L);
+
+ PowerMock.replayAll();
+
+ workerTask.start(taskProps);
+ for (int i = 0; i < 10; i++) {
+ workerThread.iteration();
+ }
+ workerTask.stop();
+ // No need for awaitStop since the thread is mocked
+ workerTask.close();
+
+ // Verify contents match expected values, i.e. that they were translated properly. With max
+ // batch size 1 and poll returns 1 message at a time, we should have a matching # of batches
+ assertEquals(10, capturedRecords.getValues().size());
+ int offset = 0;
+ for (Collection<SinkRecord> recs : capturedRecords.getValues()) {
+ assertEquals(1, recs.size());
+ for (SinkRecord rec : recs) {
+ SinkRecord referenceSinkRecord
+ = new SinkRecord(TOPIC, PARTITION, KEY, VALUE, FIRST_OFFSET + offset);
+ assertEquals(referenceSinkRecord, rec);
+ offset++;
+ }
+ }
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testDeliverConvertsData() throws Exception {
+ // Validate conversion is performed when data is delivered
+ Integer record = 12;
+
+ ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+ Collections.singletonMap(
+ new TopicPartition("topic", 0),
+ Collections.singletonList(new ConsumerRecord<>("topic", 0, 0, RAW_KEY, RAW_VALUE))));
+
+ // Exact data doesn't matter, but should be passed directly to sink task
+ EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(record);
+ EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(record);
+ Capture<Collection<SinkRecord>> capturedRecords
+ = EasyMock.newCapture(CaptureType.ALL);
+ sinkTask.put(EasyMock.capture(capturedRecords));
+ EasyMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ Whitebox.invokeMethod(workerTask, "deliverMessages", records);
+ assertEquals(record, capturedRecords.getValue().iterator().next().getKey());
+ assertEquals(record, capturedRecords.getValue().iterator().next().getValue());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ Properties taskProps = new Properties();
+
+ expectInitializeTask(taskProps);
+ // Make each poll() take the offset commit interval
+ Capture<Collection<SinkRecord>> capturedRecords
+ = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, null, null, 0, true);
+ expectStopTask(2);
+
+ PowerMock.replayAll();
+
+ workerTask.start(taskProps);
+ // First iteration gets one record
+ workerThread.iteration();
+ // Second triggers commit, gets a second offset
+ workerThread.iteration();
+ // Commit finishes synchronously for testing so we can check this immediately
+ assertEquals(0, workerThread.getCommitFailures());
+ workerTask.stop();
+ workerTask.close();
+
+ assertEquals(2, capturedRecords.getValues().size());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitTaskFlushFailure() throws Exception {
+ Properties taskProps = new Properties();
+
+ expectInitializeTask(taskProps);
+ Capture<Collection<SinkRecord>> capturedRecords
+ = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, new RuntimeException(), null, 0, true);
+ expectStopTask(2);
+
+ PowerMock.replayAll();
+
+ workerTask.start(taskProps);
+ // Second iteration triggers commit
+ workerThread.iteration();
+ workerThread.iteration();
+ assertEquals(1, workerThread.getCommitFailures());
+ assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+ workerTask.stop();
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitConsumerFailure() throws Exception {
+ Properties taskProps = new Properties();
+
+ expectInitializeTask(taskProps);
+ Capture<Collection<SinkRecord>> capturedRecords
+ = expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT);
+ expectOffsetFlush(1L, null, new Exception(), 0, true);
+ expectStopTask(2);
+
+ PowerMock.replayAll();
+
+ workerTask.start(taskProps);
+ // Second iteration triggers commit
+ workerThread.iteration();
+ workerThread.iteration();
+ // TODO Response to consistent failures?
+ assertEquals(1, workerThread.getCommitFailures());
+ assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+ workerTask.stop();
+ workerTask.close();
+
+ PowerMock.verifyAll();
+ }
+
+    /**
+     * Verifies that a consumer commit whose callback is never invoked is counted as a single
+     * commit failure after the commit timeout has passed, and that the worker thread's
+     * "committing" flag is cleared afterwards.
+     */
+    @Test
+    public void testCommitTimeout() throws Exception {
+        Properties taskProps = new Properties();
+
+        expectInitializeTask(taskProps);
+        // Cut down amount of time to pass in each poll so we trigger exactly 1 offset commit.
+        // The capture of delivered records returned by expectPolls is not needed in this test.
+        expectPolls(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_DEFAULT / 2);
+        // The mocked commit advances time by the full commit timeout and never invokes the callback.
+        expectOffsetFlush(2L, null, null, WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_DEFAULT, false);
+        expectStopTask(4);
+
+        PowerMock.replayAll();
+
+        workerTask.start(taskProps);
+        // Third iteration triggers commit, fourth gives a chance to trigger the timeout but doesn't
+        // trigger another commit
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        workerThread.iteration();
+        // TODO Response to consistent failures?
+        assertEquals(1, workerThread.getCommitFailures());
+        assertEquals(false, Whitebox.getInternalState(workerThread, "committing"));
+        workerTask.stop();
+        workerTask.close();
+
+        PowerMock.verifyAll();
+    }
+
+ private KafkaConsumer<byte[], byte[]> expectInitializeTask(Properties taskProps)
+ throws Exception {
+ sinkTask.initialize(EasyMock.anyObject(SinkTaskContext.class));
+ PowerMock.expectLastCall();
+ sinkTask.start(taskProps);
+ PowerMock.expectLastCall();
+
+ PowerMock.expectPrivate(workerTask, "createConsumer", taskProps)
+ .andReturn(consumer);
+ workerThread = PowerMock.createPartialMock(WorkerSinkTaskThread.class, new String[]{"start"},
+ workerTask, "mock-worker-thread", time,
+ workerConfig);
+ PowerMock.expectPrivate(workerTask, "createWorkerThread")
+ .andReturn(workerThread);
+ workerThread.start();
+ PowerMock.expectLastCall();
+ return consumer;
+ }
+
+    /**
+     * Records the calls expected while the task is stopped and closed: the sink task is stopped
+     * and the consumer is woken up and closed.
+     *
+     * @param expectedMessages number of messages the calling test expects to have been handled;
+     *                         not used by the recorded expectations, kept so that existing
+     *                         callers do not need to change
+     */
+    private void expectStopTask(final long expectedMessages) throws Exception {
+        sinkTask.stop();
+        PowerMock.expectLastCall();
+
+        // No offset commit since it happens in the mocked worker thread, but the main thread does need to wake up the
+        // consumer so it exits quickly
+        consumer.wakeup();
+        PowerMock.expectLastCall();
+
+        consumer.close();
+        PowerMock.expectLastCall();
+    }
+
+ // Note that this can only be called once per test currently
+ private Capture<Collection<SinkRecord>> expectPolls(final long pollDelayMs) throws Exception {
+ // Stub out all the consumer stream/iterator responses, which we just want to verify occur,
+ // but don't care about the exact details here.
+ EasyMock.expect(consumer.poll(EasyMock.anyLong())).andStubAnswer(
+ new IAnswer<ConsumerRecords<byte[], byte[]>>() {
+ @Override
+ public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
+ // "Sleep" so time will progress
+ time.sleep(pollDelayMs);
+ ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
+ Collections.singletonMap(
+ new TopicPartition(TOPIC, PARTITION),
+ Arrays.asList(
+ new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, RAW_KEY, RAW_VALUE)
+ )));
+ recordsReturned++;
+ return records;
+ }
+ });
+ EasyMock.expect(keyConverter.toCopycatData(RAW_KEY)).andReturn(KEY).anyTimes();
+ EasyMock.expect(valueConverter.toCopycatData(RAW_VALUE)).andReturn(VALUE).anyTimes();
+ Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
+ sinkTask.put(EasyMock.capture(capturedRecords));
+ EasyMock.expectLastCall().anyTimes();
+ return capturedRecords;
+ }
+
+ private Capture<ConsumerCommitCallback> expectOffsetFlush(final long expectedMessages,
+ final RuntimeException flushError,
+ final Exception consumerCommitError,
+ final long consumerCommitDelayMs,
+ final boolean invokeCallback)
+ throws Exception {
+ final long finalOffset = FIRST_OFFSET + expectedMessages - 1;
+
+ EasyMock.expect(consumer.subscriptions()).andReturn(Collections.singleton(TOPIC_PARTITION));
+ EasyMock.expect(consumer.position(TOPIC_PARTITION)).andAnswer(
+ new IAnswer<Long>() {
+ @Override
+ public Long answer() throws Throwable {
+ return FIRST_OFFSET + recordsReturned - 1;
+ }
+ }
+ );
+
+ sinkTask.flush(Collections.singletonMap(TOPIC_PARTITION, finalOffset));
+ IExpectationSetters<Object> flushExpectation = PowerMock.expectLastCall();
+ if (flushError != null) {
+ flushExpectation.andThrow(flushError).once();
+ return null;
+ }
+
+ final Capture<ConsumerCommitCallback> capturedCallback = EasyMock.newCapture();
+ final Map<TopicPartition, Long> offsets = Collections.singletonMap(TOPIC_PARTITION, finalOffset);
+ consumer.commit(EasyMock.eq(offsets),
+ EasyMock.eq(CommitType.ASYNC),
+ EasyMock.capture(capturedCallback));
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ time.sleep(consumerCommitDelayMs);
+ if (invokeCallback)
+ capturedCallback.getValue().onComplete(offsets, consumerCommitError);
+ return null;
+ }
+ });
+ return capturedCallback;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
new file mode 100644
index 0000000..60e1462
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerSourceTaskTest.java
@@ -0,0 +1,279 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.source.SourceTaskContext;
+import org.apache.kafka.copycat.storage.Converter;
+import org.apache.kafka.copycat.storage.OffsetStorageReader;
+import org.apache.kafka.copycat.storage.OffsetStorageWriter;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+@RunWith(PowerMockRunner.class)
+public class WorkerSourceTaskTest extends ThreadedTest {
+ private static final byte[] PARTITION_BYTES = "partition".getBytes();
+ private static final byte[] OFFSET_BYTES = "offset-1".getBytes();
+
+ // Copycat-format data
+ private static final Integer KEY = -1;
+ private static final Long RECORD = 12L;
+ // Native-formatted data. The actual format of this data doesn't matter -- we just want to see that the right version
+ // is used in the right place.
+ private static final ByteBuffer CONVERTED_KEY = ByteBuffer.wrap("converted-key".getBytes());
+ private static final String CONVERTED_RECORD = "converted-record";
+
+ private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+ private WorkerConfig config;
+ @Mock private SourceTask sourceTask;
+ @Mock private Converter<ByteBuffer> keyConverter;
+ @Mock private Converter<String> valueConverter;
+ @Mock private KafkaProducer<ByteBuffer, String> producer;
+ @Mock private OffsetStorageReader offsetReader;
+ @Mock private OffsetStorageWriter<ByteBuffer, String> offsetWriter;
+ private WorkerSourceTask<ByteBuffer, String> workerTask;
+ @Mock private Future<RecordMetadata> sendFuture;
+
+ private Capture<org.apache.kafka.clients.producer.Callback> producerCallbacks;
+
+ private static final Properties EMPTY_TASK_PROPS = new Properties();
+ private static final List<SourceRecord> RECORDS = Arrays.asList(
+ new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD)
+ );
+
+ @Override
+ public void setup() {
+ super.setup();
+ Properties workerProps = new Properties();
+ workerProps.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+ workerProps.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+ workerProps.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+ workerProps.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ workerProps.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+ config = new WorkerConfig(workerProps);
+ producerCallbacks = EasyMock.newCapture();
+ }
+
+ private void createWorkerTask() {
+ workerTask = new WorkerSourceTask<>(taskId, sourceTask, keyConverter, valueConverter, producer,
+ offsetReader, offsetWriter, config, new SystemTime());
+ }
+
+ @Test
+ public void testPollsInBackground() throws Exception {
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+
+ final CountDownLatch pollLatch = expectPolls(10);
+ // In this test, we don't flush, so nothing goes any further than the offset writer
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(EMPTY_TASK_PROPS);
+ awaitPolls(pollLatch);
+ workerTask.stop();
+ assertEquals(true, workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommit() throws Exception {
+ // Test that the task commits properly when prompted
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+
+ // We'll wait for some data, then trigger a flush
+ final CountDownLatch pollLatch = expectPolls(1);
+ expectOffsetFlush(true);
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(EMPTY_TASK_PROPS);
+ awaitPolls(pollLatch);
+ assertTrue(workerTask.commitOffsets());
+ workerTask.stop();
+ assertEquals(true, workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCommitFailure() throws Exception {
+ // Test that the task commits properly when prompted
+ createWorkerTask();
+
+ sourceTask.initialize(EasyMock.anyObject(SourceTaskContext.class));
+ EasyMock.expectLastCall();
+ sourceTask.start(EMPTY_TASK_PROPS);
+ EasyMock.expectLastCall();
+
+ // We'll wait for some data, then trigger a flush
+ final CountDownLatch pollLatch = expectPolls(1);
+ expectOffsetFlush(false);
+
+ sourceTask.stop();
+ EasyMock.expectLastCall();
+ expectOffsetFlush(true);
+
+ PowerMock.replayAll();
+
+ workerTask.start(EMPTY_TASK_PROPS);
+ awaitPolls(pollLatch);
+ assertFalse(workerTask.commitOffsets());
+ workerTask.stop();
+ assertEquals(true, workerTask.awaitStop(1000));
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSendRecordsConvertsData() throws Exception {
+ createWorkerTask();
+
+ List<SourceRecord> records = new ArrayList<>();
+ // Can just use the same record for key and value
+ records.add(new SourceRecord(PARTITION_BYTES, OFFSET_BYTES, "topic", null, KEY, RECORD));
+
+ Capture<ProducerRecord<ByteBuffer, String>> sent = expectSendRecord();
+
+ PowerMock.replayAll();
+
+ Whitebox.invokeMethod(workerTask, "sendRecords", records);
+ assertEquals(CONVERTED_KEY, sent.getValue().key());
+ assertEquals(CONVERTED_RECORD, sent.getValue().value());
+
+ PowerMock.verifyAll();
+ }
+
+
+ private CountDownLatch expectPolls(int count) throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(count);
+ // Note that we stub these to allow any number of calls because the thread will continue to
+ // run. The count passed in + latch returned just makes sure we get *at least* that number of
+ // calls
+ EasyMock.expect(sourceTask.poll())
+ .andStubAnswer(new IAnswer<List<SourceRecord>>() {
+ @Override
+ public List<SourceRecord> answer() throws Throwable {
+ latch.countDown();
+ return RECORDS;
+ }
+ });
+ // Fallout of the poll() call
+ expectSendRecord();
+ return latch;
+ }
+
+ private Capture<ProducerRecord<ByteBuffer, String>> expectSendRecord() throws InterruptedException {
+ EasyMock.expect(keyConverter.fromCopycatData(KEY)).andStubReturn(CONVERTED_KEY);
+ EasyMock.expect(valueConverter.fromCopycatData(RECORD)).andStubReturn(CONVERTED_RECORD);
+
+ Capture<ProducerRecord<ByteBuffer, String>> sent = EasyMock.newCapture();
+ // 1. Converted data passed to the producer, which will need callbacks invoked for flush to work
+ EasyMock.expect(
+ producer.send(EasyMock.capture(sent),
+ EasyMock.capture(producerCallbacks)))
+ .andStubAnswer(new IAnswer<Future<RecordMetadata>>() {
+ @Override
+ public Future<RecordMetadata> answer() throws Throwable {
+ synchronized (producerCallbacks) {
+ for (org.apache.kafka.clients.producer.Callback cb : producerCallbacks.getValues()) {
+ cb.onCompletion(new RecordMetadata(new TopicPartition("foo", 0), 0, 0), null);
+ }
+ producerCallbacks.reset();
+ }
+ return sendFuture;
+ }
+ });
+ // 2. Offset data is passed to the offset storage.
+ offsetWriter.setOffset(PARTITION_BYTES, OFFSET_BYTES);
+ PowerMock.expectLastCall().anyTimes();
+
+ return sent;
+ }
+
+ private void awaitPolls(CountDownLatch latch) throws InterruptedException {
+ latch.await(1000, TimeUnit.MILLISECONDS);
+ }
+
+ @SuppressWarnings("unchecked")
+ private void expectOffsetFlush(boolean succeed) throws Exception {
+ EasyMock.expect(offsetWriter.beginFlush()).andReturn(true);
+ Future<Void> flushFuture = PowerMock.createMock(Future.class);
+ EasyMock.expect(offsetWriter.doFlush(EasyMock.anyObject(Callback.class))).andReturn(flushFuture);
+ // Should throw for failure
+ IExpectationSetters<Void> futureGetExpect = EasyMock.expect(
+ flushFuture.get(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class)));
+ if (succeed) {
+ futureGetExpect.andReturn(null);
+ } else {
+ futureGetExpect.andThrow(new TimeoutException());
+ offsetWriter.cancelFlush();
+ PowerMock.expectLastCall();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
new file mode 100644
index 0000000..32e7ff9
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/WorkerTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.copycat.cli.WorkerConfig;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.source.SourceRecord;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.storage.*;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.MockTime;
+import org.apache.kafka.copycat.util.ThreadedTest;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tests for {@link Worker}: adding, stopping and cleaning up tasks, with task instantiation and
+ * {@link WorkerSourceTask} construction intercepted through PowerMock.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(Worker.class)
+@PowerMockIgnore("javax.management.*")
+public class WorkerTest extends ThreadedTest {
+
+    private ConnectorTaskId taskId = new ConnectorTaskId("job", 0);
+    private Worker worker;
+    private OffsetBackingStore offsetBackingStore = PowerMock.createMock(OffsetBackingStore.class);
+    private Serializer offsetKeySerializer = PowerMock.createMock(Serializer.class);
+    private Serializer offsetValueSerializer = PowerMock.createMock(Serializer.class);
+    private Deserializer offsetKeyDeserializer = PowerMock.createMock(Deserializer.class);
+    private Deserializer offsetValueDeserializer = PowerMock.createMock(Deserializer.class);
+
+    /**
+     * Creates and starts a worker configured with the JSON converters and (de)serializers and
+     * the mocked offset storage components.
+     */
+    @Before
+    public void setup() {
+        super.setup();
+
+        Properties settings = new Properties();
+        settings.setProperty("key.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        settings.setProperty("value.converter", "org.apache.kafka.copycat.json.JsonConverter");
+        settings.setProperty("key.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+        settings.setProperty("value.serializer", "org.apache.kafka.copycat.json.JsonSerializer");
+        settings.setProperty("key.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        settings.setProperty("value.deserializer", "org.apache.kafka.copycat.json.JsonDeserializer");
+        worker = new Worker(new MockTime(), new WorkerConfig(settings), offsetBackingStore,
+                offsetKeySerializer, offsetValueSerializer,
+                offsetKeyDeserializer, offsetValueDeserializer);
+        worker.start();
+    }
+
+    /**
+     * Adds a task and stops it explicitly; the final {@code worker.stop()} should find nothing
+     * left to clean up.
+     */
+    @Test
+    public void testAddRemoveTask() throws Exception {
+        // Create
+        TestSourceTask sourceTask = PowerMock.createMock(TestSourceTask.class);
+        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        Properties taskProps = new Properties();
+        expectTaskCreation(sourceTask, workerTask, taskProps);
+
+        // Remove
+        workerTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andStubReturn(true);
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker.addTask(taskId, TestSourceTask.class.getName(), taskProps);
+        worker.stopTask(taskId);
+        // Nothing should be left, so this should effectively be a nop
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Stopping a task that was never added is expected to raise a {@link CopycatException}.
+     */
+    @Test(expected = CopycatException.class)
+    public void testStopInvalidTask() {
+        worker.stopTask(taskId);
+    }
+
+    /**
+     * Adds a task and then stops the worker without stopping the task first; the worker is
+     * expected to stop and close the task itself.
+     */
+    @Test
+    public void testCleanupTasksOnStop() throws Exception {
+        // Create
+        TestSourceTask sourceTask = PowerMock.createMock(TestSourceTask.class);
+        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
+        Properties taskProps = new Properties();
+        expectTaskCreation(sourceTask, workerTask, taskProps);
+
+        // Remove on Worker.stop()
+        workerTask.stop();
+        EasyMock.expectLastCall();
+        EasyMock.expect(workerTask.awaitStop(EasyMock.anyLong())).andReturn(true);
+        // Note that in this case we *do not* commit offsets since it's an unclean shutdown
+        workerTask.close();
+        EasyMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        worker.addTask(taskId, TestSourceTask.class.getName(), taskProps);
+        worker.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Records the expectations shared by the tests that add a task: the private static
+     * {@code instantiateTask} call returns {@code sourceTask}, constructing a
+     * {@link WorkerSourceTask} for {@link #taskId} returns {@code workerTask}, and that worker
+     * task is started with {@code taskProps}.
+     */
+    private void expectTaskCreation(TestSourceTask sourceTask, WorkerSourceTask workerTask,
+                                    Properties taskProps) throws Exception {
+        PowerMock.mockStatic(Worker.class);
+        PowerMock.expectPrivate(Worker.class, "instantiateTask", TestSourceTask.class.getName())
+                .andReturn(sourceTask);
+
+        PowerMock.expectNew(
+                WorkerSourceTask.class, EasyMock.eq(taskId), EasyMock.eq(sourceTask),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(Converter.class),
+                EasyMock.anyObject(KafkaProducer.class),
+                EasyMock.anyObject(OffsetStorageReader.class),
+                EasyMock.anyObject(OffsetStorageWriter.class),
+                EasyMock.anyObject(WorkerConfig.class),
+                EasyMock.anyObject(Time.class))
+                .andReturn(workerTask);
+
+        workerTask.start(taskProps);
+        EasyMock.expectLastCall();
+    }
+
+
+    /**
+     * Minimal concrete {@link SourceTask} whose class name is handed to the worker; its methods
+     * do nothing and {@code poll()} returns null.
+     */
+    private static class TestSourceTask extends SourceTask {
+        public TestSourceTask() {
+        }
+
+        @Override
+        public void start(Properties props) {
+        }
+
+        @Override
+        public List<SourceRecord> poll() throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
new file mode 100644
index 0000000..5ac7e38
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerderTest.java
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.runtime.standalone;
+
+import org.apache.kafka.copycat.connector.Connector;
+import org.apache.kafka.copycat.connector.Task;
+import org.apache.kafka.copycat.runtime.ConnectorConfig;
+import org.apache.kafka.copycat.runtime.Worker;
+import org.apache.kafka.copycat.sink.SinkConnector;
+import org.apache.kafka.copycat.sink.SinkTask;
+import org.apache.kafka.copycat.source.SourceConnector;
+import org.apache.kafka.copycat.source.SourceTask;
+import org.apache.kafka.copycat.util.Callback;
+import org.apache.kafka.copycat.util.ConnectorTaskId;
+import org.apache.kafka.copycat.util.FutureCallback;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Tests for {@link StandaloneHerder}: creating source and sink connectors and destroying a
+ * connector, against a mocked {@link Worker} and mocked connector instances.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({StandaloneHerder.class})
+@PowerMockIgnore("javax.management.*")
+public class StandaloneHerderTest {
+    private static final String CONNECTOR_NAME = "test";
+    private static final String TOPICS_LIST_STR = "topic1,topic2";
+
+    private StandaloneHerder herder;
+    @Mock protected Worker worker;
+    private Connector connector;
+    @Mock protected Callback<String> createCallback;
+
+    private Properties connectorProps;
+    private Properties taskProps;
+
+    /**
+     * Creates the herder around a mocked worker and prepares the connector and task properties
+     * shared by the tests.
+     */
+    @Before
+    public void setup() {
+        worker = PowerMock.createMock(Worker.class);
+        herder = new StandaloneHerder(worker);
+
+        connectorProps = new Properties();
+        connectorProps.setProperty(ConnectorConfig.NAME_CONFIG, CONNECTOR_NAME);
+        connectorProps.setProperty(SinkConnector.TOPICS_CONFIG, TOPICS_LIST_STR);
+        PowerMock.mockStatic(StandaloneHerder.class);
+
+        // These can be anything since connectors can pass along whatever they want.
+        taskProps = new Properties();
+        taskProps.setProperty("foo", "bar");
+    }
+
+    /**
+     * Adding a source connector should instantiate it and add one task to the worker.
+     */
+    @Test
+    public void testCreateSourceConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceClass.class);
+        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Adding a sink connector should instantiate it and add one task whose properties also
+     * carry the connector's topic list.
+     */
+    @Test
+    public void testCreateSinkConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSinkClass.class);
+        expectAdd(BogusSinkClass.class, BogusSinkTask.class, true);
+
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Deleting a previously added connector should stop its task and the connector itself, and
+     * complete the supplied callback within one second.
+     */
+    @Test
+    public void testDestroyConnector() throws Exception {
+        connector = PowerMock.createMock(BogusSourceClass.class);
+        expectAdd(BogusSourceClass.class, BogusSourceTask.class, false);
+        expectDestroy();
+        PowerMock.replayAll();
+
+        herder.addConnector(connectorProps, createCallback);
+        // The wrapped callback does nothing; the test only waits on the future itself
+        FutureCallback<Void> futureCb = new FutureCallback<>(new Callback<Void>() {
+            @Override
+            public void onCompletion(Throwable error, Void result) {
+
+            }
+        });
+        herder.deleteConnector(CONNECTOR_NAME, futureCb);
+        futureCb.get(1000L, TimeUnit.MILLISECONDS);
+        PowerMock.verifyAll();
+    }
+
+
+    /**
+     * Records the expectations for adding a connector, including the creation callback.
+     */
+    private void expectAdd(Class<? extends Connector> connClass,
+                           Class<? extends Task> taskClass,
+                           boolean sink) throws Exception {
+        expectCreate(connClass, taskClass, sink, true);
+    }
+
+    /**
+     * Records the expectations for restoring a connector, which does not involve a callback.
+     */
+    private void expectRestore(Class<? extends Connector> connClass,
+                               Class<? extends Task> taskClass) throws Exception {
+        // Restore never uses a callback. These tests always use sources
+        expectCreate(connClass, taskClass, false, false);
+    }
+
+    /**
+     * Records the expectations for creating a connector and its single task.
+     *
+     * @param connClass      connector class whose name is put into the connector properties
+     * @param taskClass      task class the mocked connector reports
+     * @param sink           whether the generated task properties should include the topic list
+     * @param expectCallback whether {@code createCallback} should be completed with the
+     *                       connector name
+     */
+    private void expectCreate(Class<? extends Connector> connClass,
+                              Class<? extends Task> taskClass,
+                              boolean sink, boolean expectCallback) throws Exception {
+        connectorProps.setProperty(ConnectorConfig.CONNECTOR_CLASS_CONFIG, connClass.getName());
+
+        PowerMock.expectPrivate(StandaloneHerder.class, "instantiateConnector", connClass.getName())
+                .andReturn(connector);
+        if (expectCallback) {
+            createCallback.onCompletion(null, CONNECTOR_NAME);
+            PowerMock.expectLastCall();
+        }
+
+        connector.initialize(EasyMock.anyObject(StandaloneConnectorContext.class));
+        PowerMock.expectLastCall();
+        connector.start(new Properties());
+        PowerMock.expectLastCall();
+
+        // Just return the connector properties for the individual task we generate by default
+        EasyMock.<Class<? extends Task>>expect(connector.getTaskClass()).andReturn(taskClass);
+
+        EasyMock.expect(connector.getTaskConfigs(ConnectorConfig.TASKS_MAX_DEFAULT))
+                .andReturn(Arrays.asList(taskProps));
+        // And we should instantiate the tasks. For a sink task, we should see added properties for
+        // the input topic partitions
+        Properties generatedTaskProps = new Properties();
+        generatedTaskProps.putAll(taskProps);
+        if (sink)
+            generatedTaskProps.setProperty(SinkTask.TOPICS_CONFIG, TOPICS_LIST_STR);
+        worker.addTask(new ConnectorTaskId(CONNECTOR_NAME, 0), taskClass.getName(), generatedTaskProps);
+        PowerMock.expectLastCall();
+    }
+
+    /**
+     * Records the expectations for stopping the connector's single task and the connector.
+     */
+    private void expectStop() {
+        worker.stopTask(new ConnectorTaskId(CONNECTOR_NAME, 0));
+        EasyMock.expectLastCall();
+        connector.stop();
+        EasyMock.expectLastCall();
+    }
+
+    /**
+     * Records the expectations for destroying the connector, which currently equal those for
+     * stopping it.
+     */
+    private void expectDestroy() {
+        expectStop();
+    }
+
+    // We need to use a real class here due to some issue with mocking java.lang.Class
+    private abstract class BogusSourceClass extends SourceConnector {
+    }
+
+    private abstract class BogusSourceTask extends SourceTask {
+    }
+
+    private abstract class BogusSinkClass extends SinkConnector {
+    }
+
+    // A sink connector's task class must be a sink task, not a source task
+    private abstract class BogusSinkTask extends SinkTask {
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
new file mode 100644
index 0000000..bbcbdc9
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/FileOffsetBackingStoreTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.copycat.util.Callback;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.powermock.api.easymock.PowerMock;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link FileOffsetBackingStore} against a temporary file: values written with
+ * {@code set} can be read back with {@code get}, both from the same store and from a second
+ * store configured with the same file.
+ */
+public class FileOffsetBackingStoreTest {
+
+    FileOffsetBackingStore store;
+    Map<String, Object> props;
+    File tempFile;
+
+    // Data written by every test; includes a null key mapped to a null value
+    private static Map<ByteBuffer, ByteBuffer> firstSet = new HashMap<>();
+
+    static {
+        firstSet.put(buffer("key"), buffer("value"));
+        firstSet.put(null, null);
+    }
+
+    /**
+     * Creates a fresh temporary file and starts a store configured to use it.
+     */
+    @Before
+    public void setup() throws IOException {
+        tempFile = File.createTempFile("fileoffsetbackingstore", null);
+        props = new HashMap<>();
+        props.put(FileOffsetBackingStore.OFFSET_STORAGE_FILE_FILENAME_CONFIG, tempFile.getAbsolutePath());
+
+        store = new FileOffsetBackingStore();
+        store.configure(props);
+        store.start();
+    }
+
+    /**
+     * Deletes the temporary file created in {@link #setup()}.
+     */
+    @After
+    public void teardown() {
+        tempFile.delete();
+    }
+
+    /**
+     * A value that was set can be read back, and a key that was never set maps to null.
+     */
+    @Test
+    public void testGetSet() throws Exception {
+        Callback<Void> onSet = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> onGet = expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set("namespace", firstSet, onSet).get();
+
+        Map<ByteBuffer, ByteBuffer> found =
+                store.get("namespace", Arrays.asList(buffer("key"), buffer("bad")), onGet).get();
+        assertEquals(buffer("value"), found.get(buffer("key")));
+        assertEquals(null, found.get(buffer("bad")));
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * A value set through one store and saved by stopping it can be read from a new store
+     * started on the same file.
+     */
+    @Test
+    public void testSaveRestore() throws Exception {
+        Callback<Void> onSet = expectSuccessfulSetCallback();
+        Callback<Map<ByteBuffer, ByteBuffer>> onGet = expectSuccessfulGetCallback();
+        PowerMock.replayAll();
+
+        store.set("namespace", firstSet, onSet).get();
+        store.stop();
+
+        // Restore into a new store to ensure correct reload from scratch
+        FileOffsetBackingStore reloaded = new FileOffsetBackingStore();
+        reloaded.configure(props);
+        reloaded.start();
+        Map<ByteBuffer, ByteBuffer> found =
+                reloaded.get("namespace", Arrays.asList(buffer("key")), onGet).get();
+        assertEquals(buffer("value"), found.get(buffer("key")));
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Wraps the bytes of {@code v} in a {@link ByteBuffer}.
+     */
+    private static ByteBuffer buffer(String v) {
+        final byte[] raw = v.getBytes();
+        return ByteBuffer.wrap(raw);
+    }
+
+    /**
+     * Creates a mocked callback that expects one completion with a null error and null result.
+     */
+    @SuppressWarnings("unchecked")
+    private Callback<Void> expectSuccessfulSetCallback() {
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        callback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.isNull(Void.class));
+        PowerMock.expectLastCall();
+        return callback;
+    }
+
+    /**
+     * Creates a mocked callback that expects one completion with a null error and any map.
+     */
+    @SuppressWarnings("unchecked")
+    private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
+        Callback<Map<ByteBuffer, ByteBuffer>> callback = PowerMock.createMock(Callback.class);
+        callback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
+        PowerMock.expectLastCall();
+        return callback;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
new file mode 100644
index 0000000..3d49f05
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/storage/OffsetStorageWriterTest.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.storage;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.copycat.errors.CopycatException;
+import org.apache.kafka.copycat.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for {@link OffsetStorageWriter}. The backing store, both converters and both
+ * serializers are mocks; the mocked store completes its requests on a single-threaded executor so
+ * that tests can control when (and whether) the store callback fires.
+ */
+@RunWith(PowerMockRunner.class)
+public class OffsetStorageWriterTest {
+    private static final String NAMESPACE = "namespace";
+    // Copycat format - any types should be accepted here
+    private static final List<String> OFFSET_KEY = Arrays.asList("key", "key");
+    private static final String OFFSET_VALUE = "value";
+    // Native objects - must match serializer types
+    private static final int OFFSET_KEY_CONVERTED = 12;
+    private static final String OFFSET_VALUE_CONVERTED = "value-converted";
+    // Serialized. The charset is named explicitly so the fixture bytes do not depend on the
+    // platform default encoding.
+    private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(StandardCharsets.UTF_8);
+    private static final Map<ByteBuffer, ByteBuffer> OFFSETS_SERIALIZED
+            = Collections.singletonMap(ByteBuffer.wrap(OFFSET_KEY_SERIALIZED),
+            ByteBuffer.wrap(OFFSET_VALUE_SERIALIZED));
+
+    // Error reported by the mocked store when a test asks for a failed write.
+    private static final Exception EXCEPTION = new RuntimeException("error");
+
+    @Mock private OffsetBackingStore store;
+    @Mock private Converter<Integer> keyConverter;
+    @Mock private Converter<String> valueConverter;
+    @Mock private Serializer<Integer> keySerializer;
+    @Mock private Serializer<String> valueSerializer;
+    private OffsetStorageWriter<Integer, String> writer;
+
+    // Runs the mocked store's asynchronous completions.
+    private ExecutorService service;
+
+    @Before
+    public void setup() {
+        writer = new OffsetStorageWriter<>(store, NAMESPACE, keyConverter, valueConverter, keySerializer, valueSerializer);
+        service = Executors.newFixedThreadPool(1);
+    }
+
+    @After
+    public void teardown() {
+        service.shutdownNow();
+    }
+
+    /**
+     * A single offset is set, flushed, and the user callback is completed successfully.
+     */
+    @Test
+    public void testWriteFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        expectStore(callback, false);
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testNoOffsetsToFlush() {
+        // If no offsets are flushed, we should finish immediately and not have made any calls to the
+        // underlying storage layer
+
+        PowerMock.replayAll();
+
+        // Nothing was set, so there is nothing to flush and beginFlush() reports false
+        assertFalse(writer.beginFlush());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testFlushFailureReplacesOffsets() throws Exception {
+        // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
+        // such that a subsequent flush will write them.
+
+        @SuppressWarnings("unchecked")
+        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        // First time the write fails
+        expectStore(callback, true);
+        // Second time it succeeds
+        expectStore(callback, false);
+        // Third time it has no data to flush so we won't get past beginFlush()
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+        assertFalse(writer.beginFlush());
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Calling beginFlush() while a previous flush is still outstanding is expected to throw
+     * CopycatException.
+     */
+    @Test(expected = CopycatException.class)
+    public void testAlreadyFlushing() throws Exception {
+        @SuppressWarnings("unchecked")
+        final Callback<Void> callback = PowerMock.createMock(Callback.class);
+        // Trigger the send, but don't invoke the callback so we'll still be mid-flush. The latch is
+        // never released in this test; teardown() calls shutdownNow() on the executor.
+        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
+        expectStore(null, false, allowStoreCompleteCountdown);
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.doFlush(callback);
+        assertTrue(writer.beginFlush()); // should throw
+
+        // Not reached when the expected exception is thrown above
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Cancelling after beginFlush() but before doFlush() must not touch any of the mocks.
+     */
+    @Test
+    public void testCancelBeforeAwaitFlush() {
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        writer.cancelFlush();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testCancelAfterAwaitFlush() throws Exception {
+        @SuppressWarnings("unchecked")
+        Callback<Void> callback = PowerMock.createMock(Callback.class);
+        CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1);
+        // In this test, the write should be cancelled so the callback will not be invoked and is not
+        // passed to the expectStore call
+        expectStore(null, false, allowStoreCompleteCountdown);
+
+        PowerMock.replayAll();
+
+        writer.setOffset(OFFSET_KEY, OFFSET_VALUE);
+        assertTrue(writer.beginFlush());
+        // Start the flush, then immediately cancel before allowing the mocked store request to finish
+        Future<Void> flushFuture = writer.doFlush(callback);
+        writer.cancelFlush();
+        allowStoreCompleteCountdown.countDown();
+        flushFuture.get(1000, TimeUnit.MILLISECONDS);
+
+        PowerMock.verifyAll();
+    }
+
+    /**
+     * Shorthand for {@link #expectStore(Callback, boolean, CountDownLatch)} without a latch, i.e.
+     * the mocked store completes as soon as its task runs on the executor.
+     */
+    private void expectStore(final Callback<Void> callback, final boolean fail) {
+        expectStore(callback, fail, null);
+    }
+
+    /**
+     * Expect one conversion and serialization of the offset key and value, followed by a request
+     * to store the serialized pair in the underlying OffsetBackingStore. The mocked store answers
+     * by submitting a task to {@code service} that invokes the callback it was handed.
+     *
+     * @param callback the callback to invoke when completed, or null if the callback isn't
+     *                 expected to be invoked
+     * @param fail if true, the mocked store completes its callback with {@code EXCEPTION} and a
+     *             non-null {@code callback} is expected to receive that same exception; if false,
+     *             both are completed with a null error and a null result
+     * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before
+     *                          invoking the callback. A (generous) timeout is still imposed to
+     *                          ensure tests complete.
+     */
+    private void expectStore(final Callback<Void> callback,
+                             final boolean fail,
+                             final CountDownLatch waitForCompletion) {
+        EasyMock.expect(keyConverter.fromCopycatData(OFFSET_KEY)).andReturn(OFFSET_KEY_CONVERTED);
+        EasyMock.expect(keySerializer.serialize(NAMESPACE, OFFSET_KEY_CONVERTED)).andReturn(OFFSET_KEY_SERIALIZED);
+        EasyMock.expect(valueConverter.fromCopycatData(OFFSET_VALUE)).andReturn(OFFSET_VALUE_CONVERTED);
+        EasyMock.expect(valueSerializer.serialize(NAMESPACE, OFFSET_VALUE_CONVERTED)).andReturn(OFFSET_VALUE_SERIALIZED);
+
+        // Capture the callback handed to the store so the background task can complete it later.
+        final Capture<Callback<Void>> storeCallback = Capture.newInstance();
+        EasyMock.expect(store.set(EasyMock.eq(NAMESPACE), EasyMock.eq(OFFSETS_SERIALIZED),
+                EasyMock.capture(storeCallback)))
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        return service.submit(new Callable<Void>() {
+                            @Override
+                            public Void call() throws Exception {
+                                if (waitForCompletion != null) {
+                                    assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
+                                }
+
+                                if (fail) {
+                                    storeCallback.getValue().onCompletion(EXCEPTION, null);
+                                } else {
+                                    storeCallback.getValue().onCompletion(null, null);
+                                }
+                                return null;
+                            }
+                        });
+                    }
+                });
+
+        if (callback != null) {
+            if (fail) {
+                callback.onCompletion(EasyMock.eq(EXCEPTION), EasyMock.eq((Void) null));
+            } else {
+                callback.onCompletion(null, null);
+            }
+            // Only finalize a void-call expectation when one was actually recorded above;
+            // otherwise the "last call" would be the store.set() expectation configured earlier.
+            PowerMock.expectLastCall();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
new file mode 100644
index 0000000..53149db
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/MockTime.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.copycat.util;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link Time} implementation for tests: the clock starts at the value of
+ * {@link System#nanoTime()} read at construction and afterwards changes only when
+ * {@link #sleep(long)} is called.
+ */
+public class MockTime implements Time {
+
+    // Current reading of the mock clock, in nanoseconds.
+    private long nanos;
+
+    public MockTime() {
+        nanos = System.nanoTime();
+    }
+
+    /**
+     * @return the current mock clock reading converted to milliseconds
+     */
+    @Override
+    public long milliseconds() {
+        return TimeUnit.NANOSECONDS.toMillis(nanos);
+    }
+
+    /**
+     * @return the current mock clock reading in nanoseconds
+     */
+    @Override
+    public long nanoseconds() {
+        return nanos;
+    }
+
+    /**
+     * Advances the mock clock by the given amount without blocking the calling thread.
+     *
+     * @param ms the number of milliseconds to add to the clock
+     */
+    @Override
+    public void sleep(long ms) {
+        nanos += TimeUnit.MILLISECONDS.toNanos(ms);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f6acfb08/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
----------------------------------------------------------------------
diff --git a/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
new file mode 100644
index 0000000..4880ca1
--- /dev/null
+++ b/copycat/runtime/src/test/java/org/apache/kafka/copycat/util/ShutdownableThreadTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.copycat.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the two shutdown paths of {@link ShutdownableThread}: a cooperative shutdown of a
+ * thread that polls {@code getRunning()}, and a forced shutdown of a thread blocked in sleep.
+ */
+public class ShutdownableThreadTest {
+
+    @Test
+    public void testGracefulShutdown() throws InterruptedException {
+        ShutdownableThread thread = new ShutdownableThread("graceful") {
+            @Override
+            public void execute() {
+                // Loop in short sleeps until the running flag reports false
+                while (getRunning()) {
+                    try {
+                        Thread.sleep(1);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+            }
+        };
+        thread.start();
+        // Give the worker a moment to run before requesting shutdown
+        Thread.sleep(10);
+        assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
+    }
+
+    @Test
+    public void testForcibleShutdown() throws InterruptedException {
+        final CountDownLatch startedLatch = new CountDownLatch(1);
+        ShutdownableThread thread = new ShutdownableThread("forcible") {
+            @Override
+            public void execute() {
+                try {
+                    startedLatch.countDown();
+                    Thread.sleep(100000);
+                } catch (InterruptedException e) {
+                    // Ignore
+                }
+            }
+        };
+        thread.start();
+        // Bounded wait: if the worker never reaches execute(), fail here instead of blocking the
+        // whole test run forever.
+        assertTrue("worker thread did not start in time", startedLatch.await(1000, TimeUnit.MILLISECONDS));
+        thread.forceShutdown();
+        // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
+        // certain conditions, but in this case we know the thread is interruptible so we should be
+        // able to join() it
+        thread.join(1000);
+        assertFalse(thread.isAlive());
+    }
+}