You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2017/05/16 20:59:22 UTC
kafka git commit: KAFKA-5241: GlobalKTable should checkpoint offsets
after restoring state
Repository: kafka
Updated Branches:
refs/heads/trunk 4e3092d27 -> 73703a15c
KAFKA-5241: GlobalKTable should checkpoint offsets after restoring state
Ensure checkpointable offsets for GlobalKTables are always written on close.
Author: Tommy Becker <to...@tivo.com>
Reviewers: Damian Guy, Eno Thereska, Guozhang Wang
Closes #3054 from twbecker/KAFKA-5241
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73703a15
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73703a15
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73703a15
Branch: refs/heads/trunk
Commit: 73703a15c5006ddca166a458dcde72e17c91de4a
Parents: 4e3092d
Author: Tommy Becker <to...@tivo.com>
Authored: Tue May 16 13:59:19 2017 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue May 16 13:59:19 2017 -0700
----------------------------------------------------------------------
.../processor/internals/GlobalStateManagerImpl.java | 4 ++--
.../internals/GlobalStateManagerImplTest.java | 14 +++++++++++++-
2 files changed, 15 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/73703a15/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 275dca5..6bd699f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -229,8 +229,8 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
@Override
public void checkpoint(final Map<TopicPartition, Long> offsets) {
- if (!offsets.isEmpty()) {
- checkpointableOffsets.putAll(offsets);
+ checkpointableOffsets.putAll(offsets);
+ if (!checkpointableOffsets.isEmpty()) {
try {
checkpoint.write(checkpointableOffsets);
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/73703a15/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 54062fc..98ef8f6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -426,6 +426,18 @@ public class GlobalStateManagerImplTest {
assertThat(stateRestoreCallback.restored, equalTo(Collections.singletonList(KeyValue.pair(restoredKv.key, restoredKv.value))));
}
+ @Test
+ public void shouldCheckpointRestoredOffsetsToFile() throws IOException {
+ stateManager.initialize(context);
+ final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();
+ initializeConsumer(10, 1, t1);
+ stateManager.register(store1, false, stateRestoreCallback);
+ stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+ final Map<TopicPartition, Long> checkpointMap = stateManager.checkpointed();
+ assertThat(checkpointMap, equalTo(Collections.singletonMap(t1, 11L)));
+ assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
+ }
private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
@@ -487,4 +499,4 @@ public class GlobalStateManagerImplTest {
restored.add(KeyValue.pair(key, value));
}
}
-}
\ No newline at end of file
+}