You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2015/11/09 07:11:33 UTC
[14/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
new file mode 100644
index 0000000..4e54bf1
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -0,0 +1,357 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaOffsetBackingStore.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaOffsetBackingStoreTest {
+ private static final String TOPIC = "connect-offsets"; // offset storage topic name used throughout the tests
+ private static final Map<String, String> DEFAULT_PROPS = new HashMap<>(); // config passed to store.configure()
+ static {
+ DEFAULT_PROPS.put(KafkaOffsetBackingStore.OFFSET_STORAGE_TOPIC_CONFIG, TOPIC);
+ DEFAULT_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+ }
+ private static final Map<ByteBuffer, ByteBuffer> FIRST_SET = new HashMap<>(); // NOTE(review): not referenced by any test in this class
+ static {
+ FIRST_SET.put(buffer("key"), buffer("value"));
+ FIRST_SET.put(null, null);
+ }
+
+ private static final ByteBuffer TP0_KEY = buffer("TP0KEY");
+ private static final ByteBuffer TP1_KEY = buffer("TP1KEY");
+ private static final ByteBuffer TP2_KEY = buffer("TP2KEY");
+ private static final ByteBuffer TP0_VALUE = buffer("VAL0");
+ private static final ByteBuffer TP1_VALUE = buffer("VAL1");
+ private static final ByteBuffer TP2_VALUE = buffer("VAL2");
+ private static final ByteBuffer TP0_VALUE_NEW = buffer("VAL0_NEW");
+ private static final ByteBuffer TP1_VALUE_NEW = buffer("VAL1_NEW");
+
+ @Mock
+ KafkaBasedLog<byte[], byte[]> storeLog; // mock returned from the stubbed createKafkaBasedLog() in expectConfigure()
+ private KafkaOffsetBackingStore store;
+
+ private Capture<String> capturedTopic = EasyMock.newCapture(); // the four captures below hold the createKafkaBasedLog() arguments
+ private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+ private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+ private Capture<Callback<ConsumerRecord<byte[], byte[]>>> capturedConsumedCallback = EasyMock.newCapture(); // tests invoke this to simulate records read from the log
+
+ @Before
+ public void setUp() throws Exception { // build a partial mock of the store in which only createKafkaBasedLog is mocked
+ store = PowerMock.createPartialMockAndInvokeDefaultConstructor(KafkaOffsetBackingStore.class, new String[]{"createKafkaBasedLog"});
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testMissingTopic() { // configuring with an empty map (so no offset storage topic) is expected to throw ConnectException
+ store = new KafkaOffsetBackingStore(); // use a real instance here rather than the partial mock from setUp()
+ store.configure(Collections.<String, Object>emptyMap());
+ }
+
+ @Test
+ public void testStartStop() throws Exception { // configure/start/stop against an empty log, checking the arguments handed to createKafkaBasedLog
+ expectConfigure();
+ expectStart(Collections.<ConsumerRecord<byte[], byte[]>>emptyList()); // no pre-existing records in the log
+ expectStop();
+
+ PowerMock.replayAll();
+
+ store.configure(DEFAULT_PROPS);
+ assertEquals(TOPIC, capturedTopic.getValue());
+ assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+ assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+ assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+ assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+ store.start();
+ store.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testReloadOnStart() throws Exception { // records replayed while the log starts must end up in the store, newest value per key
+ expectConfigure();
+ expectStart(Arrays.asList(
+ new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()),
+ new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()),
+ new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()),
+ new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array())
+ ));
+ expectStop();
+
+ PowerMock.replayAll();
+
+ store.configure(DEFAULT_PROPS);
+ store.start();
+ HashMap<ByteBuffer, ByteBuffer> data = Whitebox.getInternalState(store, "data"); // inspect the store's internal "data" state directly
+ assertEquals(TP0_VALUE_NEW, data.get(TP0_KEY));
+ assertEquals(TP1_VALUE_NEW, data.get(TP1_KEY));
+
+ store.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testGetSet() throws Exception { // each get() reads the log to its end first; set() completes only once every send is acknowledged
+ expectConfigure();
+ expectStart(Collections.<ConsumerRecord<byte[], byte[]>>emptyList()); // no pre-existing records in the log
+ expectStop();
+
+ // First get() against an empty store
+ final Capture<Callback<Void>> firstGetReadToEndCallback = EasyMock.newCapture();
+ storeLog.readToEnd(EasyMock.capture(firstGetReadToEndCallback));
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ firstGetReadToEndCallback.getValue().onCompletion(null, null);
+ return null;
+ }
+ });
+
+ // Set offsets
+ Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+ storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+ PowerMock.expectLastCall();
+ Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+ storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+ PowerMock.expectLastCall();
+
+ // Second get() should get the produced data and return the new values
+ final Capture<Callback<Void>> secondGetReadToEndCallback = EasyMock.newCapture();
+ storeLog.readToEnd(EasyMock.capture(secondGetReadToEndCallback));
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY.array(), TP0_VALUE.array()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY.array(), TP1_VALUE.array()));
+ secondGetReadToEndCallback.getValue().onCompletion(null, null);
+ return null;
+ }
+ });
+
+ // Third get() should pick up data produced by someone else and return those values
+ final Capture<Callback<Void>> thirdGetReadToEndCallback = EasyMock.newCapture();
+ storeLog.readToEnd(EasyMock.capture(thirdGetReadToEndCallback));
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY.array(), TP0_VALUE_NEW.array()));
+ capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY.array(), TP1_VALUE_NEW.array()));
+ thirdGetReadToEndCallback.getValue().onCompletion(null, null);
+ return null;
+ }
+ });
+
+ PowerMock.replayAll();
+
+
+
+ store.configure(DEFAULT_PROPS);
+ store.start();
+
+ // Getting from empty store should return nulls
+ final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+ store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+ @Override
+ public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+ // Since we didn't read them yet, these will be null
+ assertEquals(null, result.get(TP0_KEY));
+ assertEquals(null, result.get(TP1_KEY));
+ getInvokedAndPassed.set(true);
+ }
+ }).get(10000, TimeUnit.MILLISECONDS);
+ assertTrue(getInvokedAndPassed.get());
+
+ // Set some offsets
+ Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+ toSet.put(TP0_KEY, TP0_VALUE);
+ toSet.put(TP1_KEY, TP1_VALUE);
+ final AtomicBoolean invoked = new AtomicBoolean(false);
+ Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+ invoked.set(true);
+ }
+ });
+ assertFalse(setFuture.isDone());
+ // Out of order callbacks shouldn't matter, should still require all to be invoked before invoking the callback
+ // for the store's set callback
+ callback1.getValue().onCompletion(null, null);
+ assertFalse(invoked.get());
+ callback0.getValue().onCompletion(null, null);
+ setFuture.get(10000, TimeUnit.MILLISECONDS);
+ assertTrue(invoked.get());
+
+ // Getting data should read to end of our published data and return it
+ final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
+ store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+ @Override
+ public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+ assertEquals(TP0_VALUE, result.get(TP0_KEY));
+ assertEquals(TP1_VALUE, result.get(TP1_KEY));
+ secondGetInvokedAndPassed.set(true);
+ }
+ }).get(10000, TimeUnit.MILLISECONDS);
+ assertTrue(secondGetInvokedAndPassed.get());
+
+ // Getting data again should read to the end of the log and return the newer values produced by someone else
+ final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
+ store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
+ @Override
+ public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
+ assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
+ assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
+ thirdGetInvokedAndPassed.set(true);
+ }
+ }).get(10000, TimeUnit.MILLISECONDS);
+ assertTrue(thirdGetInvokedAndPassed.get());
+
+ store.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testSetFailure() throws Exception { // a failed send must fail the whole set(): callback gets the error and the future throws
+ expectConfigure();
+ expectStart(Collections.<ConsumerRecord<byte[], byte[]>>emptyList()); // no pre-existing records in the log
+ expectStop();
+
+ // Set offsets
+ Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+ storeLog.send(EasyMock.aryEq(TP0_KEY.array()), EasyMock.aryEq(TP0_VALUE.array()), EasyMock.capture(callback0));
+ PowerMock.expectLastCall();
+ Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+ storeLog.send(EasyMock.aryEq(TP1_KEY.array()), EasyMock.aryEq(TP1_VALUE.array()), EasyMock.capture(callback1));
+ PowerMock.expectLastCall();
+ Capture<org.apache.kafka.clients.producer.Callback> callback2 = EasyMock.newCapture();
+ storeLog.send(EasyMock.aryEq(TP2_KEY.array()), EasyMock.aryEq(TP2_VALUE.array()), EasyMock.capture(callback2));
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+
+
+ store.configure(DEFAULT_PROPS);
+ store.start();
+
+ // Set some offsets
+ Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
+ toSet.put(TP0_KEY, TP0_VALUE);
+ toSet.put(TP1_KEY, TP1_VALUE);
+ toSet.put(TP2_KEY, TP2_VALUE);
+ final AtomicBoolean invoked = new AtomicBoolean(false);
+ final AtomicBoolean invokedFailure = new AtomicBoolean(false);
+ Future<Void> setFuture = store.set(toSet, new Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+ invoked.set(true);
+ if (error != null)
+ invokedFailure.set(true);
+ }
+ });
+ assertFalse(setFuture.isDone());
+ // Out of order callbacks shouldn't matter: a success alone must not complete the set, but the first failure
+ // invokes the store's set callback right away, before the remaining producer callbacks have run
+ callback1.getValue().onCompletion(null, null);
+ assertFalse(invoked.get());
+ callback2.getValue().onCompletion(null, new KafkaException("bogus error"));
+ assertTrue(invoked.get());
+ assertTrue(invokedFailure.get());
+ callback0.getValue().onCompletion(null, null);
+ try {
+ setFuture.get(10000, TimeUnit.MILLISECONDS);
+ fail("Should have seen KafkaException thrown when waiting on KafkaOffsetBackingStore.set() future");
+ } catch (ExecutionException e) {
+ // expected
+ assertNotNull(e.getCause());
+ assertTrue(e.getCause() instanceof KafkaException);
+ }
+
+ store.stop();
+
+ PowerMock.verifyAll();
+ }
+
+ private void expectConfigure() throws Exception { // expect createKafkaBasedLog() on the partial mock, capturing its four arguments
+ PowerMock.expectPrivate(store, "createKafkaBasedLog", EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+ EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+ .andReturn(storeLog); // hand back the mocked log so the store talks to storeLog
+ }
+
+ private void expectStart(final List<ConsumerRecord<byte[], byte[]>> preexistingRecords) throws Exception { // expect storeLog.start()
+ storeLog.start();
+ PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ for (ConsumerRecord<byte[], byte[]> rec : preexistingRecords) // when start() runs, feed each record to the captured consumer callback
+ capturedConsumedCallback.getValue().onCompletion(null, rec);
+ return null;
+ }
+ });
+ }
+
+ private void expectStop() { // expect storeLog.stop() to be called
+ storeLog.stop();
+ PowerMock.expectLastCall();
+ }
+
+ private static ByteBuffer buffer(String v) { // wrap the bytes of v in an array-backed ByteBuffer (tests call array() on the result)
+ return ByteBuffer.wrap(v.getBytes(java.nio.charset.StandardCharsets.UTF_8)); // explicit charset so the bytes do not depend on the platform default
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
new file mode 100644
index 0000000..3e0347c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetStorageWriterTest.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.util.Callback;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PowerMockRunner.class)
+public class OffsetStorageWriterTest {
+ private static final String NAMESPACE = "namespace"; // namespace given to the OffsetStorageWriter and expected by the converter mocks
+ // Connect format - any types should be accepted here
+ private static final Map<String, String> OFFSET_KEY = Collections.singletonMap("key", "key");
+ private static final Map<String, Integer> OFFSET_VALUE = Collections.singletonMap("key", 12);
+
+ // Serialized (explicit charset so the bytes do not depend on the platform default)
+ private static final byte[] OFFSET_KEY_SERIALIZED = "key-serialized".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ private static final byte[] OFFSET_VALUE_SERIALIZED = "value-serialized".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+
+ @Mock private OffsetBackingStore store;
+ @Mock private Converter keyConverter;
+ @Mock private Converter valueConverter;
+ private OffsetStorageWriter writer;
+
+ private static Exception exception = new RuntimeException("error"); // error the mocked store reports when expectStore() is told to fail
+
+ private ExecutorService service; // runs the mocked store.set() work; created in setup(), shut down in teardown()
+
+ @Before
+ public void setup() { // build the writer under test on top of the mocked store and converters
+ writer = new OffsetStorageWriter(store, NAMESPACE, keyConverter, valueConverter);
+ service = Executors.newFixedThreadPool(1); // single thread used by expectStore() to complete store requests
+ }
+
+ @After
+ public void teardown() { // shut the executor down so a store task still waiting on a latch does not outlive the test
+ service.shutdownNow();
+ }
+
+ @Test
+ public void testWriteFlush() throws Exception { // one offset is converted, passed to the store, and the flush callback is invoked
+ @SuppressWarnings("unchecked")
+ Callback<Void> callback = PowerMock.createMock(Callback.class);
+ expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
+
+ PowerMock.replayAll();
+
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
+
+ assertTrue(writer.beginFlush());
+ writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); // wait for the mocked store request to finish
+
+ PowerMock.verifyAll();
+ }
+
+ // It should be possible to set offset values to null
+ @Test
+ public void testWriteNullValueFlush() throws Exception {
+ @SuppressWarnings("unchecked")
+ Callback<Void> callback = PowerMock.createMock(Callback.class);
+ expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, null, null, callback, false, null); // null value, and null as its serialized form
+
+ PowerMock.replayAll();
+
+ writer.offset(OFFSET_KEY, null);
+
+ assertTrue(writer.beginFlush());
+ writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+ PowerMock.verifyAll();
+ }
+
+ // It should be possible to use null keys. These aren't actually stored as null since the key is wrapped to include
+ // info about the namespace (connector)
+ @Test
+ public void testWriteNullKeyFlush() throws Exception {
+ @SuppressWarnings("unchecked")
+ Callback<Void> callback = PowerMock.createMock(Callback.class);
+ expectStore(null, null, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null); // null key; the mocked converter returns null for the wrapped key
+
+ PowerMock.replayAll();
+
+ writer.offset(null, OFFSET_VALUE);
+
+ assertTrue(writer.beginFlush());
+ writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS);
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testNoOffsetsToFlush() {
+ // If no offsets are flushed, we should finish immediately and not have made any calls to the
+ // underlying storage layer
+
+ PowerMock.replayAll();
+
+ // Nothing was written, so beginFlush() should report that there is nothing to flush
+ assertFalse(writer.beginFlush());
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testFlushFailureReplacesOffsets() throws Exception {
+ // When a flush fails, we shouldn't just lose the offsets. Instead, they should be restored
+ // such that a subsequent flush will write them.
+
+ @SuppressWarnings("unchecked")
+ final Callback<Void> callback = PowerMock.createMock(Callback.class);
+ // First time the write fails
+ expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, true, null);
+ // Second time it succeeds
+ expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, callback, false, null);
+ // Third time it has no data to flush so we won't get past beginFlush()
+
+ PowerMock.replayAll();
+
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
+ assertTrue(writer.beginFlush());
+ writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); // first flush: the mocked store reports the failure
+ assertTrue(writer.beginFlush()); // the same offset must be pending again without another offset() call
+ writer.doFlush(callback).get(1000, TimeUnit.MILLISECONDS); // second flush: succeeds
+ assertFalse(writer.beginFlush()); // nothing left to flush
+
+ PowerMock.verifyAll();
+ }
+
+ @Test(expected = ConnectException.class)
+ public void testAlreadyFlushing() throws Exception { // starting a second flush while one is still in progress must throw
+ @SuppressWarnings("unchecked")
+ final Callback<Void> callback = PowerMock.createMock(Callback.class);
+ // Trigger the send, but don't invoke the callback so we'll still be mid-flush
+ CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); // never counted down here, so the mocked store task keeps waiting
+ expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
+
+ PowerMock.replayAll();
+
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
+ assertTrue(writer.beginFlush());
+ writer.doFlush(callback);
+ assertTrue(writer.beginFlush()); // should throw
+
+ PowerMock.verifyAll(); // not reached when beginFlush() throws as expected
+ }
+
+ @Test
+ public void testCancelBeforeAwaitFlush() { // cancel right after beginFlush(); no calls on the mocks are expected
+ PowerMock.replayAll();
+
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
+ assertTrue(writer.beginFlush());
+ writer.cancelFlush(); // doFlush() is never called in this test
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
+ public void testCancelAfterAwaitFlush() throws Exception { // cancel while the store request is still outstanding
+ @SuppressWarnings("unchecked")
+ Callback<Void> callback = PowerMock.createMock(Callback.class);
+ CountDownLatch allowStoreCompleteCountdown = new CountDownLatch(1); // holds the mocked store request until released below
+ // In this test, the write should be cancelled so the callback will not be invoked and is not
+ // passed to the expectStore call
+ expectStore(OFFSET_KEY, OFFSET_KEY_SERIALIZED, OFFSET_VALUE, OFFSET_VALUE_SERIALIZED, null, false, allowStoreCompleteCountdown);
+
+ PowerMock.replayAll();
+
+ writer.offset(OFFSET_KEY, OFFSET_VALUE);
+ assertTrue(writer.beginFlush());
+ // Start the flush, then immediately cancel before allowing the mocked store request to finish
+ Future<Void> flushFuture = writer.doFlush(callback);
+ writer.cancelFlush();
+ allowStoreCompleteCountdown.countDown();
+ flushFuture.get(1000, TimeUnit.MILLISECONDS); // wait for the released store task to finish
+
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Expect a request to store data to the underlying OffsetBackingStore. The expectations are recorded on
+ * the mocked converters, the mocked store and (when non-null) the callback; nothing is returned.
+ *
+ * @param key the key for the offset
+ * @param keySerialized serialized version of the key
+ * @param value the value for the offset
+ * @param valueSerialized serialized version of the value
+ * @param callback the callback to invoke when completed, or null if the callback isn't
+ * expected to be invoked
+ * @param fail if true, the mocked store completes its callback with {@code exception} instead of success
+ * @param waitForCompletion if non-null, a CountDownLatch that should be awaited on before
+ * invoking the callback. A (generous) timeout is still imposed to
+ * ensure tests complete.
+ */
+ private void expectStore(Map<String, String> key, byte[] keySerialized,
+ Map<String, Integer> value, byte[] valueSerialized,
+ final Callback<Void> callback,
+ final boolean fail,
+ final CountDownLatch waitForCompletion) {
+ List<Object> keyWrapped = Arrays.asList(NAMESPACE, key); // the key converter is expected to receive [namespace, key]
+ EasyMock.expect(keyConverter.fromConnectData(NAMESPACE, null, keyWrapped)).andReturn(keySerialized);
+ EasyMock.expect(valueConverter.fromConnectData(NAMESPACE, null, value)).andReturn(valueSerialized);
+
+ final Capture<Callback<Void>> storeCallback = Capture.newInstance();
+ final Map<ByteBuffer, ByteBuffer> offsetsSerialized = Collections.singletonMap(
+ keySerialized == null ? null : ByteBuffer.wrap(keySerialized),
+ valueSerialized == null ? null : ByteBuffer.wrap(valueSerialized));
+ EasyMock.expect(store.set(EasyMock.eq(offsetsSerialized), EasyMock.capture(storeCallback)))
+ .andAnswer(new IAnswer<Future<Void>>() {
+ @Override
+ public Future<Void> answer() throws Throwable {
+ return service.submit(new Callable<Void>() { // complete the store request asynchronously on the test executor
+ @Override
+ public Void call() throws Exception {
+ if (waitForCompletion != null)
+ assertTrue(waitForCompletion.await(10000, TimeUnit.MILLISECONDS));
+
+ if (fail) {
+ storeCallback.getValue().onCompletion(exception, null);
+ } else {
+ storeCallback.getValue().onCompletion(null, null);
+ }
+ return null;
+ }
+ });
+ }
+ });
+ if (callback != null) { // record the expected invocation of the caller's flush callback
+ if (fail) {
+ callback.onCompletion(EasyMock.eq(exception), EasyMock.eq((Void) null));
+ } else {
+ callback.onCompletion(null, null);
+ }
+ }
+ PowerMock.expectLastCall(); // NOTE(review): also runs when callback is null, where no callback call was just recorded - confirm intended
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
new file mode 100644
index 0000000..4d17ac4
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ByteArrayProducerRecordEquals.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.easymock.EasyMock;
+import org.easymock.IArgumentMatcher;
+
+import java.util.Arrays;
+
+public class ByteArrayProducerRecordEquals implements IArgumentMatcher { // EasyMock matcher comparing byte[] keys/values by content
+ private final ProducerRecord<byte[], byte[]> record; // the expected record
+
+ public static ProducerRecord<byte[], byte[]> eqProducerRecord(ProducerRecord<byte[], byte[]> in) { // use in place of an argument when recording an expectation
+ EasyMock.reportMatcher(new ByteArrayProducerRecordEquals(in));
+ return null; // placeholder only; the matcher registered above does the comparison
+ }
+
+ public ByteArrayProducerRecordEquals(ProducerRecord<byte[], byte[]> record) {
+ this.record = record;
+ }
+
+ @Override
+ public boolean matches(Object argument) { // true only if topic, partition, key and value all agree with the expected record
+ if (!(argument instanceof ProducerRecord))
+ return false;
+ ProducerRecord<byte[], byte[]> other = (ProducerRecord<byte[], byte[]>) argument;
+ return record.topic().equals(other.topic()) && // each ternary is parenthesized: ?: binds looser than &&, so without them most checks are skipped
+ (record.partition() != null ? record.partition().equals(other.partition()) : other.partition() == null) &&
+ (record.key() != null ? Arrays.equals(record.key(), other.key()) : other.key() == null) &&
+ (record.value() != null ? Arrays.equals(record.value(), other.value()) : other.value() == null);
+ }
+
+ @Override
+ public void appendTo(StringBuffer buffer) { // describe the expected record in EasyMock's mismatch message
+ buffer.append(record.toString());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
new file mode 100644
index 0000000..1c3b842
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/KafkaBasedLogTest.java
@@ -0,0 +1,437 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.MockConsumer;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.utils.Time;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KafkaBasedLog} that run it against a {@link MockConsumer} and a mocked {@link KafkaProducer}.
+ * The log under test is a PowerMock partial mock whose "createConsumer" and "createProducer" methods are
+ * stubbed (see expectStart()) to return those test doubles.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaBasedLog.class)
+@PowerMockIgnore("javax.management.*")
+public class KafkaBasedLogTest {
+
+ private static final String TOPIC = "connect-log";
+ private static final TopicPartition TP0 = new TopicPartition(TOPIC, 0);
+ private static final TopicPartition TP1 = new TopicPartition(TOPIC, 1);
+ // Client configs handed to the KafkaBasedLog constructor in setUp().
+ private static final Map<String, Object> PRODUCER_PROPS = new HashMap<>();
+ static {
+ PRODUCER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+ PRODUCER_PROPS.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ PRODUCER_PROPS.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ }
+ private static final Map<String, Object> CONSUMER_PROPS = new HashMap<>();
+ static {
+ CONSUMER_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+ CONSUMER_PROPS.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ CONSUMER_PROPS.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ }
+
+ // Both partitions of TOPIC; the tests assert that the consumer's assignment equals this set after start().
+ private static final Set<TopicPartition> CONSUMER_ASSIGNMENT = new HashSet<>(Arrays.asList(TP0, TP1));
+ // NOTE(review): FIRST_SET is not referenced by any test in this class.
+ private static final Map<String, String> FIRST_SET = new HashMap<>();
+ static {
+ FIRST_SET.put("key", "value");
+ FIRST_SET.put(null, null);
+ }
+
+ // NOTE(review): REPLICA uses node id 1, the same id as LEADER -- confirm whether distinct ids were intended.
+ private static final Node LEADER = new Node(1, "broker1", 9092);
+ private static final Node REPLICA = new Node(1, "broker2", 9093);
+
+ private static final PartitionInfo TPINFO0 = new PartitionInfo(TOPIC, 0, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+ private static final PartitionInfo TPINFO1 = new PartitionInfo(TOPIC, 1, LEADER, new Node[]{REPLICA}, new Node[]{REPLICA});
+
+ private static final String TP0_KEY = "TP0KEY";
+ private static final String TP1_KEY = "TP1KEY";
+ private static final String TP0_VALUE = "VAL0";
+ private static final String TP1_VALUE = "VAL1";
+ private static final String TP0_VALUE_NEW = "VAL0_NEW";
+ private static final String TP1_VALUE_NEW = "VAL1_NEW";
+
+ private Time time = new MockTime();
+ private KafkaBasedLog<String, String> store;
+
+ @Mock
+ private KafkaProducer<String, String> producer;
+ private MockConsumer<String, String> consumer;
+
+ // Collects every record passed to the callback given to the log in setUp(); the error argument is ignored.
+ private List<ConsumerRecord<String, String>> consumedRecords = new ArrayList<>();
+ private Callback<ConsumerRecord<String, String>> consumedCallback = new Callback<ConsumerRecord<String, String>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<String, String> record) {
+ consumedRecords.add(record);
+ }
+ };
+
+ /**
+ * Builds the partially mocked log and a fresh MockConsumer that knows both partitions of TOPIC, with
+ * beginning offsets of 0 for each.
+ */
+ @Before
+ public void setUp() throws Exception {
+ store = PowerMock.createPartialMock(KafkaBasedLog.class, new String[]{"createConsumer", "createProducer"},
+ TOPIC, PRODUCER_PROPS, CONSUMER_PROPS, consumedCallback, time);
+ consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
+ consumer.updatePartitions(TOPIC, Arrays.asList(TPINFO0, TPINFO1));
+ Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
+ beginningOffsets.put(TP0, 0L);
+ beginningOffsets.put(TP1, 0L);
+ consumer.updateBeginningOffsets(beginningOffsets);
+ }
+
+ /**
+ * With both end offsets at 0, start() should leave the consumer assigned to both partitions, and stop()
+ * should leave the log's internal thread dead and the consumer closed.
+ */
+ @Test
+ public void testStartStop() throws Exception {
+ expectStart();
+ expectStop();
+
+ PowerMock.replayAll();
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 0L);
+ endOffsets.put(TP1, 0L);
+ consumer.updateEndOffsets(endOffsets);
+ store.start();
+ assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+
+ store.stop();
+
+ assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+ assertTrue(consumer.closed());
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * With one record per partition (end offsets of 1), the two records are fed to the consumer through
+ * scheduled poll tasks interleaved with empty polls; both must reach the consumed callback in order.
+ */
+ @Test
+ public void testReloadOnStart() throws Exception {
+ expectStart();
+ expectStop();
+
+ PowerMock.replayAll();
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 1L);
+ endOffsets.put(TP1, 1L);
+ consumer.updateEndOffsets(endOffsets);
+ final CountDownLatch finishedLatch = new CountDownLatch(1);
+ consumer.schedulePollTask(new Runnable() { // Use first poll task to setup sequence of remaining responses to polls
+ @Override
+ public void run() {
+ // Should keep polling until it reaches current log end offset for all partitions. Should handle
+ // as many empty polls as needed
+ consumer.scheduleNopPollTask();
+ consumer.scheduleNopPollTask();
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+ }
+ });
+ consumer.scheduleNopPollTask();
+ consumer.scheduleNopPollTask();
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+ }
+ });
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ finishedLatch.countDown();
+ }
+ });
+ }
+ });
+ store.start();
+ assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+
+ assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+ assertEquals(2, consumedRecords.size());
+ assertEquals(TP0_VALUE, consumedRecords.get(0).value());
+ assertEquals(TP1_VALUE, consumedRecords.get(1).value());
+
+ store.stop();
+
+ assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+ assertTrue(consumer.closed());
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Sends two records through the mocked producer, completes their callbacks out of order, then calls
+ * readToEnd() from inside a poll task. The readToEnd callback asserts that all four scheduled records were
+ * consumed before it fired.
+ */
+ @Test
+ public void testSendAndReadToEnd() throws Exception {
+ expectStart();
+ TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+ ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+ Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+ EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+ TestFuture<RecordMetadata> tp1Future = new TestFuture<>();
+ ProducerRecord<String, String> tp1Record = new ProducerRecord<>(TOPIC, TP1_KEY, TP1_VALUE);
+ Capture<org.apache.kafka.clients.producer.Callback> callback1 = EasyMock.newCapture();
+ EasyMock.expect(producer.send(EasyMock.eq(tp1Record), EasyMock.capture(callback1))).andReturn(tp1Future);
+
+ // Producer flushes when read to log end is called
+ producer.flush();
+ PowerMock.expectLastCall();
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 0L);
+ endOffsets.put(TP1, 0L);
+ consumer.updateEndOffsets(endOffsets);
+ store.start();
+ assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+ assertEquals(0L, consumer.position(TP0));
+ assertEquals(0L, consumer.position(TP1));
+
+ // Set some keys
+ final AtomicInteger invoked = new AtomicInteger(0);
+ org.apache.kafka.clients.producer.Callback producerCallback = new org.apache.kafka.clients.producer.Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ invoked.incrementAndGet();
+ }
+ };
+ store.send(TP0_KEY, TP0_VALUE, producerCallback);
+ store.send(TP1_KEY, TP1_VALUE, producerCallback);
+ assertEquals(0, invoked.get());
+ tp1Future.resolve((RecordMetadata) null); // Output not used, so safe to not return a real value for testing
+ callback1.getValue().onCompletion(null, null);
+ assertEquals(1, invoked.get());
+ tp0Future.resolve((RecordMetadata) null);
+ callback0.getValue().onCompletion(null, null);
+ assertEquals(2, invoked.get());
+
+ // Now we should have to wait for the records to be read back when we call readToEnd()
+ final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
+ final FutureCallback<Void> readEndFutureCallback = new FutureCallback<>(new Callback<Void>() {
+ @Override
+ public void onCompletion(Throwable error, Void result) {
+ assertEquals(4, consumedRecords.size());
+ assertEquals(TP0_VALUE_NEW, consumedRecords.get(2).value());
+ assertEquals(TP1_VALUE_NEW, consumedRecords.get(3).value());
+ getInvokedAndPassed.set(true);
+ }
+ });
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ // Once we're synchronized in a poll, start the read to end and schedule the exact set of poll events
+ // that should follow. This readToEnd call will immediately wakeup this consumer.poll() call without
+ // returning any data.
+ store.readToEnd(readEndFutureCallback);
+
+ // Needs to seek to end to find end offsets
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ Map<TopicPartition, Long> newEndOffsets = new HashMap<>();
+ newEndOffsets.put(TP0, 2L);
+ newEndOffsets.put(TP1, 2L);
+ consumer.updateEndOffsets(newEndOffsets);
+ }
+ });
+
+ // Should keep polling until it reaches current log end offset for all partitions
+ consumer.scheduleNopPollTask();
+ consumer.scheduleNopPollTask();
+ consumer.scheduleNopPollTask();
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 1, TP0_KEY, TP0_VALUE_NEW));
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP1_KEY, TP1_VALUE));
+ }
+ });
+
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 1, TP1_KEY, TP1_VALUE_NEW));
+ }
+ });
+
+ // Already have FutureCallback that should be invoked/awaited, so no need for follow up finishedLatch
+ }
+ });
+ readEndFutureCallback.get(10000, TimeUnit.MILLISECONDS);
+ assertTrue(getInvokedAndPassed.get());
+
+ // Cleanup
+ store.stop();
+
+ assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+ assertTrue(consumer.closed());
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Schedules a poll that sets an exception on the consumer, followed by empty polls and a poll that adds one
+ * record per partition. After start() the consumer must still be assigned both partitions and have
+ * advanced to position 1 on TP0.
+ */
+ @Test
+ public void testConsumerError() throws Exception {
+ expectStart();
+ expectStop();
+
+ PowerMock.replayAll();
+
+ final CountDownLatch finishedLatch = new CountDownLatch(1);
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 1L);
+ endOffsets.put(TP1, 1L);
+ consumer.updateEndOffsets(endOffsets);
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ // Trigger exception
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ consumer.setException(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception());
+ }
+ });
+
+ // Should keep polling until it reaches current log end offset for all partitions
+ consumer.scheduleNopPollTask();
+ consumer.scheduleNopPollTask();
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, TP0_KEY, TP0_VALUE_NEW));
+ // NOTE(review): this partition-1 record reuses TP0_KEY/TP0_VALUE_NEW; only TP0's position is asserted below.
+ consumer.addRecord(new ConsumerRecord<>(TOPIC, 1, 0, TP0_KEY, TP0_VALUE_NEW));
+ }
+ });
+
+ consumer.schedulePollTask(new Runnable() {
+ @Override
+ public void run() {
+ finishedLatch.countDown();
+ }
+ });
+ }
+ });
+ store.start();
+ assertTrue(finishedLatch.await(10000, TimeUnit.MILLISECONDS));
+ assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+ assertEquals(1L, consumer.position(TP0));
+
+ store.stop();
+
+ assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+ assertTrue(consumer.closed());
+ PowerMock.verifyAll();
+ }
+
+ /**
+ * Sends one record, then completes the captured producer callback with a LeaderNotAvailableException and
+ * checks that the caller-supplied callback observed a non-null exception exactly once.
+ */
+ @Test
+ public void testProducerError() throws Exception {
+ expectStart();
+ TestFuture<RecordMetadata> tp0Future = new TestFuture<>();
+ ProducerRecord<String, String> tp0Record = new ProducerRecord<>(TOPIC, TP0_KEY, TP0_VALUE);
+ Capture<org.apache.kafka.clients.producer.Callback> callback0 = EasyMock.newCapture();
+ EasyMock.expect(producer.send(EasyMock.eq(tp0Record), EasyMock.capture(callback0))).andReturn(tp0Future);
+
+ expectStop();
+
+ PowerMock.replayAll();
+
+ Map<TopicPartition, Long> endOffsets = new HashMap<>();
+ endOffsets.put(TP0, 0L);
+ endOffsets.put(TP1, 0L);
+ consumer.updateEndOffsets(endOffsets);
+ store.start();
+ assertEquals(CONSUMER_ASSIGNMENT, consumer.assignment());
+ assertEquals(0L, consumer.position(TP0));
+ assertEquals(0L, consumer.position(TP1));
+
+ final AtomicReference<Throwable> setException = new AtomicReference<>();
+ store.send(TP0_KEY, TP0_VALUE, new org.apache.kafka.clients.producer.Callback() {
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ assertNull(setException.get()); // Should only be invoked once
+ setException.set(exception);
+ }
+ });
+ KafkaException exc = new LeaderNotAvailableException("Error");
+ tp0Future.resolve(exc);
+ callback0.getValue().onCompletion(null, exc);
+ assertNotNull(setException.get());
+
+ store.stop();
+
+ assertFalse(Whitebox.<Thread>getInternalState(store, "thread").isAlive());
+ assertTrue(consumer.closed());
+ PowerMock.verifyAll();
+ }
+
+
+ /**
+ * Records expectations that the log's private "createProducer" and "createConsumer" methods are called and
+ * return the mocked producer and the MockConsumer.
+ */
+ private void expectStart() throws Exception {
+ PowerMock.expectPrivate(store, "createProducer")
+ .andReturn(producer);
+ PowerMock.expectPrivate(store, "createConsumer")
+ .andReturn(consumer);
+ }
+
+ /**
+ * Records the expectation that the mocked producer is closed.
+ */
+ private void expectStop() {
+ producer.close();
+ PowerMock.expectLastCall();
+ // MockConsumer close is checked after test.
+ }
+
+ // NOTE(review): this helper is not called anywhere in this class.
+ private static ByteBuffer buffer(String v) {
+ return ByteBuffer.wrap(v.getBytes());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
new file mode 100644
index 0000000..85f6895
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/MockTime.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.util;
+
+import org.apache.kafka.common.utils.Time;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A clock that you can manually advance by calling sleep
+ */
+public class MockTime implements Time {
+
+ private long nanos = 0;
+
+ public MockTime() {
+ this.nanos = System.nanoTime();
+ }
+
+ @Override
+ public long milliseconds() {
+ return TimeUnit.MILLISECONDS.convert(this.nanos, TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public long nanoseconds() {
+ return nanos;
+ }
+
+ @Override
+ public void sleep(long ms) {
+ this.nanos += TimeUnit.NANOSECONDS.convert(ms, TimeUnit.MILLISECONDS);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
new file mode 100644
index 0000000..5c3c224
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ShutdownableThreadTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ShutdownableThreadTest {
+
+ @Test
+ public void testGracefulShutdown() throws InterruptedException {
+ ShutdownableThread thread = new ShutdownableThread("graceful") {
+ @Override
+ public void execute() {
+ while (getRunning()) {
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ }
+ };
+ thread.start();
+ Thread.sleep(10);
+ assertTrue(thread.gracefulShutdown(1000, TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void testForcibleShutdown() throws InterruptedException {
+ final CountDownLatch startedLatch = new CountDownLatch(1);
+ ShutdownableThread thread = new ShutdownableThread("forcible") {
+ @Override
+ public void execute() {
+ try {
+ startedLatch.countDown();
+ Thread.sleep(100000);
+ } catch (InterruptedException e) {
+ // Ignore
+ }
+ }
+ };
+ thread.start();
+ startedLatch.await();
+ thread.forceShutdown();
+ // Not all threads can be forcibly stopped since interrupt() doesn't work on threads in
+ // certain conditions, but in this case we know the thread is interruptible so we should be
+ // able join() it
+ thread.join(1000);
+ assertFalse(thread.isAlive());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
new file mode 100644
index 0000000..12bac98
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestBackgroundThreadExceptionHandler.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+/**
+ * An UncaughtExceptionHandler that can be registered with one or more threads which tracks the
+ * first exception so the main thread can check for uncaught exceptions.
+ *
+ * Both methods are synchronized because the handler is invoked on background threads while
+ * {@link #verifyNoExceptions()} is called from the test thread; this makes the "first exception
+ * wins" check atomic and guarantees the recorded exception is visible to the verifying thread.
+ */
+public class TestBackgroundThreadExceptionHandler implements Thread.UncaughtExceptionHandler {
+    private Throwable firstException = null;
+
+    /**
+     * Records {@code e} if no earlier exception has been recorded; later exceptions are dropped.
+     *
+     * @param t the thread that terminated with the exception
+     * @param e the uncaught exception
+     */
+    @Override
+    public synchronized void uncaughtException(Thread t, Throwable e) {
+        if (this.firstException == null)
+            this.firstException = e;
+    }
+
+    /**
+     * @throws AssertionError wrapping the first recorded exception, if any exception was recorded
+     */
+    public synchronized void verifyNoExceptions() {
+        if (this.firstException != null)
+            throw new AssertionError(this.firstException);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
new file mode 100644
index 0000000..3683f91
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TestFuture.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public class TestFuture<T> implements Future<T> {
+ private volatile boolean resolved;
+ private T result;
+ private Throwable exception;
+ private CountDownLatch getCalledLatch;
+
+ private volatile boolean resolveOnGet;
+ private T resolveOnGetResult;
+ private Throwable resolveOnGetException;
+
+ public TestFuture() {
+ resolved = false;
+ getCalledLatch = new CountDownLatch(1);
+
+ resolveOnGet = false;
+ resolveOnGetResult = null;
+ resolveOnGetException = null;
+ }
+
+ public void resolve(T val) {
+ this.result = val;
+ resolved = true;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ public void resolve(Throwable t) {
+ exception = t;
+ resolved = true;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return false;
+ }
+
+ @Override
+ public boolean isDone() {
+ return resolved;
+ }
+
+ @Override
+ public T get() throws InterruptedException, ExecutionException {
+ getCalledLatch.countDown();
+ while (true) {
+ try {
+ return get(Integer.MAX_VALUE, TimeUnit.DAYS);
+ } catch (TimeoutException e) {
+ // ignore
+ }
+ }
+ }
+
+ @Override
+ public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ getCalledLatch.countDown();
+
+ if (resolveOnGet) {
+ if (resolveOnGetException != null)
+ resolve(resolveOnGetException);
+ else
+ resolve(resolveOnGetResult);
+ }
+
+ synchronized (this) {
+ while (!resolved) {
+ this.wait(TimeUnit.MILLISECONDS.convert(timeout, unit));
+ }
+ }
+
+ if (exception != null) {
+ if (exception instanceof TimeoutException)
+ throw (TimeoutException) exception;
+ else if (exception instanceof InterruptedException)
+ throw (InterruptedException) exception;
+ else
+ throw new ExecutionException(exception);
+ }
+ return result;
+ }
+
+ /**
+ * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+ * @param val the value to return from the future
+ */
+ public void resolveOnGet(T val) {
+ resolveOnGet = true;
+ resolveOnGetResult = val;
+ }
+
+ /**
+ * Set a flag to resolve the future as soon as one of the get() methods has been called. Returns immediately.
+ * @param t the exception to return from the future
+ */
+ public void resolveOnGet(Throwable t) {
+ resolveOnGet = true;
+ resolveOnGetException = t;
+ }
+
+ /**
+ * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+ * the specified value.
+ * @param val the value to return from the future
+ */
+ public void waitForGetAndResolve(T val) {
+ waitForGet();
+ resolve(val);
+ }
+
+ /**
+ * Block, waiting for another thread to call one of the get() methods, and then immediately resolve the future with
+ * the specified value.
+ * @param t the exception to use to resolve the future
+ */
+ public void waitForGetAndResolve(Throwable t) {
+ waitForGet();
+ resolve(t);
+ }
+
+ private void waitForGet() {
+ try {
+ getCalledLatch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Unexpected interruption: ", e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
new file mode 100644
index 0000000..0241ea3
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ThreadedTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.util;
+
+import org.junit.After;
+import org.junit.Before;
+
+/**
+ * Base class for tests that use threads. It sets up uncaught exception handlers for all known
+ * thread classes and checks for errors at the end of the test so that failures in background
+ * threads will cause the test to fail.
+ */
+public class ThreadedTest {
+
+    protected TestBackgroundThreadExceptionHandler backgroundThreadExceptionHandler;
+
+    /**
+     * Installs a fresh exception handler in the static hook on {@link ShutdownableThread} before each test.
+     */
+    @Before
+    public void setup() {
+        backgroundThreadExceptionHandler = new TestBackgroundThreadExceptionHandler();
+        ShutdownableThread.funcaughtExceptionHandler = backgroundThreadExceptionHandler;
+    }
+
+    /**
+     * Fails the test if a background thread recorded an uncaught exception, and always clears the static
+     * handler afterwards.
+     */
+    @After
+    public void teardown() {
+        try {
+            backgroundThreadExceptionHandler.verifyNoExceptions();
+        } finally {
+            // Reset even when verification throws, so a failing test does not leave its handler
+            // registered for tests that run afterwards
+            ShutdownableThread.funcaughtExceptionHandler = null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/connect/runtime/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d5e90fe
--- /dev/null
+++ b/connect/runtime/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+##
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=ERROR
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
deleted file mode 100644
index 4d0e1bd..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Connector.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * <p>
- * Connectors manage integration of Copycat with another system, either as an input that ingests
- * data into Kafka or an output that passes data to an external system. Implementations should
- * not use this class directly; they should inherit from SourceConnector or SinkConnector.
- * </p>
- * <p>
- * Connectors have two primary tasks. First, given some configuration, they are responsible for
- * creating configurations for a set of {@link Task}s that split up the data processing. For
- * example, a database Connector might create Tasks by dividing the set of tables evenly among
- * tasks. Second, they are responsible for monitoring inputs for changes that require
- * reconfiguration and notifying the Copycat runtime via the ConnectorContext. Continuing the
- * previous example, the connector might periodically check for new tables and notify Copycat of
- * additions and deletions. Copycat will then request new configurations and update the running
- * Tasks.
- * </p>
- */
-@InterfaceStability.Unstable
-public abstract class Connector {
-
- protected ConnectorContext context;
-
- /**
- * Get the version of this connector.
- *
- * @return the version, formatted as a String
- */
- public abstract String version();
-
- /**
- * Initialize this connector, using the provided ConnectorContext to notify the runtime of
- * input configuration changes.
- * @param ctx context object used to interact with the Copycat runtime
- */
- public void initialize(ConnectorContext ctx) {
- context = ctx;
- }
-
- /**
- * <p>
- * Initialize this connector, using the provided ConnectorContext to notify the runtime of
- * input configuration changes and using the provided set of Task configurations.
- * This version is only used to recover from failures.
- * </p>
- * <p>
- * The default implementation ignores the provided Task configurations. During recovery, Copycat will request
- * an updated set of configurations and update the running Tasks appropriately. However, Connectors should
- * implement special handling of this case if it will avoid unnecessary changes to running Tasks.
- * </p>
- *
- * @param ctx context object used to interact with the Copycat runtime
- * @param taskConfigs existing task configurations, which may be used when generating new task configs to avoid
- * churn in partition to task assignments
- */
- public void initialize(ConnectorContext ctx, List<Map<String, String>> taskConfigs) {
- context = ctx;
- // Ignore taskConfigs. May result in more churn of tasks during recovery if updated configs
- // are very different, but reduces the difficulty of implementing a Connector
- }
-
- /**
- * Start this Connector. This method will only be called on a clean Connector, i.e. it has
- * either just been instantiated and initialized or {@link #stop()} has been invoked.
- *
- * @param props configuration settings
- */
- public abstract void start(Map<String, String> props);
-
- /**
- * Reconfigure this Connector. Most implementations will not override this, using the default
- * implementation that calls {@link #stop()} followed by {@link #start(Map)}.
- * Implementations only need to override this if they want to handle this process more
- * efficiently, e.g. without shutting down network connections to the external system.
- *
- * @param props new configuration settings
- */
- public void reconfigure(Map<String, String> props) {
- stop();
- start(props);
- }
-
- /**
- * Returns the Task implementation for this Connector.
- */
- public abstract Class<? extends Task> taskClass();
-
- /**
- * Returns a set of configurations for Tasks based on the current configuration,
- * producing at most count configurations.
- *
- * @param maxTasks maximum number of configurations to generate
- * @return configurations for Tasks
- */
- public abstract List<Map<String, String>> taskConfigs(int maxTasks);
-
- /**
- * Stop this connector.
- */
- public abstract void stop();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
deleted file mode 100644
index ecba69a..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/ConnectorContext.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-/**
- * ConnectorContext allows Connectors to proactively interact with the Copycat runtime.
- */
-@InterfaceStability.Unstable
-public interface ConnectorContext {
- /**
- * Requests that the runtime reconfigure the Tasks for this source. This should be used to
- * indicate to the runtime that something about the input/output has changed (e.g. partitions
- * added/removed) and the running Tasks will need to be modified.
- */
- void requestTaskReconfiguration();
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
deleted file mode 100644
index 0d3e8dc..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/CopycatRecord.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.copycat.data.Schema;
-
-/**
- * <p>
- * Base class for records containing data to be copied to/from Kafka. This corresponds closely to
- * Kafka's ProducerRecord and ConsumerRecord classes, and holds the data that may be used by both
- * sources and sinks (topic, kafkaPartition, key, value). Although both implementations include a
- * notion of offset, it is not included here because they differ in type.
- * </p>
- */
-@InterfaceStability.Unstable
-public abstract class CopycatRecord {
- private final String topic;
- private final Integer kafkaPartition;
- private final Schema keySchema;
- private final Object key;
- private final Schema valueSchema;
- private final Object value;
-
- public CopycatRecord(String topic, Integer kafkaPartition, Schema valueSchema, Object value) {
- this(topic, kafkaPartition, null, null, valueSchema, value);
- }
-
- public CopycatRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value) {
- this.topic = topic;
- this.kafkaPartition = kafkaPartition;
- this.keySchema = keySchema;
- this.key = key;
- this.valueSchema = valueSchema;
- this.value = value;
- }
-
- public String topic() {
- return topic;
- }
-
- public Integer kafkaPartition() {
- return kafkaPartition;
- }
-
- public Object key() {
- return key;
- }
-
- public Schema keySchema() {
- return keySchema;
- }
-
- public Object value() {
- return value;
- }
-
- public Schema valueSchema() {
- return valueSchema;
- }
-
- @Override
- public String toString() {
- return "CopycatRecord{" +
- "topic='" + topic + '\'' +
- ", kafkaPartition=" + kafkaPartition +
- ", key=" + key +
- ", value=" + value +
- '}';
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- CopycatRecord that = (CopycatRecord) o;
-
- if (kafkaPartition != null ? !kafkaPartition.equals(that.kafkaPartition) : that.kafkaPartition != null)
- return false;
- if (topic != null ? !topic.equals(that.topic) : that.topic != null)
- return false;
- if (keySchema != null ? !keySchema.equals(that.keySchema) : that.keySchema != null)
- return false;
- if (key != null ? !key.equals(that.key) : that.key != null)
- return false;
- if (valueSchema != null ? !valueSchema.equals(that.valueSchema) : that.valueSchema != null)
- return false;
- if (value != null ? !value.equals(that.value) : that.value != null)
- return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = topic != null ? topic.hashCode() : 0;
- result = 31 * result + (kafkaPartition != null ? kafkaPartition.hashCode() : 0);
- result = 31 * result + (keySchema != null ? keySchema.hashCode() : 0);
- result = 31 * result + (key != null ? key.hashCode() : 0);
- result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
- result = 31 * result + (value != null ? value.hashCode() : 0);
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f2031d40/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
----------------------------------------------------------------------
diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java b/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
deleted file mode 100644
index cb8b719..0000000
--- a/copycat/api/src/main/java/org/apache/kafka/copycat/connector/Task.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.copycat.connector;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Map;
-
-/**
- * <p>
- * Tasks contain the code that actually copies data to/from another system. They receive
- * a configuration from their parent Connector, assigning them a fraction of a Copycat job's work.
- * The Copycat framework then pushes/pulls data from the Task. The Task must also be able to
- * respond to reconfiguration requests.
- * </p>
- * <p>
- * Task only contains the minimal shared functionality between
- * {@link org.apache.kafka.copycat.source.SourceTask} and
- * {@link org.apache.kafka.copycat.sink.SinkTask}.
- * </p>
- */
-@InterfaceStability.Unstable
-public interface Task {
- /**
- * Get the version of this task. Usually this should be the same as the corresponding {@link Connector} class's version.
- *
- * @return the version, formatted as a String
- */
- String version();
-
- /**
- * Start the Task
- * @param props initial configuration
- */
- void start(Map<String, String> props);
-
- /**
- * Stop this task.
- */
- void stop();
-}