You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/08 21:46:34 UTC
kafka git commit: HOTFIX: fix streams issues
Repository: kafka
Updated Branches:
refs/heads/trunk 1a539c74c -> 4ee68b43c
HOTFIX: fix streams issues
* RocksDBStore.putInternal should bypass logging.
* StoreChangeLogger should not call context.recordCollector() when there is nothing to log
* This is for standby tasks. In a standby task, recordCollector() throws an exception. There should be nothing to log anyway.
* fixed ConcurrentModificationException in StreamThread
guozhangwang
Author: Yasuhiro Matsuda <ya...@confluent.io>
Reviewers: Guozhang Wang
Closes #877 from ymatsuda/hotfix2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ee68b43
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ee68b43
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ee68b43
Branch: refs/heads/trunk
Commit: 4ee68b43c180d1f68648c0fb388a66b1ed0023e5
Parents: 1a539c7
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 04:46:11 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 9 04:46:11 2016 +0800
----------------------------------------------------------------------
.../processor/internals/StreamThread.java | 21 ++++++++++++++++----
.../streams/state/internals/RocksDBStore.java | 12 +++++------
.../state/internals/StoreChangeLogger.java | 3 +++
3 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8948fc8..6a8eabc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -104,7 +104,7 @@ public class StreamThread extends Thread {
private long lastCommit;
private long recordsProcessed;
- private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+ private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
private boolean processStandbyRecords = false;
static File makeStateDir(String jobId, String baseDirName) {
@@ -355,18 +355,22 @@ public class StreamThread extends Thread {
if (!standbyTasks.isEmpty()) {
if (processStandbyRecords) {
if (!standbyRecords.isEmpty()) {
+ Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>();
+
for (TopicPartition partition : standbyRecords.keySet()) {
- StandbyTask task = standbyTasksByPartition.get(partition);
- List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+ List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.get(partition);
if (remaining != null) {
+ StandbyTask task = standbyTasksByPartition.get(partition);
remaining = task.update(partition, remaining);
if (remaining != null) {
- standbyRecords.put(partition, remaining);
+ remainingStandbyRecords.put(partition, remaining);
} else {
restoreConsumer.resume(partition);
}
}
}
+
+ standbyRecords = remainingStandbyRecords;
}
processStandbyRecords = false;
}
@@ -376,6 +380,12 @@ public class StreamThread extends Thread {
if (!records.isEmpty()) {
for (TopicPartition partition : records.partitions()) {
StandbyTask task = standbyTasksByPartition.get(partition);
+
+ if (task == null) {
+ log.error("missing standby task for partition {}", partition);
+ throw new StreamsException("missing standby task for partition " + partition);
+ }
+
List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
if (remaining != null) {
restoreConsumer.pause(partition);
@@ -642,6 +652,9 @@ public class StreamThread extends Thread {
}
// collect checked pointed offsets to position the restore consumer
// this include all partitions from which we restore states
+ for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+ standbyTasksByPartition.put(partition, task);
+ }
checkpointedOffsets.putAll(task.checkpointedOffsets());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 556e7cd..5c57854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -201,7 +201,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
@Override
public boolean persistent() {
- return false;
+ return true;
}
@Override
@@ -241,6 +241,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
byte[] rawKey = serdes.rawKey(key);
byte[] rawValue = serdes.rawValue(value);
putInternal(rawKey, rawValue);
+
+ if (loggingEnabled) {
+ changeLogger.add(rawKey);
+ changeLogger.maybeLogChange(this.getter);
+ }
}
}
@@ -260,11 +265,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
" and value " + serdes.keyFrom(rawValue) + " from store " + this.name, e);
}
}
-
- if (loggingEnabled) {
- changeLogger.add(rawKey);
- changeLogger.maybeLogChange(this.getter);
- }
}
@Override
http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 3bbd522..aac4d85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -86,6 +86,9 @@ public class StoreChangeLogger<K, V> {
}
public void logChange(ValueGetter<K, V> getter) {
+ if (this.removed.isEmpty() && this.dirty.isEmpty())
+ return;
+
RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
if (collector != null) {
Serializer<K> keySerializer = serialization.keySerializer();