You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/15 16:31:14 UTC
[kafka] branch trunk updated: MINOR: enforce non-negative invariant
for checkpointed offsets (#8297)
This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 045c6c3 MINOR: enforce non-negative invariant for checkpointed offsets (#8297)
045c6c3 is described below
commit 045c6c3c48244c43cf5db2506827c20b01c353c8
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sun Mar 15 09:30:40 2020 -0700
MINOR: enforce non-negative invariant for checkpointed offsets (#8297)
While discussing KIP-441, we realized that we don't strictly enforce that all checkpointed offset sums are non-negative (0 is allowed, though there's not much point in checkpointing a 0 offset, is there?).
Rather than awkwardly trying to handle this within every user/reader of the checkpoint file, we should just guarantee that all returned checkpointed offsets are non-negative.
Reviewers: Guozhang Wang <wa...@gmail.com>
---
.../processor/internals/RecordCollectorImpl.java | 6 ++-
.../streams/state/internals/OffsetCheckpoint.java | 31 ++++++++---
.../state/internals/OffsetCheckpointTest.java | 60 ++++++++++++++++++++++
3 files changed, 89 insertions(+), 8 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 4fbcdb9..8e69e4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -184,7 +184,11 @@ public class RecordCollectorImpl implements RecordCollector {
if (exception == null) {
final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
- offsets.put(tp, metadata.offset());
+ if (metadata.offset() >= 0L) {
+ offsets.put(tp, metadata.offset());
+ } else {
+ log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
+ }
} else {
recordSendError(topic, exception, serializedRecord);
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 93a0561..003682e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -67,6 +67,8 @@ public class OffsetCheckpoint {
}
/**
+ * Write the given offsets to the checkpoint file. All offsets should be non-negative.
+ *
* @throws IOException if any file operation fails with an IO exception
*/
public void write(final Map<TopicPartition, Long> offsets) throws IOException {
@@ -87,7 +89,14 @@ public class OffsetCheckpoint {
writeIntLine(writer, offsets.size());
for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
- writeEntry(writer, entry.getKey(), entry.getValue());
+ final TopicPartition tp = entry.getKey();
+ final Long offset = entry.getValue();
+ if (offset >= 0L) {
+ writeEntry(writer, tp, offset);
+ } else {
+ LOG.error("Received offset={} to write to checkpoint file for {}", offset, tp);
+ throw new IllegalStateException("Attempted to write a negative offset to the checkpoint file");
+ }
}
writer.flush();
@@ -102,8 +111,8 @@ public class OffsetCheckpoint {
/**
* @throws IOException if file write operations failed with any IO exception
*/
- private void writeIntLine(final BufferedWriter writer,
- final int number) throws IOException {
+ static void writeIntLine(final BufferedWriter writer,
+ final int number) throws IOException {
writer.write(Integer.toString(number));
writer.newLine();
}
@@ -111,9 +120,9 @@ public class OffsetCheckpoint {
/**
* @throws IOException if file write operations failed with any IO exception
*/
- private void writeEntry(final BufferedWriter writer,
- final TopicPartition part,
- final long offset) throws IOException {
+ static void writeEntry(final BufferedWriter writer,
+ final TopicPartition part,
+ final long offset) throws IOException {
writer.write(part.topic());
writer.write(' ');
writer.write(Integer.toString(part.partition()));
@@ -124,6 +133,8 @@ public class OffsetCheckpoint {
/**
+ * Reads the offsets from the local checkpoint file, skipping any negative offsets it finds.
+ *
* @throws IOException if any file operation fails with an IO exception
* @throws IllegalArgumentException if the offset checkpoint version is unknown
*/
@@ -145,8 +156,14 @@ public class OffsetCheckpoint {
final String topic = pieces[0];
final int partition = Integer.parseInt(pieces[1]);
+ final TopicPartition tp = new TopicPartition(topic, partition);
final long offset = Long.parseLong(pieces[2]);
- offsets.put(new TopicPartition(topic, partition), offset);
+ if (offset >= 0L) {
+ offsets.put(tp, offset);
+ } else {
+ LOG.warn("Read offset={} from checkpoint file for {}", offset, tp);
+ }
+
line = reader.readLine();
}
if (offsets.size() != expectedSize) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index cb2e549..cc80d08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -16,8 +16,12 @@
*/
package org.apache.kafka.streams.state.internals;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -26,8 +30,11 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
+import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEntry;
+import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
public class OffsetCheckpointTest {
@@ -73,4 +80,57 @@ public class OffsetCheckpointTest {
// deleting a non-exist checkpoint file should be fine
checkpoint.delete();
}
+
+ // Verifies that read() drops checkpoint entries whose offset is negative.
+ @Test
+ public void shouldSkipNegativeOffsetsDuringRead() throws IOException {
+ final File file = TestUtils.tempFile();
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+ try {
+ final Map<TopicPartition, Long> offsets = new HashMap<>();
+ offsets.put(new TopicPartition(topic, 0), -1L);
+
+ // Bypass write()'s validation so the file really contains a negative offset.
+ writeVersion0(offsets, file);
+
+ // The header written above still counts the negative entry, so read() has to
+ // drop it without treating the resulting shortfall as a corrupt file.
+ assertEquals(Collections.emptyMap(), checkpoint.read());
+ } finally {
+ checkpoint.delete();
+ }
+ }
+
+ @Test
+ public void shouldThrowOnNegativeOffsetInWrite() throws IOException {
+ final File file = TestUtils.tempFile();
+ final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+ try {
+ // One negative entry mixed in with valid ones must make write() throw.
+ final Map<TopicPartition, Long> offsetsWithNegative = new HashMap<>();
+ offsetsWithNegative.put(new TopicPartition(topic, 0), 0L);
+ offsetsWithNegative.put(new TopicPartition(topic, 1), -1L);
+ offsetsWithNegative.put(new TopicPartition(topic, 2), 2L);
+
+ assertThrows(IllegalStateException.class, () -> checkpoint.write(offsetsWithNegative));
+ } finally {
+ checkpoint.delete();
+ }
+ }
+
+ /**
+ * Write all the offsets following the version 0 format without any verification (e.g. enforcing offsets >= 0),
+ * so tests can produce checkpoint files that {@link OffsetCheckpoint#write} would refuse to create.
+ *
+ * @param offsets the entries to write, in the map's iteration order
+ * @param file the file to (over)write
+ * @throws IOException if any file operation fails
+ */
+ static void writeVersion0(final Map<TopicPartition, Long> offsets, final File file) throws IOException {
+ // Both resources are declared in the try header so the stream is always closed,
+ // even if wrapping it in the writer fails.
+ try (final FileOutputStream fileOutputStream = new FileOutputStream(file);
+ final BufferedWriter writer = new BufferedWriter(
+ new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+ writeIntLine(writer, 0); // version
+ writeIntLine(writer, offsets.size());
+
+ for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+ writeEntry(writer, entry.getKey(), entry.getValue());
+ }
+
+ writer.flush();
+ fileOutputStream.getFD().sync();
+ }
+ }
}