You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/07/28 20:00:21 UTC

[kafka] 01/01: KAFKA-15271: Historicalterator can exposes elements that are too new

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-14271
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 09c3cb10cbd1e8980abd5765de594fbc65f0673a
Author: Colin P. McCabe <cm...@apache.org>
AuthorDate: Fri Jul 28 12:57:05 2023 -0700

    KAFKA-15271: Historicalterator can exposes elements that are too new
    
    A HistoricalIterator at epoch N is supposed to only reveal elements at epoch N or earlier. However,
    due to a bug, we sometimes will reveal elements which are at a newer epoch than N. The bug does
    not affect elements that are in the latest epoch (aka topTier). It only affects elements that are
    newer than N, but which do not persist until the latest epoch.  This PR fixes the bug and adds a
    unit test for this case.
---
 .../org/apache/kafka/timeline/SnapshottableHashTable.java |  6 ++++--
 .../apache/kafka/timeline/SnapshottableHashTableTest.java | 15 +++++++++++++++
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 9284d5964c5..2977e061643 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -261,8 +261,10 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
                         int tierSlot = slot >>> shift;
                         BaseHashTable.unpackSlot(temp, deltaTable.baseElements(), tierSlot);
                         for (T object : temp) {
-                            if (BaseHashTable.findSlot(object, topTier.length) == slot) {
-                                ready.add(object);
+                            if (object.startEpoch() <= snapshot.epoch()) {
+                                if (BaseHashTable.findSlot(object, topTier.length) == slot) {
+                                    ready.add(object);
+                                }
                             }
                         }
                         temp.clear();
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 5f85463c15c..2d4a4cf9f04 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -275,6 +275,21 @@ public class SnapshottableHashTableTest {
         assertIteratorYields(table.snapshottableIterator(Long.MAX_VALUE));
     }
 
+    @Test
+    public void testIteratorAtOlderEpoch() {
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        SnapshottableHashTable<TestElement> table =
+                new SnapshottableHashTable<>(registry, 4);
+        assertNull(table.snapshottableAddOrReplace(E_3B));
+        registry.getOrCreateSnapshot(0);
+        assertNull(table.snapshottableAddOrReplace(E_1A));
+        registry.getOrCreateSnapshot(1);
+        assertEquals(E_1A, table.snapshottableAddOrReplace(E_1B));
+        registry.getOrCreateSnapshot(2);
+        assertEquals(E_1B, table.snapshottableRemove(E_1B));
+        assertIteratorYields(table.snapshottableIterator(1), E_3B, E_1A);
+    }
+
     /**
      * Assert that the given iterator contains the given elements, in any order.
      * We compare using reference equality here, rather than object equality.