You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/10 17:35:46 UTC
[kafka] 01/02: KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 50fa5b40a0fbde2341ddca453ac92733314bfe34
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Thu Jun 30 21:03:54 2022 -0700
KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
The NPE leaves the KRaft controller in an inconsistent state.
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../kafka/timeline/SnapshottableHashTable.java | 22 +++++++++++++---------
.../kafka/timeline/SnapshottableHashTableTest.java | 19 +++++++++++++++++++
2 files changed, 32 insertions(+), 9 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index cbd0a280fc1..299f65a6f78 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -111,15 +111,19 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
@Override
public void mergeFrom(long epoch, Delta source) {
HashTier<T> other = (HashTier<T>) source;
- List<T> list = new ArrayList<>();
- Object[] otherElements = other.deltaTable.baseElements();
- for (int slot = 0; slot < otherElements.length; slot++) {
- BaseHashTable.unpackSlot(list, otherElements, slot);
- for (T element : list) {
- // When merging in a later hash tier, we want to keep only the elements
- // that were present at our epoch.
- if (element.startEpoch() <= epoch) {
- deltaTable.baseAddOrReplace(element);
+ // As an optimization, the deltaTable might not exist for a new key
+ // as there is no previous value
+ if (other.deltaTable != null) {
+ List<T> list = new ArrayList<>();
+ Object[] otherElements = other.deltaTable.baseElements();
+ for (int slot = 0; slot < otherElements.length; slot++) {
+ BaseHashTable.unpackSlot(list, otherElements, slot);
+ for (T element : list) {
+ // When merging in a later hash tier, we want to keep only the elements
+ // that were present at our epoch.
+ if (element.startEpoch() <= epoch) {
+ deltaTable.baseAddOrReplace(element);
+ }
}
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 7f1ddcc3ff5..1b9dd1559ea 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -97,6 +97,25 @@ public class SnapshottableHashTableTest {
new SnapshottableHashTable<>(registry, 1);
assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
}
+ @Test
+ public void testDeleteOnEmptyDeltaTable() {
+ // A simple test case to validate the behavior of the TimelineHashSet
+ // when the deltaTable for a snapshot is null
+ SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+ TimelineHashSet<String> set = new TimelineHashSet<>(registry, 5);
+
+ registry.getOrCreateSnapshot(100);
+ set.add("bar");
+ registry.getOrCreateSnapshot(200);
+ set.add("baz");
+ registry.revertToSnapshot(100);
+ assertTrue(set.isEmpty());
+ set.add("foo");
+ registry.getOrCreateSnapshot(300);
+ set.remove("bar");
+ registry.revertToSnapshot(100);
+ assertTrue(set.isEmpty());
+ }
@Test
public void testAddAndRemove() {