You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/10/30 23:42:31 UTC

[kafka] 01/02: KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit db947b749e518aaeb6b019f649311433ec01222e
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Oct 30 13:28:31 2020 -0700

    KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)
    
    Delete the existing checkpoint file if told to write an empty offsets map, to ensure that corrupted offsets are not re-initialized from the stale file.
    
    Reviewers: Bruno Cadonna <br...@confluent.io>, Guozhang Wang <gu...@apache.org>
---
 .../streams/state/internals/OffsetCheckpoint.java   |  4 +++-
 .../state/internals/OffsetCheckpointTest.java       | 21 ++++++++++++++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 59afbb3..3ec2386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -78,8 +78,10 @@ public class OffsetCheckpoint {
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
-        // if there is no offsets, skip writing the file to save disk IOs
+        // if there are no offsets, skip writing the file to save disk IOs
+        // but make sure to delete the existing file if one exists
         if (offsets.isEmpty()) {
+            Utils.delete(file);
             return;
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index dafc21a..9970a1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -34,6 +34,7 @@ import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEnt
 import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -75,7 +76,7 @@ public class OffsetCheckpointTest {
         final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
 
-        checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+        checkpoint.write(Collections.emptyMap());
 
         assertFalse(f.exists());
 
@@ -86,6 +87,24 @@ public class OffsetCheckpointTest {
     }
 
     @Test
+    public void shouldDeleteExistingCheckpointWhenNoOffsets() throws IOException {
+        final File file = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+        final Map<TopicPartition, Long> offsets = Collections.singletonMap(new TopicPartition(topic, 0), 1L);
+
+        checkpoint.write(offsets);
+
+        assertThat(file.exists(), is(true));
+        assertThat(offsets, is(checkpoint.read()));
+
+        checkpoint.write(Collections.emptyMap());
+
+        assertThat(file.exists(), is(false));
+        assertThat(Collections.<TopicPartition, Long>emptyMap(), is(checkpoint.read()));
+    }
+
+    @Test
     public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
         final File file = TestUtils.tempFile();
         final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);