Posted to commits@kafka.apache.org by gu...@apache.org on 2020/03/15 16:31:14 UTC

[kafka] branch trunk updated: MINOR: enforce non-negative invariant for checkpointed offsets (#8297)

This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 045c6c3  MINOR: enforce non-negative invariant for checkpointed offsets (#8297)
045c6c3 is described below

commit 045c6c3c48244c43cf5db2506827c20b01c353c8
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Sun Mar 15 09:30:40 2020 -0700

    MINOR: enforce non-negative invariant for checkpointed offsets (#8297)
    
    While discussing KIP-441 we realized we don't strictly enforce that all checkpointed offset sums are non-negative (though there's not much point in checkpointing a 0 offset, is there?).
    
    Rather than awkwardly trying to handle this within every user/reader of the checkpoint file, we should just guarantee that all returned checkpointed offsets are non-negative.
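    
    To illustrate the intended contract, here is a minimal sketch of a hypothetical caller (not code from this patch; checkpointFile is an assumed java.io.File, and the usual TopicPartition/Map imports are assumed). After this change, a reader can sum the returned offsets without guarding against sentinel values such as -1:
    
        // read() throws IOException; negative entries are skipped with a warning,
        // so every value in the returned map is guaranteed to be >= 0
        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(checkpointFile);
        final Map<TopicPartition, Long> offsets = checkpoint.read();
        long offsetSum = 0L;
        for (final long offset : offsets.values()) {
            offsetSum += offset;  // safe: no negative sentinel can corrupt the sum
        }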
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../processor/internals/RecordCollectorImpl.java   |  6 ++-
 .../streams/state/internals/OffsetCheckpoint.java  | 31 ++++++++---
 .../state/internals/OffsetCheckpointTest.java      | 60 ++++++++++++++++++++++
 3 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 4fbcdb9..8e69e4d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -184,7 +184,11 @@ public class RecordCollectorImpl implements RecordCollector {
 
             if (exception == null) {
                 final TopicPartition tp = new TopicPartition(metadata.topic(), metadata.partition());
-                offsets.put(tp, metadata.offset());
+                if (metadata.offset() >= 0L) {
+                    offsets.put(tp, metadata.offset());
+                } else {
+                    log.warn("Received offset={} in produce response for {}", metadata.offset(), tp);
+                }
             } else {
                 recordSendError(topic, exception, serializedRecord);
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
index 93a0561..003682e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/OffsetCheckpoint.java
@@ -67,6 +67,8 @@ public class OffsetCheckpoint {
     }
 
     /**
+     * Write the given offsets to the checkpoint file. All offsets should be non-negative.
+     *
      * @throws IOException if any file operation fails with an IO exception
      */
     public void write(final Map<TopicPartition, Long> offsets) throws IOException {
@@ -87,7 +89,14 @@ public class OffsetCheckpoint {
                 writeIntLine(writer, offsets.size());
 
                 for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
-                    writeEntry(writer, entry.getKey(), entry.getValue());
+                    final TopicPartition tp = entry.getKey();
+                    final Long offset = entry.getValue();
+                    if (offset >= 0L) {
+                        writeEntry(writer, tp, offset);
+                    } else {
+                        LOG.error("Received offset={} to write to checkpoint file for {}", offset, tp);
+                        throw new IllegalStateException("Attempted to write a negative offset to the checkpoint file");
+                    }
                 }
 
                 writer.flush();
@@ -102,8 +111,8 @@ public class OffsetCheckpoint {
     /**
      * @throws IOException if file write operations failed with any IO exception
      */
-    private void writeIntLine(final BufferedWriter writer,
-                              final int number) throws IOException {
+    static void writeIntLine(final BufferedWriter writer,
+                             final int number) throws IOException {
         writer.write(Integer.toString(number));
         writer.newLine();
     }
@@ -111,9 +120,9 @@ public class OffsetCheckpoint {
     /**
      * @throws IOException if file write operations failed with any IO exception
      */
-    private void writeEntry(final BufferedWriter writer,
-                            final TopicPartition part,
-                            final long offset) throws IOException {
+    static void writeEntry(final BufferedWriter writer,
+                           final TopicPartition part,
+                           final long offset) throws IOException {
         writer.write(part.topic());
         writer.write(' ');
         writer.write(Integer.toString(part.partition()));
@@ -124,6 +133,8 @@ public class OffsetCheckpoint {
 
 
     /**
+     * Reads the offsets from the local checkpoint file, skipping any negative offsets it finds.
+     *
      * @throws IOException if any file operation fails with an IO exception
      * @throws IllegalArgumentException if the offset checkpoint version is unknown
      */
@@ -145,8 +156,14 @@ public class OffsetCheckpoint {
 
                             final String topic = pieces[0];
                             final int partition = Integer.parseInt(pieces[1]);
+                            final TopicPartition tp = new TopicPartition(topic, partition);
                             final long offset = Long.parseLong(pieces[2]);
-                            offsets.put(new TopicPartition(topic, partition), offset);
+                            if (offset >= 0L) {
+                                offsets.put(tp, offset);
+                            } else {
+                                LOG.warn("Read offset={} from checkpoint file for {}", offset, tp);
+                            }
+
                             line = reader.readLine();
                         }
                         if (offsets.size() != expectedSize) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
index cb2e549..cc80d08 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/OffsetCheckpointTest.java
@@ -16,8 +16,12 @@
  */
 package org.apache.kafka.streams.state.internals;
 
+import java.io.BufferedWriter;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,8 +30,11 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
+import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeEntry;
+import static org.apache.kafka.streams.state.internals.OffsetCheckpoint.writeIntLine;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
 
 public class OffsetCheckpointTest {
 
@@ -73,4 +80,57 @@ public class OffsetCheckpointTest {
         // deleting a non-exist checkpoint file should be fine
         checkpoint.delete();
     }
+
+    @Test
+    public void shouldSkipNegativeOffsetsDuringRead() throws IOException {
+        final File file = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(file);
+
+        try {
+            final Map<TopicPartition, Long> offsets = new HashMap<>();
+            offsets.put(new TopicPartition(topic, 0), -1L);
+
+            writeVersion0(offsets, file);
+        } finally {
+            checkpoint.delete();
+        }
+    }
+
+    @Test
+    public void shouldThrowOnNegativeOffsetInWrite() throws IOException {
+        final File f = TestUtils.tempFile();
+        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(f);
+
+        try {
+            final Map<TopicPartition, Long> offsets = new HashMap<>();
+            offsets.put(new TopicPartition(topic, 0), 0L);
+            offsets.put(new TopicPartition(topic, 1), -1L);
+            offsets.put(new TopicPartition(topic, 2), 2L);
+
+            assertThrows(IllegalStateException.class, () -> checkpoint.write(offsets));
+        } finally {
+            checkpoint.delete();
+        }
+    }
+
+    /**
+     * Write all the offsets following the version 0 format without any verification (e.g. enforcing offsets >= 0)
+     */
+    static void writeVersion0(final Map<TopicPartition, Long> offsets, final File file) throws IOException {
+        final FileOutputStream fileOutputStream = new FileOutputStream(file);
+        try (final BufferedWriter writer = new BufferedWriter(
+            new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))) {
+            writeIntLine(writer, 0);
+            writeIntLine(writer, offsets.size());
+
+            for (final Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+                final TopicPartition tp = entry.getKey();
+                final Long offset = entry.getValue();
+                writeEntry(writer, tp, offset);
+            }
+
+            writer.flush();
+            fileOutputStream.getFD().sync();
+        }
+    }
 }
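
To illustrate the new write-side behavior, here is a hypothetical snippet (not part of this patch; the topic name "my-topic" is made up, and the standard imports from the test above are assumed). Passing any negative offset to write() now fails fast with an IllegalStateException instead of silently persisting the bad entry:

    final OffsetCheckpoint checkpoint = new OffsetCheckpoint(TestUtils.tempFile());
    final Map<TopicPartition, Long> offsets = new HashMap<>();
    offsets.put(new TopicPartition("my-topic", 0), 100L);
    offsets.put(new TopicPartition("my-topic", 1), -1L);  // e.g. an uninitialized sentinel

    try {
        checkpoint.write(offsets);
    } catch (final IllegalStateException fatal) {
        // a negative offset reaching write() indicates a bug in the caller;
        // the error is logged and the exception propagates
    }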