You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2020/10/30 23:42:31 UTC
[kafka] 01/02: KAFKA-10664: Delete existing checkpoint when writing
empty offsets (#9534)
This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit db947b749e518aaeb6b019f649311433ec01222e
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Oct 30 13:28:31 2020 -0700
KAFKA-10664: Delete existing checkpoint when writing empty offsets (#9534)
Delete the existing checkpoint file if told to write empty offsets map to ensure that corrupted offsets are not re-initialized from the stale checkpoint file
Reviewers: Bruno Cadonna <br...@confluent.io>, Guozhang Wang <gu...@apache.org>
---
.../streams/state/internals/OffsetCheckpoint.java | 4 +++-
.../state/internals/OffsetCheckpointTest.java | 21 ++++++++++++++++++++-
2 files changed, 23 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 59afbb3..3ec2386 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -78,8 +78,10 @@ public class OffsetCheckpoint {
* @throws IOException if any file operation fails with an IO exception
*/
public void write(final Map<TopicPartition, Long> offsets) throws IOException {
- // if there is no offsets, skip writing the file to save disk IOs
+ // if there are no offsets, skip writing the file to save disk IOs
+ // but make sure to delete the existing file if one exists
if (offsets.isEmpty()) {
+ Utils.delete(file);
return;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index dafc21a..9970a1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -34,6 +34,7 @@ import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEnt
import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -75,7 +76,7 @@ public class OffsetCheckpointTest {
final File f = new File(TestUtils.tempDirectory().getAbsolutePath(), "kafka.tmp");
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
- checkpoint.write(Collections.<TopicPartition, Long>emptyMap());
+ checkpoint.write(Collections.emptyMap());
assertFalse(f.exists());
@@ -86,6 +87,24 @@ public class OffsetCheckpointTest {
}
@Test
+ public void shouldDeleteExistingCheckpointWhenNoOffsets() throws IOException {
+ final File file = TestUtils.tempFile();
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+ final Map<TopicPartition, Long> offsets = Collections.singletonMap(new TopicPartition(topic, 0), 1L);
+
+ checkpoint.write(offsets);
+
+ assertThat(file.exists(), is(true));
+ assertThat(offsets, is(checkpoint.read()));
+
+ checkpoint.write(Collections.emptyMap());
+
+ assertThat(file.exists(), is(false));
+ assertThat(Collections.<TopicPartition, Long>emptyMap(), is(checkpoint.read()));
+ }
+
+ @Test
public void shouldSkipInvalidOffsetsDuringRead() throws IOException {
final File file = TestUtils.tempFile();
final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);