You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/10 17:35:45 UTC

[kafka] branch 3.0 updated (87d6d78913d -> 77230b567ab)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a change to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from 87d6d78913d MINOR: Replace left single quote with single quote in Connect worker's log message (#12201)
     new 50fa5b40a0f KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
     new 77230b567ab KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 79 ++++++++++++++++++----
 .../kafka/timeline/SnapshottableHashTable.java     | 22 +++---
 .../kafka/timeline/SnapshottableHashTableTest.java | 19 ++++++
 4 files changed, 101 insertions(+), 21 deletions(-)


[kafka] 01/02: KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 50fa5b40a0fbde2341ddca453ac92733314bfe34
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Thu Jun 30 21:03:54 2022 -0700

    KAFKA-14035; Fix NPE in `SnapshottableHashTable::mergeFrom()` (#12371)
    
    The NPE causes the kraft controller to be in an inconsistent state.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../kafka/timeline/SnapshottableHashTable.java     | 22 +++++++++++++---------
 .../kafka/timeline/SnapshottableHashTableTest.java | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
index cbd0a280fc1..299f65a6f78 100644
--- a/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
+++ b/metadata/src/main/java/org/apache/kafka/timeline/SnapshottableHashTable.java
@@ -111,15 +111,19 @@ class SnapshottableHashTable<T extends SnapshottableHashTable.ElementWithStartEp
         @Override
         public void mergeFrom(long epoch, Delta source) {
             HashTier<T> other = (HashTier<T>) source;
-            List<T> list = new ArrayList<>();
-            Object[] otherElements = other.deltaTable.baseElements();
-            for (int slot = 0; slot < otherElements.length; slot++) {
-                BaseHashTable.unpackSlot(list, otherElements, slot);
-                for (T element : list) {
-                    // When merging in a later hash tier, we want to keep only the elements
-                    // that were present at our epoch.
-                    if (element.startEpoch() <= epoch) {
-                        deltaTable.baseAddOrReplace(element);
+            // As an optimization, the deltaTable might not exist for a new key
+            // as there is no previous value
+            if (other.deltaTable != null) {
+                List<T> list = new ArrayList<>();
+                Object[] otherElements = other.deltaTable.baseElements();
+                for (int slot = 0; slot < otherElements.length; slot++) {
+                    BaseHashTable.unpackSlot(list, otherElements, slot);
+                    for (T element : list) {
+                        // When merging in a later hash tier, we want to keep only the elements
+                        // that were present at our epoch.
+                        if (element.startEpoch() <= epoch) {
+                            deltaTable.baseAddOrReplace(element);
+                        }
                     }
                 }
             }
diff --git a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
index 7f1ddcc3ff5..1b9dd1559ea 100644
--- a/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
+++ b/metadata/src/test/java/org/apache/kafka/timeline/SnapshottableHashTableTest.java
@@ -97,6 +97,25 @@ public class SnapshottableHashTableTest {
             new SnapshottableHashTable<>(registry, 1);
         assertEquals(0, table.snapshottableSize(Long.MAX_VALUE));
     }
+    @Test
+    public void testDeleteOnEmptyDeltaTable() {
+        // A simple test case to validate the behavior of the TimelineHashSet
+        // when the deltaTable for a snapshot is null
+        SnapshotRegistry registry = new SnapshotRegistry(new LogContext());
+        TimelineHashSet<String> set = new TimelineHashSet<>(registry, 5);
+
+        registry.getOrCreateSnapshot(100);
+        set.add("bar");
+        registry.getOrCreateSnapshot(200);
+        set.add("baz");
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+        set.add("foo");
+        registry.getOrCreateSnapshot(300);
+        set.remove("bar");
+        registry.revertToSnapshot(100);
+        assertTrue(set.isEmpty());
+    }
 
     @Test
     public void testAddAndRemove() {


[kafka] 02/02: KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)

Posted by jg...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 77230b567ab51726302466058ca5f5e734e81664
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sun Jul 10 10:16:39 2022 -0700

    KAFKA-14055; Txn markers should not be removed by matching records in the offset map (#12390)
    
    When cleaning a topic with transactional data, if the keys used in the user data happen to conflict with the keys in the transaction markers, it is possible for the markers to get removed before the corresponding data from the transaction is removed. This results in a hanging transaction or the loss of the transaction's atomicity since it would effectively get bundled into the next transaction in the log. Currently control records are excluded when building the offset map, but not whe [...]
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala     |  2 +
 .../test/scala/unit/kafka/log/LogCleanerTest.scala | 79 ++++++++++++++++++----
 2 files changed, 69 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 12099f0f2c1..97de1db57c2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -678,6 +678,8 @@ private[log] class Cleaner(val id: Int,
         if (discardBatchRecords)
           // The batch is only retained to preserve producer sequence information; the records can be removed
           false
+        else if (batch.isControlBatch)
+          true
         else
           Cleaner.this.shouldRetainRecord(map, retainDeletesAndTxnMarkers, batch, record, stats)
       }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 5b942342193..253bf5490c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1027,6 +1027,50 @@ class LogCleanerTest {
     assertEquals(List(3, 4, 5), offsetsInLog(log))
   }
 
+
+  @Test
+  def testCleaningWithKeysConflictingWithTxnMarkerKeys(): Unit = {
+    val cleaner = makeCleaner(10)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    val leaderEpoch = 5
+    val producerEpoch = 0.toShort
+
+    // First we append one committed transaction
+    val producerId1 = 1L
+    val appendProducer = appendTransactionalAsLeader(log, producerId1, producerEpoch, leaderEpoch)
+    appendProducer(Seq(1))
+    log.appendAsLeader(commitMarker(producerId1, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+
+    // Now we append one transaction with a key which conflicts with the COMMIT marker appended above
+    def commitRecordKey(): ByteBuffer = {
+      val keySize = ControlRecordType.COMMIT.recordKey().sizeOf()
+      val key = ByteBuffer.allocate(keySize)
+      ControlRecordType.COMMIT.recordKey().writeTo(key)
+      key.flip()
+      key
+    }
+
+    val producerId2 = 2L
+    val records = MemoryRecords.withTransactionalRecords(
+      CompressionType.NONE,
+      producerId2,
+      producerEpoch,
+      0,
+      new SimpleRecord(time.milliseconds(), commitRecordKey(), ByteBuffer.wrap("foo".getBytes))
+    )
+    log.appendAsLeader(records, leaderEpoch, origin = AppendOrigin.Client)
+    log.appendAsLeader(commitMarker(producerId2, producerEpoch), leaderEpoch, origin = AppendOrigin.Coordinator)
+    log.roll()
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+
+    // After cleaning, the marker should not be removed
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(0, 1, 2, 3), lastOffsetsPerBatchInLog(log))
+    assertEquals(List(0, 1, 2, 3), offsetsInLog(log))
+  }
+
   @Test
   def testPartialSegmentClean(): Unit = {
     // because loadFactor is 0.75, this means we can fit 1 message in the map
@@ -1917,20 +1961,31 @@ class LogCleanerTest {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def appendTransactionalAsLeader(log: Log,
-                                          producerId: Long,
-                                          producerEpoch: Short,
-                                          leaderEpoch: Int = 0,
-                                          origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
-    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true, origin = origin)
+  private def appendTransactionalAsLeader(
+    log: Log,
+    producerId: Long,
+    producerEpoch: Short,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(
+      log,
+      producerId,
+      producerEpoch,
+      isTransactional = true,
+      leaderEpoch = leaderEpoch,
+      origin = origin
+    )
   }
 
-  private def appendIdempotentAsLeader(log: Log,
-                                       producerId: Long,
-                                       producerEpoch: Short,
-                                       isTransactional: Boolean = false,
-                                       leaderEpoch: Int = 0,
-                                       origin: AppendOrigin = AppendOrigin.Client): Seq[Int] => LogAppendInfo = {
+  private def appendIdempotentAsLeader(
+    log: Log,
+    producerId: Long,
+    producerEpoch: Short,
+    isTransactional: Boolean = false,
+    leaderEpoch: Int = 0,
+    origin: AppendOrigin = AppendOrigin.Client
+  ): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>