You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/11/10 06:44:09 UTC

kafka git commit: HOTFIX: GlobalStateManagerImpl in trunk has renamed the consumer field

Repository: kafka
Updated Branches:
  refs/heads/trunk 12af521c4 -> 84ddff679


HOTFIX: GlobalStateManagerImpl in trunk has renamed the consumer field


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84ddff67
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84ddff67
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84ddff67

Branch: refs/heads/trunk
Commit: 84ddff6792a2082a1940664d42f5b03edc617c31
Parents: 12af521
Author: Guozhang Wang <wa...@gmail.com>
Authored: Thu Nov 9 22:43:59 2017 -0800
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Thu Nov 9 22:43:59 2017 -0800

----------------------------------------------------------------------
 .../kafka/streams/processor/internals/GlobalStateManagerImpl.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/84ddff67/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index f8e79e0..07276ba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -192,7 +192,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
                     if (record.key() != null) {
                         restoreRecords.add(KeyValue.pair(record.key(), record.value()));
                     }
-                    offset = consumer.position(topicPartition);
+                    offset = globalConsumer.position(topicPartition);
                 }
                 stateRestoreAdapter.restoreAll(restoreRecords);
                 stateRestoreListener.onBatchRestored(topicPartition, storeName, offset, restoreRecords.size());