You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/16 20:59:22 UTC

kafka git commit: KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state

Repository: kafka
Updated Branches:
  refs/heads/trunk 4e3092d27 -> 73703a15c


KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state

Ensure checkpointable offsets for GlobalKTables are always written on close.

Author: Tommy Becker <to...@tivo.com>

Reviewers: Damian Guy, Eno Thereska, Guozhang Wang

Closes #3054 from twbecker/KAFKA-5241


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73703a15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73703a15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73703a15

Branch: refs/heads/trunk
Commit: 73703a15c5006ddca166a458dcde72e17c91de4a
Parents: 4e3092d
Author: Tommy Becker <to...@tivo.com>
Authored: Tue May 16 13:59:19 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 16 13:59:19 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/GlobalStateManagerImpl.java   |  4 ++--
 .../internals/GlobalStateManagerImplTest.java         | 14 +++++++++++++-
 2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73703a15/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 275dca5..6bd699f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -229,8 +229,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
-        if (!offsets.isEmpty()) {
-            checkpointableOffsets.putAll(offsets);
+        checkpointableOffsets.putAll(offsets);
+        if (!checkpointableOffsets.isEmpty()) {
             try {
                 checkpoint.write(checkpointableOffsets);
             } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/73703a15/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 54062fc..98ef8f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -426,6 +426,18 @@ public class GlobalStateManagerImplTest {
         assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
     }
 
+    @Test
+    public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
+        stateManager.initialize(context);
+        final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+        initializeConsumer(10, 1, t1);
+        stateManager.register(store1, false, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
+        assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L)));
+        assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
+    }
 
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
@@ -487,4 +499,4 @@ public class GlobalStateManagerImplTest {
             restored.add(KeyValue.pair(key, value));
         }
     }
-}
\ No newline at end of file
+}