Posted to commits@kafka.apache.org by rh...@apache.org on 2019/11/20 05:32:25 UTC

[kafka] branch 2.3 updated: KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new a161ecd  KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)
a161ecd is described below

commit a161ecda2421f722e59c824c6c10b17cbbe5a898
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Tue Nov 19 21:18:21 2019 -0800

    KAFKA-9051: Prematurely complete source offset read requests for stopped tasks (#7532)
    
    Prematurely complete source offset read requests for stopped tasks, and add unit tests.
    
    Author: Chris Egerton <ch...@confluent.io>
    Reviewers: Arjun Satish <ar...@confluent.io>, Nigel Liang <ni...@nigelliang.com>, Jinxin Liu <li...@gmail.com>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |   4 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    |  12 +-
 .../storage/CloseableOffsetStorageReader.java      |  33 +++
 .../connect/storage/KafkaOffsetBackingStore.java   |   7 +-
 .../connect/storage/MemoryOffsetBackingStore.java  |   6 +-
 .../kafka/connect/storage/OffsetBackingStore.java  |   8 +-
 .../connect/storage/OffsetStorageReaderImpl.java   |  51 ++++-
 .../connect/util/ConvertingFutureCallback.java     |  58 ++++-
 .../connect/runtime/ErrorHandlingTaskTest.java     |   4 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |  18 +-
 .../storage/FileOffsetBackingStoreTest.java        |  14 +-
 .../storage/KafkaOffsetBackingStoreTest.java       |  54 ++---
 .../connect/util/ConvertingFutureCallbackTest.java | 242 +++++++++++++++++++++
 13 files changed, 420 insertions(+), 91 deletions(-)
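
For orientation, here is a minimal, self-contained Java sketch of the pattern the commit message describes -- cancelling an outstanding offset-read future so that a task thread blocked on it can finish shutting down -- written against plain java.util.concurrent. It is an illustration only, not code from this patch; the class name and timings are hypothetical.

    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class PrematureOffsetReadSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService backingStore = Executors.newSingleThreadExecutor();

            // Stand-in for an offset read that never completes (for example, the
            // backing store cannot read to the end of the offsets topic).
            Future<String> offsetRead = backingStore.submit(() -> {
                Thread.sleep(Long.MAX_VALUE);
                return "offsets";
            });

            // Stand-in for CloseableOffsetStorageReader.close(): another thread
            // cancels the outstanding read shortly after shutdown is requested.
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ignored) {
                }
                offsetRead.cancel(true);
            }).start();

            try {
                // Without the cancel above, this get() would block indefinitely and
                // the task thread could never finish stopping.
                offsetRead.get();
            } catch (CancellationException e) {
                System.out.println("Offset read completed prematurely; task thread can stop");
            } catch (ExecutionException e) {
                System.out.println("Offset read failed: " + e.getCause());
            } finally {
                backingStore.shutdownNow();
                backingStore.awaitTermination(5, TimeUnit.SECONDS);
            }
        }
    }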

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 3d4479c..27c13de 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -50,10 +50,10 @@ import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -506,7 +506,7 @@ public class Worker {
             retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics));
             TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
-            OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
+            CloseableOffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index acdb287..72829e3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -39,9 +39,9 @@ import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -75,7 +75,7 @@ class WorkerSourceTask extends WorkerTask {
     private final HeaderConverter headerConverter;
     private final TransformationChain<SourceRecord> transformationChain;
     private KafkaProducer<byte[], byte[]> producer;
-    private final OffsetStorageReader offsetReader;
+    private final CloseableOffsetStorageReader offsetReader;
     private final OffsetStorageWriter offsetWriter;
     private final Time time;
     private final SourceTaskMetricsGroup sourceTaskMetricsGroup;
@@ -105,7 +105,7 @@ class WorkerSourceTask extends WorkerTask {
                             HeaderConverter headerConverter,
                             TransformationChain<SourceRecord> transformationChain,
                             KafkaProducer<byte[], byte[]> producer,
-                            OffsetStorageReader offsetReader,
+                            CloseableOffsetStorageReader offsetReader,
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
                             ClusterConfigState configState,
@@ -173,6 +173,12 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
+    public void cancel() {
+        super.cancel();
+        offsetReader.close();
+    }
+
+    @Override
     public void stop() {
         super.stop();
         stopRequestedLatch.countDown();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java
new file mode 100644
index 0000000..b902739
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/CloseableOffsetStorageReader.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+public interface CloseableOffsetStorageReader extends Closeable, OffsetStorageReader {
+
+    /**
+     * {@link Future#cancel(boolean) Cancel} all outstanding offset read requests, and throw an
+     * exception in all current and future calls to {@link #offsets(Collection)} and
+     * {@link #offset(Map)}. This is useful for unblocking task threads which need to shut down but
+     * are blocked on offset reads.
+     */
+    void close();
+}
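
As a usage sketch of the new interface (a hypothetical caller, not part of the commit): a task thread may block inside offset()/offsets() while another thread calls close() to release it. The class name and partition map below are illustrative only.

    import java.util.Map;

    import org.apache.kafka.connect.errors.ConnectException;
    import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;

    public class OffsetReaderShutdownSketch {
        private final CloseableOffsetStorageReader offsetReader;

        public OffsetReaderShutdownSketch(CloseableOffsetStorageReader offsetReader) {
            this.offsetReader = offsetReader;
        }

        // Runs on the task thread and may block inside offset() while the
        // backing store reads to the end of the offsets topic.
        public Map<String, Object> lastCommittedOffset(Map<String, Object> sourcePartition) {
            try {
                return offsetReader.offset(sourcePartition);
            } catch (ConnectException e) {
                // Once close() has been called, in-flight and subsequent reads
                // fail fast rather than holding the thread past the graceful
                // shutdown period.
                return null;
            }
        }

        // Runs on the worker thread when the task is cancelled, mirroring
        // WorkerSourceTask.cancel() in this commit.
        public void cancel() {
            offsetReader.close();
        }
    }
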
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
index 195c498..22bcb7a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStore.java
@@ -118,9 +118,8 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
     }
 
     @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys,
-                                                   final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
-        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>(callback) {
+    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
+        ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>> future = new ConvertingFutureCallback<Void, Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public Map<ByteBuffer, ByteBuffer> convert(Void result) {
                 Map<ByteBuffer, ByteBuffer> values = new HashMap<>();
@@ -230,6 +229,4 @@ public class KafkaOffsetBackingStore implements OffsetBackingStore {
             return null;
         }
     }
-
-
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
index ab8130b..72439e7 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/MemoryOffsetBackingStore.java
@@ -75,9 +75,7 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
     }
 
     @Override
-    public Future<Map<ByteBuffer, ByteBuffer>> get(
-            final Collection<ByteBuffer> keys,
-            final Callback<Map<ByteBuffer, ByteBuffer>> callback) {
+    public Future<Map<ByteBuffer, ByteBuffer>> get(final Collection<ByteBuffer> keys) {
         return executor.submit(new Callable<Map<ByteBuffer, ByteBuffer>>() {
             @Override
             public Map<ByteBuffer, ByteBuffer> call() throws Exception {
@@ -85,8 +83,6 @@ public class MemoryOffsetBackingStore implements OffsetBackingStore {
                 for (ByteBuffer key : keys) {
                     result.put(key, data.get(key));
                 }
-                if (callback != null)
-                    callback.onCompletion(null, result);
                 return result;
             }
         });
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
index 9998164..1e4375b 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetBackingStore.java
@@ -53,12 +53,9 @@ public interface OffsetBackingStore {
     /**
      * Get the values for the specified keys
      * @param keys list of keys to look up
-     * @param callback callback to invoke on completion
      * @return future for the resulting map from key to value
      */
-    Future<Map<ByteBuffer, ByteBuffer>> get(
-            Collection<ByteBuffer> keys,
-            Callback<Map<ByteBuffer, ByteBuffer>> callback);
+    Future<Map<ByteBuffer, ByteBuffer>> get(Collection<ByteBuffer> keys);
 
     /**
      * Set the specified keys and values.
@@ -66,8 +63,7 @@ public interface OffsetBackingStore {
      * @param callback callback to invoke on completion
      * @return void future for the operation
      */
-    Future<Void> set(Map<ByteBuffer, ByteBuffer> values,
-                            Callback<Void> callback);
+    Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callback);
 
     /**
      * Configure class with the given key-value pairs
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
index 9f926dc..a1eea43 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageReaderImpl.java
@@ -26,20 +26,27 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Implementation of OffsetStorageReader. Unlike OffsetStorageWriter which is implemented
  * directly, the interface is only separate from this implementation because it needs to be
  * included in the public API package.
  */
-public class OffsetStorageReaderImpl implements OffsetStorageReader {
+public class OffsetStorageReaderImpl implements CloseableOffsetStorageReader {
     private static final Logger log = LoggerFactory.getLogger(OffsetStorageReaderImpl.class);
 
     private final OffsetBackingStore backingStore;
     private final String namespace;
     private final Converter keyConverter;
     private final Converter valueConverter;
+    private final AtomicBoolean closed;
+    private final Set<Future<Map<ByteBuffer, ByteBuffer>>> offsetReadFutures;
 
     public OffsetStorageReaderImpl(OffsetBackingStore backingStore, String namespace,
                                    Converter keyConverter, Converter valueConverter) {
@@ -47,6 +54,8 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
         this.namespace = namespace;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
+        this.closed = new AtomicBoolean(false);
+        this.offsetReadFutures = new HashSet<>();
     }
 
     @Override
@@ -76,7 +85,30 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
         // Get serialized key -> serialized value from backing store
         Map<ByteBuffer, ByteBuffer> raw;
         try {
-            raw = backingStore.get(serializedToOriginal.keySet(), null).get();
+            Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture;
+            synchronized (offsetReadFutures) {
+                if (closed.get()) {
+                    throw new ConnectException(
+                        "Offset reader is closed. This is likely because the task has already been "
+                            + "scheduled to stop but has taken longer than the graceful shutdown "
+                            + "period to do so.");
+                }
+                offsetReadFuture = backingStore.get(serializedToOriginal.keySet());
+                offsetReadFutures.add(offsetReadFuture);
+            }
+
+            try {
+                raw = offsetReadFuture.get();
+            } catch (CancellationException e) {
+                throw new ConnectException(
+                    "Offset reader closed while attempting to read offsets. This is likely because "
+                        + "the task has been scheduled to stop but has taken longer than the "
+                        + "graceful shutdown period to do so.");
+            } finally {
+                synchronized (offsetReadFutures) {
+                    offsetReadFutures.remove(offsetReadFuture);
+                }
+            }
         } catch (Exception e) {
             log.error("Failed to fetch offsets from namespace {}: ", namespace, e);
             throw new ConnectException("Failed to fetch offsets.", e);
@@ -108,4 +140,19 @@ public class OffsetStorageReaderImpl implements OffsetStorageReader {
 
         return result;
     }
+
+    public void close() {
+        if (!closed.getAndSet(true)) {
+            synchronized (offsetReadFutures) {
+                for (Future<Map<ByteBuffer, ByteBuffer>> offsetReadFuture : offsetReadFutures) {
+                    try {
+                        offsetReadFuture.cancel(true);
+                    } catch (Throwable t) {
+                        log.error("Failed to cancel offset read future", t);
+                    }
+                }
+                offsetReadFutures.clear();
+            }
+        }
+    }
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
index d5abed9..e15c38e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java
@@ -16,6 +16,9 @@
  */
 package org.apache.kafka.connect.util;
 
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -24,10 +27,15 @@ import java.util.concurrent.TimeoutException;
 
 public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Future<T> {
 
-    private Callback<T> underlying;
-    private CountDownLatch finishedLatch;
-    private T result = null;
-    private Throwable exception = null;
+    private final Callback<T> underlying;
+    private final CountDownLatch finishedLatch;
+    private volatile T result = null;
+    private volatile Throwable exception = null;
+    private volatile boolean cancelled = false;
+
+    public ConvertingFutureCallback() {
+        this(null);
+    }
 
     public ConvertingFutureCallback(Callback<T> underlying) {
         this.underlying = underlying;
@@ -38,21 +46,46 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
 
     @Override
     public void onCompletion(Throwable error, U result) {
-        this.exception = error;
-        this.result = convert(result);
-        if (underlying != null)
-            underlying.onCompletion(error, this.result);
-        finishedLatch.countDown();
+        synchronized (this) {
+            if (isDone()) {
+                return;
+            }
+            
+            if (error != null) {
+                this.exception = error;
+            } else {
+                this.result = convert(result);
+            }
+
+            if (underlying != null)
+                underlying.onCompletion(error, this.result);
+            finishedLatch.countDown();
+        }
     }
 
     @Override
-    public boolean cancel(boolean b) {
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        synchronized (this) {
+            if (isDone()) {
+                return false;
+            }
+            if (mayInterruptIfRunning) {
+                this.cancelled = true;
+                finishedLatch.countDown();
+                return true;
+            }
+        }
+        try {
+            finishedLatch.await();
+        } catch (InterruptedException e) {
+            throw new ConnectException("Interrupted while waiting for task to complete", e);
+        }
         return false;
     }
 
     @Override
     public boolean isCancelled() {
-        return false;
+        return cancelled;
     }
 
     @Override
@@ -75,6 +108,9 @@ public abstract class ConvertingFutureCallback<U, T> implements Callback<U>, Fut
     }
 
     private T result() throws ExecutionException {
+        if (cancelled) {
+            throw new CancellationException();
+        }
         if (exception != null) {
             throw new ExecutionException(exception);
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 5d223f4..428b3e4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -46,7 +46,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
+import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.util.SimpleConfig;
@@ -127,7 +127,7 @@ public class ErrorHandlingTaskTest {
     private KafkaProducer<byte[], byte[]> producer;
 
     @Mock
-    OffsetStorageReader offsetReader;
+    OffsetStorageReaderImpl offsetReader;
     @Mock
     OffsetStorageWriter offsetWriter;
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 137cbc2..9c2dbe5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -35,9 +35,9 @@ import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
+import org.apache.kafka.connect.storage.CloseableOffsetStorageReader;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.HeaderConverter;
-import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
@@ -107,7 +107,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Mock private HeaderConverter headerConverter;
     @Mock private TransformationChain<SourceRecord> transformationChain;
     @Mock private KafkaProducer<byte[], byte[]> producer;
-    @Mock private OffsetStorageReader offsetReader;
+    @Mock private CloseableOffsetStorageReader offsetReader;
     @Mock private OffsetStorageWriter offsetWriter;
     @Mock private ClusterConfigState clusterConfigState;
     private WorkerSourceTask workerTask;
@@ -683,6 +683,20 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     @Test
+    public void testCancel() {
+        createWorkerTask();
+
+        offsetReader.close();
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        workerTask.cancel();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testMetricsGroup() {
         SourceTaskMetricsGroup group = new SourceTaskMetricsGroup(taskId, metrics);
         SourceTaskMetricsGroup group1 = new SourceTaskMetricsGroup(taskId1, metrics);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
index df955f8..1fdb91a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/FileOffsetBackingStoreTest.java
@@ -71,12 +71,11 @@ public class FileOffsetBackingStoreTest {
     @Test
     public void testGetSet() throws Exception {
         Callback<Void> setCallback = expectSuccessfulSetCallback();
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
         PowerMock.replayAll();
 
         store.set(firstSet, setCallback).get();
 
-        Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = store.get(Arrays.asList(buffer("key"), buffer("bad"))).get();
         assertEquals(buffer("value"), values.get(buffer("key")));
         assertEquals(null, values.get(buffer("bad")));
 
@@ -86,7 +85,6 @@ public class FileOffsetBackingStoreTest {
     @Test
     public void testSaveRestore() throws Exception {
         Callback<Void> setCallback = expectSuccessfulSetCallback();
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = expectSuccessfulGetCallback();
         PowerMock.replayAll();
 
         store.set(firstSet, setCallback).get();
@@ -96,7 +94,7 @@ public class FileOffsetBackingStoreTest {
         FileOffsetBackingStore restore = new FileOffsetBackingStore();
         restore.configure(config);
         restore.start();
-        Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key")), getCallback).get();
+        Map<ByteBuffer, ByteBuffer> values = restore.get(Arrays.asList(buffer("key"))).get();
         assertEquals(buffer("value"), values.get(buffer("key")));
 
         PowerMock.verifyAll();
@@ -113,12 +111,4 @@ public class FileOffsetBackingStoreTest {
         PowerMock.expectLastCall();
         return setCallback;
     }
-
-    @SuppressWarnings("unchecked")
-    private Callback<Map<ByteBuffer, ByteBuffer>> expectSuccessfulGetCallback() {
-        Callback<Map<ByteBuffer, ByteBuffer>> getCallback = PowerMock.createMock(Callback.class);
-        getCallback.onCompletion(EasyMock.isNull(Throwable.class), EasyMock.anyObject(Map.class));
-        PowerMock.expectLastCall();
-        return getCallback;
-    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
index ff9f2c9..4a05bc5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaOffsetBackingStoreTest.java
@@ -217,17 +217,10 @@ public class KafkaOffsetBackingStoreTest {
         store.start();
 
         // Getting from empty store should return nulls
-        final AtomicBoolean getInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                // Since we didn't read them yet, these will be null
-                assertEquals(null, result.get(TP0_KEY));
-                assertEquals(null, result.get(TP1_KEY));
-                getInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(getInvokedAndPassed.get());
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        // Since we didn't read them yet, these will be null
+        assertNull(offsets.get(TP0_KEY));
+        assertNull(offsets.get(TP1_KEY));
 
         // Set some offsets
         Map<ByteBuffer, ByteBuffer> toSet = new HashMap<>();
@@ -250,28 +243,14 @@ public class KafkaOffsetBackingStoreTest {
         assertTrue(invoked.get());
 
         // Getting data should read to end of our published data and return it
-        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                assertEquals(TP0_VALUE, result.get(TP0_KEY));
-                assertEquals(TP1_VALUE, result.get(TP1_KEY));
-                secondGetInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(secondGetInvokedAndPassed.get());
+        offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertEquals(TP0_VALUE, offsets.get(TP0_KEY));
+        assertEquals(TP1_VALUE, offsets.get(TP1_KEY));
 
         // Getting data should read to end of our published data and return it
-        final AtomicBoolean thirdGetInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(TP0_KEY, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                assertEquals(TP0_VALUE_NEW, result.get(TP0_KEY));
-                assertEquals(TP1_VALUE_NEW, result.get(TP1_KEY));
-                thirdGetInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(thirdGetInvokedAndPassed.get());
+        offsets = store.get(Arrays.asList(TP0_KEY, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertEquals(TP0_VALUE_NEW, offsets.get(TP0_KEY));
+        assertEquals(TP1_VALUE_NEW, offsets.get(TP1_KEY));
 
         store.stop();
 
@@ -329,16 +308,9 @@ public class KafkaOffsetBackingStoreTest {
         assertTrue(invoked.get());
 
         // Getting data should read to end of our published data and return it
-        final AtomicBoolean secondGetInvokedAndPassed = new AtomicBoolean(false);
-        store.get(Arrays.asList(null, TP1_KEY), new Callback<Map<ByteBuffer, ByteBuffer>>() {
-            @Override
-            public void onCompletion(Throwable error, Map<ByteBuffer, ByteBuffer> result) {
-                assertEquals(TP0_VALUE, result.get(null));
-                assertNull(result.get(TP1_KEY));
-                secondGetInvokedAndPassed.set(true);
-            }
-        }).get(10000, TimeUnit.MILLISECONDS);
-        assertTrue(secondGetInvokedAndPassed.get());
+        Map<ByteBuffer, ByteBuffer> offsets = store.get(Arrays.asList(null, TP1_KEY)).get(10000, TimeUnit.MILLISECONDS);
+        assertEquals(TP0_VALUE, offsets.get(null));
+        assertNull(offsets.get(TP1_KEY));
 
         store.stop();
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java
new file mode 100644
index 0000000..9535003
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/ConvertingFutureCallbackTest.java
@@ -0,0 +1,242 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConvertingFutureCallbackTest {
+
+    private ExecutorService executor;
+
+    @Before
+    public void setup() {
+        executor = Executors.newSingleThreadExecutor();
+    }
+  
+    @Test
+    public void shouldConvertBeforeGetOnSuccessfulCompletion() throws Exception {
+        final Object expectedConversion = new Object();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        testCallback.onCompletion(null, expectedConversion);
+        assertEquals(1, testCallback.numberOfConversions());
+        assertEquals(expectedConversion, testCallback.get());
+    }
+
+    @Test
+    public void shouldConvertOnlyOnceBeforeGetOnSuccessfulCompletion() throws Exception {
+        final Object expectedConversion = new Object();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        testCallback.onCompletion(null, expectedConversion);
+        testCallback.onCompletion(null, 69);
+        testCallback.cancel(true);
+        testCallback.onCompletion(new RuntimeException(), null);
+        assertEquals(1, testCallback.numberOfConversions());
+        assertEquals(expectedConversion, testCallback.get());
+    }
+
+    @Test
+    public void shouldNotConvertBeforeGetOnFailedCompletion() throws Exception {
+        final Throwable expectedError = new Throwable();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        testCallback.onCompletion(expectedError, null);
+        assertEquals(0, testCallback.numberOfConversions());
+        try {
+            testCallback.get();
+            fail("Expected ExecutionException");
+        } catch (ExecutionException e) {
+            assertEquals(expectedError, e.getCause());
+        }
+    }
+
+    @Test
+    public void shouldRecordOnlyFirstErrorBeforeGetOnFailedCompletion() throws Exception {
+        final Throwable expectedError = new Throwable();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        testCallback.onCompletion(expectedError, null);
+        testCallback.onCompletion(new RuntimeException(), null);
+        testCallback.cancel(true);
+        testCallback.onCompletion(null, "420");
+        assertEquals(0, testCallback.numberOfConversions());
+        try {
+            testCallback.get();
+            fail("Expected ExecutionException");
+        } catch (ExecutionException e) {
+            assertEquals(expectedError, e.getCause());
+        }
+    }
+  
+    @Test(expected = CancellationException.class)
+    public void shouldCancelBeforeGetIfMayCancelWhileRunning() throws Exception {
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        assertTrue(testCallback.cancel(true));
+        testCallback.get();
+    }
+
+    @Test
+    public void shouldBlockUntilSuccessfulCompletion() throws Exception {
+        AtomicReference<Exception> testThreadException = new AtomicReference<>();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        final Object expectedConversion = new Object();
+        executor.submit(() -> {
+            try {
+                testCallback.waitForGet();
+                testCallback.onCompletion(null, expectedConversion);
+            } catch (Exception e) {
+                testThreadException.compareAndSet(null, e);
+            }
+        });
+        assertFalse(testCallback.isDone());
+        assertEquals(expectedConversion, testCallback.get());
+        assertEquals(1, testCallback.numberOfConversions());
+        assertTrue(testCallback.isDone());
+        if (testThreadException.get() != null) {
+            throw testThreadException.get();
+        }
+    }
+
+    @Test
+    public void shouldBlockUntilFailedCompletion() throws Exception {
+        AtomicReference<Exception> testThreadException = new AtomicReference<>();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        final Throwable expectedError = new Throwable();
+        executor.submit(() -> {
+            try {
+                testCallback.waitForGet();
+                testCallback.onCompletion(expectedError, null);
+            } catch (Exception e) {
+                testThreadException.compareAndSet(null, e);
+            }
+        });
+        assertFalse(testCallback.isDone());
+        try {
+            testCallback.get();
+            fail("Expected ExecutionException");
+        } catch (ExecutionException e) {
+            assertEquals(expectedError, e.getCause());
+        }
+        assertEquals(0, testCallback.numberOfConversions());
+        assertTrue(testCallback.isDone());
+        if (testThreadException.get() != null) {
+            throw testThreadException.get();
+        }
+    }
+
+    @Test(expected = CancellationException.class)
+    public void shouldBlockUntilCancellation() throws Exception {
+        AtomicReference<Exception> testThreadException = new AtomicReference<>();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        executor.submit(() -> {
+            try {
+                testCallback.waitForGet();
+                testCallback.cancel(true);
+            } catch (Exception e) {
+                testThreadException.compareAndSet(null, e);
+            }
+        });
+        assertFalse(testCallback.isDone());
+        testCallback.get();
+        if (testThreadException.get() != null) {
+            throw testThreadException.get();
+        }
+    }
+
+    @Test
+    public void shouldNotCancelIfMayNotCancelWhileRunning() throws Exception {
+        AtomicReference<Exception> testThreadException = new AtomicReference<>();
+        TestConvertingFutureCallback testCallback = new TestConvertingFutureCallback();
+        final Object expectedConversion = new Object();
+        executor.submit(() -> {
+            try {
+                testCallback.waitForCancel();
+                testCallback.onCompletion(null, expectedConversion);
+            } catch (Exception e) {
+                testThreadException.compareAndSet(null, e);
+            }
+        });
+        assertFalse(testCallback.isCancelled());
+        assertFalse(testCallback.isDone());
+        testCallback.cancel(false);
+        assertFalse(testCallback.isCancelled());
+        assertTrue(testCallback.isDone());
+        assertEquals(expectedConversion, testCallback.get());
+        assertEquals(1, testCallback.numberOfConversions());
+        if (testThreadException.get() != null) {
+            throw testThreadException.get();
+        }
+    }
+  
+    protected static class TestConvertingFutureCallback extends ConvertingFutureCallback<Object, Object> {
+        private AtomicInteger numberOfConversions = new AtomicInteger();
+        private CountDownLatch getInvoked = new CountDownLatch(1);
+        private CountDownLatch cancelInvoked = new CountDownLatch(1);
+    
+        public int numberOfConversions() {
+            return numberOfConversions.get();
+        }
+
+        public void waitForGet() throws InterruptedException {
+            getInvoked.await();
+        }
+
+        public void waitForCancel() throws InterruptedException {
+            cancelInvoked.await();
+        }
+    
+        @Override
+        public Object convert(Object result) {
+            numberOfConversions.incrementAndGet();
+            return result;
+        }
+
+        @Override
+        public Object get() throws InterruptedException, ExecutionException {
+            getInvoked.countDown();
+            return super.get();
+        }
+
+        @Override
+        public Object get(
+            long duration,
+            TimeUnit unit
+        ) throws InterruptedException, ExecutionException, TimeoutException {
+            getInvoked.countDown();
+            return super.get(duration, unit);
+        }
+
+        @Override
+        public boolean cancel(boolean mayInterruptIfRunning) {
+            cancelInvoked.countDown();
+            return super.cancel(mayInterruptIfRunning);
+        }
+    }
+}