Posted to commits@kafka.apache.org by jg...@apache.org on 2017/09/06 18:14:05 UTC
kafka git commit: KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush
Repository: kafka
Updated Branches:
refs/heads/trunk 45394d52c -> 510257646
KAFKA-5756; Connect WorkerSourceTask synchronization issue on flush
Author: oleg <ol...@nexla.com>
Reviewers: Randall Hauch <rh...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #3702 from oleg-smith/KAFKA-5756
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/51025764
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/51025764
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/51025764
Branch: refs/heads/trunk
Commit: 51025764601760684f3dbdc0171fd7aa2ebe70e7
Parents: 45394d5
Author: oleg <ol...@nexla.com>
Authored: Wed Sep 6 11:12:23 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Sep 6 11:12:23 2017 -0700
----------------------------------------------------------------------
.../kafka/connect/runtime/WorkerSourceTask.java | 2 +-
.../connect/storage/OffsetStorageWriter.java | 65 +++++++++++---------
2 files changed, 36 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/51025764/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 0f42186..6a17b71 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -159,7 +159,7 @@ class WorkerSourceTask extends WorkerTask {
                 }
 
                 if (toSend == null) {
-                    log.debug("{} Nothing to send to Kafka. Polling source for additional records", this);
+                    log.trace("{} Nothing to send to Kafka. Polling source for additional records", this);
                     toSend = task.poll();
                 }
                 if (toSend == null)
http://git-wip-us.apache.org/repos/asf/kafka/blob/51025764/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
index c5d1467..3239b67 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetStorageWriter.java
@@ -59,7 +59,7 @@ import java.util.concurrent.Future;
  * time.
  * </p>
  * <p>
- * This class is not thread-safe. It should only be accessed from a Task's processing thread.
+ * This class is thread-safe.
  * </p>
  */
 public class OffsetStorageWriter {
@@ -72,7 +72,6 @@ public class OffsetStorageWriter {
     // Offset data in Connect format
     private Map<Map<String, Object>, Map<String, Object>> data = new HashMap<>();
-    // Not synchronized, should only be accessed by flush thread
     private Map<Map<String, Object>, Map<String, Object>> toFlush = null;
     // Unique ID for each flush request to handle callbacks after timeouts
     private long currentFlushId = 0;
@@ -129,44 +128,50 @@
      * @return a Future, or null if there are no offsets to commitOffsets
      */
     public Future<Void> doFlush(final Callback<Void> callback) {
-        final long flushId = currentFlushId;
+        final long flushId;
 
         // Serialize
-        Map<ByteBuffer, ByteBuffer> offsetsSerialized;
-        try {
-            offsetsSerialized = new HashMap<>();
-            for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
-                // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
-                // for that data. The only enforcement of the format is here.
-                OffsetUtils.validateFormat(entry.getKey());
-                OffsetUtils.validateFormat(entry.getValue());
-                // When serializing the key, we add in the namespace information so the key is [namespace, real key]
-                byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
-                ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
-                byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
-                ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
-                offsetsSerialized.put(keyBuffer, valueBuffer);
+        final Map<ByteBuffer, ByteBuffer> offsetsSerialized;
+
+        synchronized (this) {
+            flushId = currentFlushId;
+
+            try {
+                offsetsSerialized = new HashMap<>(toFlush.size());
+                for (Map.Entry<Map<String, Object>, Map<String, Object>> entry : toFlush.entrySet()) {
+                    // Offsets are specified as schemaless to the converter, using whatever internal schema is appropriate
+                    // for that data. The only enforcement of the format is here.
+                    OffsetUtils.validateFormat(entry.getKey());
+                    OffsetUtils.validateFormat(entry.getValue());
+                    // When serializing the key, we add in the namespace information so the key is [namespace, real key]
+                    byte[] key = keyConverter.fromConnectData(namespace, null, Arrays.asList(namespace, entry.getKey()));
+                    ByteBuffer keyBuffer = (key != null) ? ByteBuffer.wrap(key) : null;
+                    byte[] value = valueConverter.fromConnectData(namespace, null, entry.getValue());
+                    ByteBuffer valueBuffer = (value != null) ? ByteBuffer.wrap(value) : null;
+                    offsetsSerialized.put(keyBuffer, valueBuffer);
+                }
+            } catch (Throwable t) {
+                // Must handle errors properly here or the writer will be left mid-flush forever and be
+                // unable to make progress.
+                log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
+                        + "offsets under namespace {}. This likely won't recover unless the "
+                        + "unserializable partition or offset information is overwritten.", namespace);
+                log.error("Cause of serialization failure:", t);
+                callback.onCompletion(t, null);
+                return null;
             }
-        } catch (Throwable t) {
-            // Must handle errors properly here or the writer will be left mid-flush forever and be
-            // unable to make progress.
-            log.error("CRITICAL: Failed to serialize offset data, making it impossible to commit "
-                    + "offsets under namespace {}. This likely won't recover unless the "
-                    + "unserializable partition or offset information is overwritten.", namespace);
-            log.error("Cause of serialization failure:", t);
-            callback.onCompletion(t, null);
-            return null;
+
+            // And submit the data
+            log.debug("Submitting {} entries to backing store. The offsets are: {}", offsetsSerialized.size(), toFlush);
         }
-        // And submit the data
-        log.debug("Submitting {} entries to backing store", offsetsSerialized.size());
-        log.debug("The offsets are: " + toFlush.toString());
 
         return backingStore.set(offsetsSerialized, new Callback<Void>() {
             @Override
             public void onCompletion(Throwable error, Void result) {
                 boolean isCurrent = handleFinishWrite(flushId, error, result);
-                if (isCurrent && callback != null)
+                if (isCurrent && callback != null) {
                     callback.onCompletion(error, result);
+                }
             }
         });
     }
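----------------------------------------------------------------------
Context for the fix: WorkerSourceTask records offsets through
OffsetStorageWriter.offset() on the task's processing thread, while the
worker's committer thread periodically calls beginFlush() and doFlush().
Before this patch, doFlush() read toFlush and currentFlushId without
holding the writer's lock, so it could race with the synchronized
methods called from the task thread. The sketch below is a minimal
standalone illustration of the locking pattern the patch enforces; it
uses simplified stand-in types, not the actual Connect classes.

import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for OffsetStorageWriter (illustrative only).
// The point mirrored from the patch: offset(), beginFlush(), and the
// snapshot step of doFlush() all synchronize on the same object, so a
// flush can never observe the offset buffers mid-update.
class ToyOffsetWriter {
    private Map<String, Long> data = new HashMap<>();
    private Map<String, Long> toFlush = null;

    // Task thread: buffer the latest offset for a source partition.
    public synchronized void offset(String partition, long position) {
        data.put(partition, position);
    }

    // Committer thread: snapshot pending offsets for flushing.
    // (The real writer treats an outstanding flush as an error;
    // returning false keeps this sketch simple.)
    public synchronized boolean beginFlush() {
        if (data.isEmpty() || toFlush != null)
            return false;
        toFlush = data;
        data = new HashMap<>();
        return true;
    }

    // Committer thread: consume the snapshot under the same lock,
    // as the patched doFlush() now does with its synchronized block.
    public synchronized Map<String, Long> doFlush() {
        Map<String, Long> snapshot = toFlush;
        toFlush = null;
        return snapshot;
    }
}

public class FlushRaceSketch {
    public static void main(String[] args) throws InterruptedException {
        final ToyOffsetWriter writer = new ToyOffsetWriter();

        // Task thread keeps producing offsets...
        Thread task = new Thread(() -> {
            for (long i = 0; i < 100_000; i++)
                writer.offset("partition-0", i);
        });
        // ...while the committer thread flushes concurrently.
        Thread committer = new Thread(() -> {
            for (int i = 0; i < 1_000; i++)
                if (writer.beginFlush())
                    writer.doFlush();
        });

        task.start();
        committer.start();
        task.join();
        committer.join();
    }
}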