You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/01 04:07:58 UTC

[kafka] branch 3.1 updated: KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.1 by this push:
     new 2d07fdd58e KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
2d07fdd58e is described below

commit 2d07fdd58e699231b1e9694cd352ce19f42cd83d
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Thu Jun 30 21:03:54 2022 -0700

    KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
    
    The NPE causes the kraft controller to be in an inconsistent state.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/timeline/SnapshottableHashTable.java     | 22 +++++++++++++---------
 .../kafka/timeline/SnapshottableHashTableTest.java | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index cbd0a280fc..299f65a6f7 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -111,15 +111,19 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
         @Override
         public void mergeFrom(long epoch, Delta source) {
             HashTier<T> other = (HashTier<T>) source;
-            List<T> list = new ArrayList<>();
-            Object[] otherElements = other.deltaTable.baseElements();
-            for (int slot = 0; slot < otherElements.length; slot++) {
-                BaseHashTable.unpackSlot(list, otherElements, slot);
-                for (T element : list) {
-                    // When merging in a later hash tier, we want to keep only the elements
-                    // that were present at our epoch.
-                    if (element.startEpoch() <= epoch) {
-                        deltaTable.baseAddOrReplace(element);
+            // As an optimization, the deltaTable might not exist for a new key
+            // as there is no previous value
+            if (other.deltaTable != null) {
+                List<T> list = new ArrayList<>();
+                Object[] otherElements = other.deltaTable.baseElements();
+                for (int slot = 0; slot < otherElements.length; slot++) {
+                    BaseHashTable.unpackSlot(list, otherElements, slot);
+                    for (T element : list) {
+                        // When merging in a later hash tier, we want to keep only the elements
+                        // that were present at our epoch.
+                        if (element.startEpoch() <= epoch) {
+                            deltaTable.baseAddOrReplace(element);
+                        }
                     }
                 }
             }
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 7f1ddcc3ff..1b9dd1559e 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -97,6 +97,25 @@ public class SnapshottableHashTableTest {
             new SnapshottableHashTable<>(registry, 1);
         assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
     }
+    @Test
+    public void testDeleteOnEmptyDeltaTable() {
+        // A simple test case to validate the behavior of the TimelineHashSet
+        // when the deltaTable for a snapshot is null
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        TimelineHashSet<String> set = new TimelineHashSet<>(registry, 5);
+
+        registry.getOrCreateSnapshot(100);
+        set.add("bar");
+        registry.getOrCreateSnapshot(200);
+        set.add("baz");
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+        set.add("foo");
+        registry.getOrCreateSnapshot(300);
+        set.remove("bar");
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+    }
 
     @Test
     public void testAddAndRemove() {