You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2023/05/03 17:10:54 UTC

[kafka] branch 3.4 updated: KAFKA-14946: fix NPE when merging the deltatable (#13653)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.4 by this push:
     new 09d794852a1 KAFKA-14946: fix NPE when merging the deltatable (#13653)
09d794852a1 is described below

commit 09d794852a165fb1f911922d61fbd1c3ed6f8b87
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu May 4 01:08:25 2023 +0800

    KAFKA-14946: fix NPE when merging the deltatable (#13653)
    
    Fix NPE while merging the deltatable. Because it's possible that hashTier is
    not null but deltatable is null (e.g. when removing data), we should null-check
    deltatable while merging, as other places do. Also added tests that will
    fail without this change.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../kafka/timeline/SnapshottableHashTable.java     |  3 +++
 .../kafka/timeline/SnapshottableHashTableTest.java | 26 ++++++++++++++++++++++
 2 files changed, 29 insertions(+)

diff --git a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index 299f65a6f78..9284d5964c5 100644
--- a/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/server-common/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -122,6 +122,9 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
                         // When merging in a later hash tier, we want to keep only the elements
                         // that were present at our epoch.
                         if (element.startEpoch() <= epoch) {
+                            if (deltaTable == null) {
+                                deltaTable = new BaseHashTable<>(1);
+                            }
                             deltaTable.baseAddOrReplace(element);
                         }
                     }
diff --git a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 93e6b127ab3..01ea35935cd 100644
--- a/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/server-common/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -108,11 +108,37 @@ public class SnapshottableHashTableTest {
         set.add("bar");
         registry.getOrCreateSnapshot(200);
         set.add("baz");
+
+        // The deltatable of epoch 200 is null; reverting (deltatable merge) should not throw an exception
         registry.revertToSnapshot(100);
         assertTrue(set.isEmpty());
         set.add("foo");
         registry.getOrCreateSnapshot(300);
+        // After reverting to epoch 100, "bar" no longer exists
         set.remove("bar");
+        // No deltatable merging is needed because nothing changed in snapshot epoch 300
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+
+        set.add("qux");
+        registry.getOrCreateSnapshot(400);
+        assertEquals(1, set.size());
+        set.add("fred");
+        set.add("thud");
+        registry.getOrCreateSnapshot(500);
+        assertEquals(3, set.size());
+
+        // Remove the value in epoch 101 (after epoch 100). It'll create an entry in the deltatable in the snapshot of epoch 500 for the deleted value in epoch 101
+        set.remove("qux");
+        assertEquals(2, set.size());
+        // When reverting to the snapshot of epoch 400, we'll merge the deltatable in epoch 500 with the one in epoch 400.
+        // The deltatable in epoch 500 has the entry created above, but the deltatable in epoch 400 is null.
+        // Reverting (deltatable merge) should not throw an exception
+        registry.revertToSnapshot(400);
+        // After reverting, the deltatable in epoch 500 should merge to the current epoch
+        assertEquals(1, set.size());
+
+        // When reverting to epoch 100, the deltatable in epoch 400 won't be merged because the entry's change is in epoch 101 (after epoch 100)
         registry.revertToSnapshot(100);
         assertTrue(set.isEmpty());
     }