You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/06 18:14:05 UTC

kafka git commit: KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush

Repository: kafka
Updated Branches:
  refs/heads/trunk 45394d52c -> 510257646


KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush

Author: oleg <ol...@nexla.com>

Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3702 from oleg-smith/KAFKA-5756


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51025764
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51025764
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51025764

Branch: refs/heads/trunk
Commit: 51025764601760684f3dbdc0171fd7aa2ebe70e7
Parents: 45394d5
Author: oleg <ol...@nexla.com>
Authored: Wed Sep 6 11:12:23 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Sep 6 11:12:23 2017 -0700

----------------------------------------------------------------------
 .../kafka/connect/runtime/WorkerSourceTask.java |  2 +-
 .../connect/storage/OffsetStorageWriter.java    | 65 +++++++++++---------
 2 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/51025764/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0f42186..6a17b71 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -159,7 +159,7 @@ class WorkerSourceTask extends WorkerTask {
                 }
 
                 if (toSend == null) {
-                    log.debug("{} Nothing to send to Kafka. Polling source for additional records", this);
+                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                     toSend = task.poll();
                 }
                 if (toSend == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/51025764/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index c5d1467..3239b67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
  * time.
  * </p>
  * <p>
- * This class is not thread-safe. It should only be accessed from a Task's processing thread.
+ * This class is thread-safe.
  * </p>
  */
 public class OffsetStorageWriter {
@@ -72,7 +72,6 @@ public class OffsetStorageWriter {
     // Offset data in Connect format
     private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
 
-    // Not synchronized, should only be accessed by flush thread
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
@@ -129,44 +128,50 @@ public class OffsetStorageWriter {
      * @return a Future, or null if there are no offsets to commitOffsets
      */
     public Future<Void> doFlush(final Callback<Void> callback) {
-        final long flushId = currentFlushId;
 
+        final long flushId;
         // Serialize
-        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
-        try {
-            offsetsSerialized = new HashMap<>();
-            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
-                // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
-                // for that data. The only enforcement of the format is here.
-                OffsetUtils.validateFormat(entry.getKey());
-                OffsetUtils.validateFormat(entry.getValue());
-                // When serializing the key, we add in the namespace information so the key is [namespace, real key]
-                byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
-                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
-                ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
-                offsetsSerialized.put(keyBuffer, valueBuffer);
+        final Map<ByteBuffer, ByteBuffer> offsetsSerialized;
+
+        synchronized (this) {
+            flushId = currentFlushId;
+
+            try {
+                offsetsSerialized = new HashMap<>(toFlush.size());
+                for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
+                    // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
+                    // for that data. The only enforcement of the format is here.
+                    OffsetUtils.validateFormat(entry.getKey());
+                    OffsetUtils.validateFormat(entry.getValue());
+                    // When serializing the key, we add in the namespace information so the key is [namespace, real key]
+                    byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
+                    ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
+                    byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
+                    ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
+                    offsetsSerialized.put(keyBuffer, valueBuffer);
+                }
+            } catch (Throwable t) {
+                // Must handle errors properly here or the writer will be left mid-flush forever and be
+                // unable to make progress.
+                log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+                        + "offsets under namespace {}. This likely won't recover unless the "
+                        + "unserializable partition or offset information is overwritten.", namespace);
+                log.error("Cause of serialization failure:", t);
+                callback.onCompletion(t, null);
+                return null;
             }
-        } catch (Throwable t) {
-            // Must handle errors properly here or the writer will be left mid-flush forever and be
-            // unable to make progress.
-            log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
-                    + "offsets under namespace {}. This likely won't recover unless the "
-                    + "unserializable partition or offset information is overwritten.", namespace);
-            log.error("Cause of serialization failure:", t);
-            callback.onCompletion(t, null);
-            return null;
+
+            // And submit the data
+            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush);
         }
 
-        // And submit the data
-        log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
-        log.debug("The offsets are: " + toFlush.toString());
         return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
                 boolean isCurrent = handleFinishWrite(flushId, error, result);
-                if (isCurrent && callback != null)
+                if (isCurrent && callback != null) {
                     callback.onCompletion(error, result);
+                }
             }
         });
     }