You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/02/08 21:46:34 UTC

kafka git commit: HOTFIX: fix streams issues

Repository: kafka
Updated Branches:
  refs/heads/trunk 1a539c74c -> 4ee68b43c


HOTFIX: fix streams issues

* RocksDBStore.putInternal should bypass logging.
* StoreChangeLogger should not call context.recordCollector() when there is nothing to log.
  * This is for standby tasks. In a standby task, recordCollector() throws an exception, and there should be nothing to log anyway.
* Fixed a ConcurrentModificationException in StreamThread (standbyRecords was being modified while iterating over its key set).

guozhangwang

Author: Yasuhiro Matsuda <ya...@confluent.io>

Reviewers: Guozhang Wang

Closes #877 from ymatsuda/hotfix2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4ee68b43
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4ee68b43
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4ee68b43

Branch: refs/heads/trunk
Commit: 4ee68b43c180d1f68648c0fb388a66b1ed0023e5
Parents: 1a539c7
Author: Yasuhiro Matsuda <ya...@confluent.io>
Authored: Tue Feb 9 04:46:11 2016 +0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Feb 9 04:46:11 2016 +0800

----------------------------------------------------------------------
 .../processor/internals/StreamThread.java       | 21 ++++++++++++++++----
 .../streams/state/internals/RocksDBStore.java   | 12 +++++------
 .../state/internals/StoreChangeLogger.java      |  3 +++
 3 files changed, 26 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 8948fc8..6a8eabc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -104,7 +104,7 @@ public class StreamThread extends Thread {
     private long lastCommit;
     private long recordsProcessed;
 
-    private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
+    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords;
     private boolean processStandbyRecords = false;
 
     static File makeStateDir(String jobId, String baseDirName) {
@@ -355,18 +355,22 @@ public class StreamThread extends Thread {
         if (!standbyTasks.isEmpty()) {
             if (processStandbyRecords) {
                 if (!standbyRecords.isEmpty()) {
+                    Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> remainingStandbyRecords = new HashMap<>();
+
                     for (TopicPartition partition : standbyRecords.keySet()) {
-                        StandbyTask task = standbyTasksByPartition.get(partition);
-                        List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.remove(partition);
+                        List<ConsumerRecord<byte[], byte[]>> remaining = standbyRecords.get(partition);
                         if (remaining != null) {
+                            StandbyTask task = standbyTasksByPartition.get(partition);
                             remaining = task.update(partition, remaining);
                             if (remaining != null) {
-                                standbyRecords.put(partition, remaining);
+                                remainingStandbyRecords.put(partition, remaining);
                             } else {
                                 restoreConsumer.resume(partition);
                             }
                         }
                     }
+
+                    standbyRecords = remainingStandbyRecords;
                 }
                 processStandbyRecords = false;
             }
@@ -376,6 +380,12 @@ public class StreamThread extends Thread {
             if (!records.isEmpty()) {
                 for (TopicPartition partition : records.partitions()) {
                     StandbyTask task = standbyTasksByPartition.get(partition);
+
+                    if (task == null) {
+                        log.error("missing standby task for partition {}", partition);
+                        throw new StreamsException("missing standby task for partition " + partition);
+                    }
+
                     List<ConsumerRecord<byte[], byte[]>> remaining = task.update(partition, records.records(partition));
                     if (remaining != null) {
                         restoreConsumer.pause(partition);
@@ -642,6 +652,9 @@ public class StreamThread extends Thread {
                 }
                 // collect checked pointed offsets to position the restore consumer
                 // this include all partitions from which we restore states
+                for (TopicPartition partition : task.checkpointedOffsets().keySet()) {
+                    standbyTasksByPartition.put(partition, task);
+                }
                 checkpointedOffsets.putAll(task.checkpointedOffsets());
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
index 556e7cd..5c57854 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
@@ -201,7 +201,7 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
 
     @Override
     public boolean persistent() {
-        return false;
+        return true;
     }
 
     @Override
@@ -241,6 +241,11 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
             byte[] rawKey = serdes.rawKey(key);
             byte[] rawValue = serdes.rawValue(value);
             putInternal(rawKey, rawValue);
+
+            if (loggingEnabled) {
+                changeLogger.add(rawKey);
+                changeLogger.maybeLogChange(this.getter);
+            }
         }
     }
 
@@ -260,11 +265,6 @@ public class RocksDBStore<K, V> implements KeyValueStore<K, V> {
                         " and value " + serdes.keyFrom(rawValue) + " from store " + this.name, e);
             }
         }
-
-        if (loggingEnabled) {
-            changeLogger.add(rawKey);
-            changeLogger.maybeLogChange(this.getter);
-        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/4ee68b43/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
index 3bbd522..aac4d85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/StoreChangeLogger.java
@@ -86,6 +86,9 @@ public class StoreChangeLogger<K, V> {
     }
 
     public void logChange(ValueGetter<K, V> getter) {
+        if (this.removed.isEmpty() && this.dirty.isEmpty())
+            return;
+
         RecordCollector collector = ((RecordCollector.Supplier) context).recordCollector();
         if (collector != null) {
             Serializer<K> keySerializer = serialization.keySerializer();