You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/11/20 05:32:25 UTC
[kafka] branch 2.3 updated: KAFKA-9051: Prematurely complete source
offset read requests for stopped tasks (#7532)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new a161ecd KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)
a161ecd is described below
commit a161ecda2421f722e59c824c6c10b17cbbe5a898
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Nov 19 21:18:21 2019 -0800
KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)
Prematurely complete source offset read requests for stopped tasks, and added unit tests.
Author: Chris Egerton <ch...@confluent.io>
Reviewers: Arjun Satish <ar...@confluent.io>, Nigel Liang <ni...@nigelliang.com>, Jinxin Liu <li...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
.../org/apache/kafka/connect/runtime/Worker.java | 4 +-
.../kafka/connect/runtime/WorkerSourceTask.java | 12 +-
.../storage/CloseableOffsetStorageReader.java | 33 +++
.../connect/storage/KafkaOffsetBackingStore.java | 7 +-
.../connect/storage/MemoryOffsetBackingStore.java | 6 +-
.../kafka/connect/storage/OffsetBackingStore.java | 8 +-
.../connect/storage/OffsetStorageReaderImpl.java | 51 ++++-
.../connect/util/ConvertingFutureCallback.java | 58 ++++-
.../connect/runtime/ErrorHandlingTaskTest.java | 4 +-
.../connect/runtime/WorkerSourceTaskTest.java | 18 +-
.../storage/FileOffsetBackingStoreTest.java | 14 +-
.../storage/KafkaOffsetBackingStoreTest.java | 54 ++---
.../connect/util/ConvertingFutureCallbackTest.java | 242 +++++++++++++++++++++
13 files changed, 420 insertions(+), 91 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3d4479c..27c13de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -50,10 +50,10 @@ import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -506,7 +506,7 @@ public class Worker {
retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
- OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
+ CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
internalKeyConverter, internalValueConverter);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index acdb287..72829e3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -39,9 +39,9 @@ import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectUtils;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -75,7 +75,7 @@ class WorkerSourceTask extends WorkerTask {
private final HeaderConverter headerConverter;
private final TransformationChain<SourceRecord> transformationChain;
private KafkaProducer<byte[], byte[]> producer;
- private final OffsetStorageReader offsetReader;
+ private final CloseableOffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final Time time;
private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
@@ -105,7 +105,7 @@ class WorkerSourceTask extends WorkerTask {
HeaderConverter headerConverter,
TransformationChain<SourceRecord> transformationChain,
KafkaProducer<byte[], byte[]> producer,
- OffsetStorageReader offsetReader,
+ CloseableOffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
ClusterConfigState configState,
@@ -173,6 +173,12 @@ class WorkerSourceTask extends WorkerTask {
}
@Override
+ public void cancel() {
+ super.cancel();
+ offsetReader.close();
+ }
+
+ @Override
public void stop() {
super.stop();
stopRequestedLatch.countDown();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java
new file mode 100644
index 0000000..b902739
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public interface CloseableOffsetStorageReader extends Closeable, OffsetStorageReader {
+
+ /**
+ * {@link Future#cancel(boolean) Cancel} all outstanding offset read requests, and throw an
+ * exception in all current and future calls to {@link #offsets(Collection)} and
+ * {@link #offset(Map)}. This is useful for unblocking task threads which need to shut down but
+ * are blocked on offset reads.
+ */
+ void close();
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 195c498..22bcb7a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -118,9 +118,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
}
@Override
- public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys,
- final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
- ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
+ public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
+ ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> convert(Void result) {
Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
@@ -230,6 +229,4 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
return null;
}
}
-
-
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index ab8130b..72439e7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -75,9 +75,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
}
@Override
- public Future<Map<ByteBuffer, ByteBuffer>> get(
- final Collection<ByteBuffer> keys,
- final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+ public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
@Override
public Map<ByteBuffer, ByteBuffer> call() throws Exception {
@@ -85,8 +83,6 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
for (ByteBuffer key : keys) {
result.put(key, data.get(key));
}
- if (callback != null)
- callback.onCompletion(null, result);
return result;
}
});
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
index 9998164..1e4375b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
@@ -53,12 +53,9 @@ public interface OffsetBackingStore {
/**
* Get the values for the specified keys
* @param keys list of keys to look up
- * @param callback callback to invoke on completion
* @return future for the resulting map from key to value
*/
- Future<Map<ByteBuffer, ByteBuffer>> get(
- Collection<ByteBuffer> keys,
- Callback<Map<ByteBuffer, ByteBuffer>> callback);
+ Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys);
/**
* Set the specified keys and values.
@@ -66,8 +63,7 @@ public interface OffsetBackingStore {
* @param callback callback to invoke on completion
* @return void future for the operation
*/
- Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
- Callback<Void> callback);
+ Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
/**
* Configure class with the given key-value pairs
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index 9f926dc..a1eea43 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -26,20 +26,27 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented
* directly, the interface is only separate from this implementation because it needs to be
* included in the public API package.
*/
-public class OffsetStorageReaderImpl implements OffsetStorageReader {
+public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
private final OffsetBackingStore backingStore;
private final String namespace;
private final Converter keyConverter;
private final Converter valueConverter;
+ private final AtomicBoolean closed;
+ private final Set<Future<Map<ByteBuffer, ByteBuffer>>> offsetReadFutures;
public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
Converter keyConverter, Converter valueConverter) {
@@ -47,6 +54,8 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
this.namespace = namespace;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
+ this.closed = new AtomicBoolean(false);
+ this.offsetReadFutures = new HashSet<>();
}
@Override
@@ -76,7 +85,30 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
// Get serialized key -> serialized value from backing store
Map<ByteBuffer, ByteBuffer> raw;
try {
- raw = backingStore.get(serializedToOriginal.keySet(), null).get();
+ Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+ synchronized (offsetReadFutures) {
+ if (closed.get()) {
+ throw new ConnectException(
+ "Offset reader is closed. This is likely because the task has already been "
+ + "scheduled to stop but has taken longer than the graceful shutdown "
+ + "period to do so.");
+ }
+ offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
+ offsetReadFutures.add(offsetReadFuture);
+ }
+
+ try {
+ raw = offsetReadFuture.get();
+ } catch (CancellationException e) {
+ throw new ConnectException(
+ "Offset reader closed while attempting to read offsets. This is likely because "
+ + "the task was been scheduled to stop but has taken longer than the "
+ + "graceful shutdown period to do so.");
+ } finally {
+ synchronized (offsetReadFutures) {
+ offsetReadFutures.remove(offsetReadFuture);
+ }
+ }
} catch (Exception e) {
log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
throw new ConnectException("Failed to fetch offsets.", e);
@@ -108,4 +140,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
return result;
}
+
+ public void close() {
+ if (!closed.getAndSet(true)) {
+ synchronized (offsetReadFutures) {
+ for (Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture : offsetReadFutures) {
+ try {
+ offsetReadFuture.cancel(true);
+ } catch (Throwable t) {
+ log.error("Failed to cancel offset read future", t);
+ }
+ }
+ offsetReadFutures.clear();
+ }
+ }
+ }
}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
index d5abed9..e15c38e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
@@ -16,6 +16,9 @@
*/
package org.apache.kafka.connect.util;
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -24,10 +27,15 @@ import java.util.concurrent.TimeoutException;
public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
- private Callback<T> underlying;
- private CountDownLatch finishedLatch;
- private T result = null;
- private Throwable exception = null;
+ private final Callback<T> underlying;
+ private final CountDownLatch finishedLatch;
+ private volatile T result = null;
+ private volatile Throwable exception = null;
+ private volatile boolean cancelled = false;
+
+ public ConvertingFutureCallback() {
+ this(null);
+ }
public ConvertingFutureCallback(Callback<T> underlying) {
this.underlying = underlying;
@@ -38,21 +46,46 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
@Override
public void onCompletion(Throwable error, U result) {
- this.exception = error;
- this.result = convert(result);
- if (underlying != null)
- underlying.onCompletion(error, this.result);
- finishedLatch.countDown();
+ synchronized (this) {
+ if (isDone()) {
+ return;
+ }
+
+ if (error != null) {
+ this.exception = error;
+ } else {
+ this.result = convert(result);
+ }
+
+ if (underlying != null)
+ underlying.onCompletion(error, this.result);
+ finishedLatch.countDown();
+ }
}
@Override
- public boolean cancel(boolean b) {
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ synchronized (this) {
+ if (isDone()) {
+ return false;
+ }
+ if (mayInterruptIfRunning) {
+ this.cancelled = true;
+ finishedLatch.countDown();
+ return true;
+ }
+ }
+ try {
+ finishedLatch.await();
+ } catch (InterruptedException e) {
+ throw new ConnectException("Interrupted while waiting for task to complete", e);
+ }
return false;
}
@Override
public boolean isCancelled() {
- return false;
+ return cancelled;
}
@Override
@@ -75,6 +108,9 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
}
private T result() throws ExecutionException {
+ if (cancelled) {
+ throw new CancellationException();
+ }
if (exception != null) {
throw new ExecutionException(exception);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 5d223f4..428b3e4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -46,7 +46,7 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -127,7 +127,7 @@ public class ErrorHandlingTaskTest {
private KafkaProducer<byte[], byte[]> producer;
@Mock
- OffsetStorageReader offsetReader;
+ OffsetStorageReaderImpl offsetReader;
@Mock
OffsetStorageWriter offsetWriter;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 137cbc2..9c2dbe5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -35,9 +35,9 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -107,7 +107,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
@Mock private HeaderConverter headerConverter;
@Mock private TransformationChain<SourceRecord> transformationChain;
@Mock private KafkaProducer<byte[], byte[]> producer;
- @Mock private OffsetStorageReader offsetReader;
+ @Mock private CloseableOffsetStorageReader offsetReader;
@Mock private OffsetStorageWriter offsetWriter;
@Mock private ClusterConfigState clusterConfigState;
private WorkerSourceTask workerTask;
@@ -683,6 +683,20 @@ public class WorkerSourceTaskTest extends ThreadedTest {
}
@Test
+ public void testCancel() {
+ createWorkerTask();
+
+ offsetReader.close();
+ PowerMock.expectLastCall();
+
+ PowerMock.replayAll();
+
+ workerTask.cancel();
+
+ PowerMock.verifyAll();
+ }
+
+ @Test
public void testMetricsGroup() {
SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
index df955f8..1fdb91a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
@@ -71,12 +71,11 @@ public class FileOffsetBackingStoreTest {
@Test
public void testGetSet() throws Exception {
Callback<Void> setCallback = expectSuccessfulSetCallback();
- Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
PowerMock.replayAll();
store.set(firstSet, setCallback).get();
- Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+ Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad"))).get();
assertEquals(buffer("value"), values.get(buffer("key")));
assertEquals(null, values.get(buffer("bad")));
@@ -86,7 +85,6 @@ public class FileOffsetBackingStoreTest {
@Test
public void testSaveRestore() throws Exception {
Callback<Void> setCallback = expectSuccessfulSetCallback();
- Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
PowerMock.replayAll();
store.set(firstSet, setCallback).get();
@@ -96,7 +94,7 @@ public class FileOffsetBackingStoreTest {
FileOffsetBackingStore restore = new FileOffsetBackingStore();
restore.configure(config);
restore.start();
- Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
+ Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key"))).get();
assertEquals(buffer("value"), values.get(buffer("key")));
PowerMock.verifyAll();
@@ -113,12 +111,4 @@ public class FileOffsetBackingStoreTest {
PowerMock.expectLastCall();
return setCallback;
}
-
- @SuppressWarnings("unchecked")
- private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
- Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
- getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
- PowerMock.expectLastCall();
- return getCallback;
- }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index ff9f2c9..4a05bc5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -217,17 +217,10 @@ public class KafkaOffsetBackingStoreTest {
store.start();
// Getting from empty store should return nulls
- final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
- store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
- @Override
- public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
- // Since we didn't read them yet, these will be null
- assertEquals(null, result.get(TP0_KEY));
- assertEquals(null, result.get(TP1_KEY));
- getInvokedAndPassed.set(true);
- }
- }).get(10000, TimeUnit.MILLISECONDS);
- assertTrue(getInvokedAndPassed.get());
+ Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+ // Since we didn't read them yet, these will be null
+ assertNull(offsets.get(TP0_KEY));
+ assertNull(offsets.get(TP1_KEY));
// Set some offsets
Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
@@ -250,28 +243,14 @@ public class KafkaOffsetBackingStoreTest {
assertTrue(invoked.get());
// Getting data should read to end of our published data and return it
- final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
- store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
- @Override
- public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
- assertEquals(TP0_VALUE, result.get(TP0_KEY));
- assertEquals(TP1_VALUE, result.get(TP1_KEY));
- secondGetInvokedAndPassed.set(true);
- }
- }).get(10000, TimeUnit.MILLISECONDS);
- assertTrue(secondGetInvokedAndPassed.get());
+ offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+ assertEquals(TP0_VALUE, offsets.get(TP0_KEY));
+ assertEquals(TP1_VALUE, offsets.get(TP1_KEY));
// Getting data should read to end of our published data and return it
- final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
- store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
- @Override
- public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
- assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
- assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
- thirdGetInvokedAndPassed.set(true);
- }
- }).get(10000, TimeUnit.MILLISECONDS);
- assertTrue(thirdGetInvokedAndPassed.get());
+ offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+ assertEquals(TP0_VALUE_NEW, offsets.get(TP0_KEY));
+ assertEquals(TP1_VALUE_NEW, offsets.get(TP1_KEY));
store.stop();
@@ -329,16 +308,9 @@ public class KafkaOffsetBackingStoreTest {
assertTrue(invoked.get());
// Getting data should read to end of our published data and return it
- final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
- store.get(Arrays.asList(null, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
- @Override
- public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
- assertEquals(TP0_VALUE, result.get(null));
- assertNull(result.get(TP1_KEY));
- secondGetInvokedAndPassed.set(true);
- }
- }).get(10000, TimeUnit.MILLISECONDS);
- assertTrue(secondGetInvokedAndPassed.get());
+ Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(null, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+ assertEquals(TP0_VALUE, offsets.get(null));
+ assertNull(offsets.get(TP1_KEY));
store.stop();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java
new file mode 100644
index 0000000..9535003
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConvertingFutureCallbackTest {
+
+ private ExecutorService executor;
+
+ @Before
+ public void setup() {
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ @Test
+ public void shouldConvertBeforeGetOnSuccessfulCompletion() throws Exception {
+ final Object expectedConversion = new Object();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ testCallback.onCompletion(null, expectedConversion);
+ assertEquals(1, testCallback.numberOfConversions());
+ assertEquals(expectedConversion, testCallback.get());
+ }
+
+ @Test
+ public void shouldConvertOnlyOnceBeforeGetOnSuccessfulCompletion() throws Exception {
+ final Object expectedConversion = new Object();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ testCallback.onCompletion(null, expectedConversion);
+ testCallback.onCompletion(null, 69);
+ testCallback.cancel(true);
+ testCallback.onCompletion(new RuntimeException(), null);
+ assertEquals(1, testCallback.numberOfConversions());
+ assertEquals(expectedConversion, testCallback.get());
+ }
+
+ @Test
+ public void shouldNotConvertBeforeGetOnFailedCompletion() throws Exception {
+ final Throwable expectedError = new Throwable();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ testCallback.onCompletion(expectedError, null);
+ assertEquals(0, testCallback.numberOfConversions());
+ try {
+ testCallback.get();
+ fail("Expected ExecutionException");
+ } catch (ExecutionException e) {
+ assertEquals(expectedError, e.getCause());
+ }
+ }
+
+ @Test
+ public void shouldRecordOnlyFirstErrorBeforeGetOnFailedCompletion() throws Exception {
+ final Throwable expectedError = new Throwable();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ testCallback.onCompletion(expectedError, null);
+ testCallback.onCompletion(new RuntimeException(), null);
+ testCallback.cancel(true);
+ testCallback.onCompletion(null, "420");
+ assertEquals(0, testCallback.numberOfConversions());
+ try {
+ testCallback.get();
+ fail("Expected ExecutionException");
+ } catch (ExecutionException e) {
+ assertEquals(expectedError, e.getCause());
+ }
+ }
+
+ @Test(expected = CancellationException.class)
+ public void shouldCancelBeforeGetIfMayCancelWhileRunning() throws Exception {
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ assertTrue(testCallback.cancel(true));
+ testCallback.get();
+ }
+
+ @Test
+ public void shouldBlockUntilSuccessfulCompletion() throws Exception {
+ AtomicReference<Exception> testThreadException = new AtomicReference<>();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ final Object expectedConversion = new Object();
+ executor.submit(() -> {
+ try {
+ testCallback.waitForGet();
+ testCallback.onCompletion(null, expectedConversion);
+ } catch (Exception e) {
+ testThreadException.compareAndSet(null, e);
+ }
+ });
+ assertFalse(testCallback.isDone());
+ assertEquals(expectedConversion, testCallback.get());
+ assertEquals(1, testCallback.numberOfConversions());
+ assertTrue(testCallback.isDone());
+ if (testThreadException.get() != null) {
+ throw testThreadException.get();
+ }
+ }
+
+ @Test
+ public void shouldBlockUntilFailedCompletion() throws Exception {
+ AtomicReference<Exception> testThreadException = new AtomicReference<>();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ final Throwable expectedError = new Throwable();
+ executor.submit(() -> {
+ try {
+ testCallback.waitForGet();
+ testCallback.onCompletion(expectedError, null);
+ } catch (Exception e) {
+ testThreadException.compareAndSet(null, e);
+ }
+ });
+ assertFalse(testCallback.isDone());
+ try {
+ testCallback.get();
+ fail("Expected ExecutionException");
+ } catch (ExecutionException e) {
+ assertEquals(expectedError, e.getCause());
+ }
+ assertEquals(0, testCallback.numberOfConversions());
+ assertTrue(testCallback.isDone());
+ if (testThreadException.get() != null) {
+ throw testThreadException.get();
+ }
+ }
+
+ @Test(expected = CancellationException.class)
+ public void shouldBlockUntilCancellation() throws Exception {
+ AtomicReference<Exception> testThreadException = new AtomicReference<>();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ executor.submit(() -> {
+ try {
+ testCallback.waitForGet();
+ testCallback.cancel(true);
+ } catch (Exception e) {
+ testThreadException.compareAndSet(null, e);
+ }
+ });
+ assertFalse(testCallback.isDone());
+ testCallback.get();
+ if (testThreadException.get() != null) {
+ throw testThreadException.get();
+ }
+ }
+
+ @Test
+ public void shouldNotCancelIfMayNotCancelWhileRunning() throws Exception {
+ AtomicReference<Exception> testThreadException = new AtomicReference<>();
+ TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+ final Object expectedConversion = new Object();
+ executor.submit(() -> {
+ try {
+ testCallback.waitForCancel();
+ testCallback.onCompletion(null, expectedConversion);
+ } catch (Exception e) {
+ testThreadException.compareAndSet(null, e);
+ }
+ });
+ assertFalse(testCallback.isCancelled());
+ assertFalse(testCallback.isDone());
+ testCallback.cancel(false);
+ assertFalse(testCallback.isCancelled());
+ assertTrue(testCallback.isDone());
+ assertEquals(expectedConversion, testCallback.get());
+ assertEquals(1, testCallback.numberOfConversions());
+ if (testThreadException.get() != null) {
+ throw testThreadException.get();
+ }
+ }
+
+ protected static class TestConvertingFutureCallback extends ConvertingFutureCallback<Object, Object> {
+ private AtomicInteger numberOfConversions = new AtomicInteger();
+ private CountDownLatch getInvoked = new CountDownLatch(1);
+ private CountDownLatch cancelInvoked = new CountDownLatch(1);
+
+ public int numberOfConversions() {
+ return numberOfConversions.get();
+ }
+
+ public void waitForGet() throws InterruptedException {
+ getInvoked.await();
+ }
+
+ public void waitForCancel() throws InterruptedException {
+ cancelInvoked.await();
+ }
+
+ @Override
+ public Object convert(Object result) {
+ numberOfConversions.incrementAndGet();
+ return result;
+ }
+
+ @Override
+ public Object get() throws InterruptedException, ExecutionException {
+ getInvoked.countDown();
+ return super.get();
+ }
+
+ @Override
+ public Object get(
+ long duration,
+ TimeUnit unit
+ ) throws InterruptedException, ExecutionException, TimeoutException {
+ getInvoked.countDown();
+ return super.get(duration, unit);
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ cancelInvoked.countDown();
+ return super.cancel(mayInterruptIfRunning);
+ }
+ }
+}