Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/01 17:50:52 UTC

[kafka] branch 3.6 updated (b6c5ac0913b -> 8eb9105e51c)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a change to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


    from b6c5ac0913b KAFKA-15404: Disable the flaky integration tests. (#14296)
     new 771f14ca386 KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled (#14301)
     new 8eb9105e51c KAFKA-15427: Fix resource leak in integration tests for tiered storage (#14319)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 core/src/main/scala/kafka/log/LogLoader.scala      |  9 +++-
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 22 ++++++----
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 48 +++++++++++++++++++++
 ...RemoteLogMetadataManagerWrapperWithHarness.java |  2 +-
 .../tiered/storage/TieredStorageTestContext.java   | 33 +++++++++++++--
 .../OffloadAndConsumeFromLeaderTest.java           | 49 +++++++++++-----------
 6 files changed, 125 insertions(+), 38 deletions(-)


[kafka] 02/02: KAFKA-15427: Fix resource leak in integration tests for tiered storage (#14319)

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8eb9105e51ccb8cb232281c66a4ddaae84986be1
Author: Christo Lolov <lo...@amazon.com>
AuthorDate: Fri Sep 1 18:42:57 2023 +0100

    KAFKA-15427: Fix resource leak in integration tests for tiered storage (#14319)
    
    Co-authored-by: Nikhil Ramakrishnan <ni...@amazon.com>
    
    Reviewers: Satish Duggana <sa...@apache.org>, Luke Chen <sh...@gmail.com>
---
 .../storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
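
The one-line change below is the entire fix: the wrapper's close() previously closed only the wrapped remote log metadata manager, so any other resources still held by remoteLogMetadataManagerHarness were never released between tests. Closing the harness itself lets it release the manager together with everything else it owns. A minimal Scala sketch of that ownership pattern, using purely illustrative names (FakeHarness and ManagerWrapper are not Kafka classes):

    import java.io.Closeable

    // Illustrative only: a harness owns a manager plus other resources; the wrapper's
    // close() should release the whole harness, not just the wrapped manager.
    class FakeHarness extends Closeable {
      val manager: Closeable = () => println("metadata manager closed")
      private val otherResources: Seq[Closeable] = Seq(() => println("embedded cluster stopped"))

      override def close(): Unit = {
        manager.close()
        otherResources.foreach(_.close())
      }
    }

    class ManagerWrapper(harness: FakeHarness) extends Closeable {
      // Before the fix (leaks otherResources): harness.manager.close()
      // After the fix: close the owning harness, which closes the manager and everything else.
      override def close(): Unit = harness.close()
    }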

diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
index e73ac31b16c..70ce9190d81 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
@@ -93,7 +93,7 @@ public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements Rem
 
     @Override
     public void close() throws IOException {
-        remoteLogMetadataManagerHarness.remoteLogMetadataManager().close();
+        remoteLogMetadataManagerHarness.close();
     }
 
     @Override


[kafka] 01/02: KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled (#14301)

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 771f14ca386ac23f13b563f7093070cb018f4266
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Fri Sep 1 06:33:33 2023 +0530

    KAFKA-15351: Ensure log-start-offset not updated to local-log-start-offset when remote storage enabled (#14301)
    
    When tiered storage is enabled on the topic and the last-standing replica is restarted, the log-start-offset should not be reset to the first-local-log-segment base offset.
    
    Reviewers: Satish Duggana <sa...@apache.org>, Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Christo Lolov <lo...@amazon.com>
---
 core/src/main/scala/kafka/log/LogLoader.scala      |  9 +++-
 core/src/main/scala/kafka/log/UnifiedLog.scala     | 22 ++++++----
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 48 +++++++++++++++++++++
 .../tiered/storage/TieredStorageTestContext.java   | 33 +++++++++++++--
 .../OffloadAndConsumeFromLeaderTest.java           | 49 +++++++++++-----------
 5 files changed, 124 insertions(+), 37 deletions(-)
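
The key hunk below is the new guard in LogLoader: during recovery, the checkpointed log-start-offset is kept as-is when remote storage is enabled, because older data may exist only in remote storage and the first local segment no longer marks the true start of the log. A simplified, standalone Scala sketch of that decision, assuming isRemoteLogEnabled and logStartOffsetCheckpoint mean the same as in the diff, with firstLocalSegmentBaseOffset standing in for segments.firstSegment.get.baseOffset:

    object LogStartOffsetSketch {
      // Illustrative recovery-time decision for KAFKA-15351; not the actual LogLoader code.
      def recoveredLogStartOffset(isRemoteLogEnabled: Boolean,
                                  logStartOffsetCheckpoint: Long,
                                  firstLocalSegmentBaseOffset: Long): Long = {
        if (isRemoteLogEnabled)
          logStartOffsetCheckpoint // keep the checkpoint: older segments may live only in remote storage
        else
          math.max(logStartOffsetCheckpoint, firstLocalSegmentBaseOffset) // local-only: the log cannot start before the first local segment
      }
    }

For example, recoveredLogStartOffset(isRemoteLogEnabled = true, logStartOffsetCheckpoint = 0L, firstLocalSegmentBaseOffset = 5L) returns 0L, matching the "true, 0" row of the new parameterized LogLoaderTest further down, while the "false, 5" row exercises the math.max branch.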

diff --git a/core/src/main/scala/kafka/log/LogLoader.scala b/core/src/main/scala/kafka/log/LogLoader.scala
index cc0232aef28..37210bb226c 100644
--- a/core/src/main/scala/kafka/log/LogLoader.scala
+++ b/core/src/main/scala/kafka/log/LogLoader.scala
@@ -78,7 +78,8 @@ class LogLoader(
   recoveryPointCheckpoint: Long,
   leaderEpochCache: Option[LeaderEpochFileCache],
   producerStateManager: ProducerStateManager,
-  numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]
+  numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
+  isRemoteLogEnabled: Boolean = false,
 ) extends Logging {
   logIdent = s"[LogLoader partition=$topicPartition, dir=${dir.getParent}] "
 
@@ -180,7 +181,11 @@ class LogLoader(
     }
 
     leaderEpochCache.foreach(_.truncateFromEnd(nextOffset))
-    val newLogStartOffset = math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
+    val newLogStartOffset = if (isRemoteLogEnabled) {
+      logStartOffsetCheckpoint
+    } else {
+      math.max(logStartOffsetCheckpoint, segments.firstSegment.get.baseOffset)
+    }
     // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
     leaderEpochCache.foreach(_.truncateFromStart(logStartOffsetCheckpoint))
 
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 10c86183b3f..c289cf25fef 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -187,12 +187,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   def remoteLogEnabled(): Boolean = {
-    // Remote log is enabled only for non-compact and non-internal topics
-    remoteStorageSystemEnable &&
-      !(config.compact || Topic.isInternal(topicPartition.topic())
-        || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topicPartition.topic())
-        || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topicPartition.topic())) &&
-      config.remoteStorageEnable()
+    UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic())
   }
 
   /**
@@ -1882,6 +1877,17 @@ object UnifiedLog extends Logging {
 
   val UnknownOffset = LocalLog.UnknownOffset
 
+  def isRemoteLogEnabled(remoteStorageSystemEnable: Boolean,
+                         config: LogConfig,
+                         topic: String): Boolean = {
+    // Remote log is enabled only for non-compact and non-internal topics
+    remoteStorageSystemEnable &&
+      !(config.compact || Topic.isInternal(topic)
+        || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topic)
+        || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topic)) &&
+      config.remoteStorageEnable()
+  }
+
   def apply(dir: File,
             config: LogConfig,
             logStartOffset: Long,
@@ -1911,6 +1917,7 @@ object UnifiedLog extends Logging {
       s"[UnifiedLog partition=$topicPartition, dir=${dir.getParent}] ")
     val producerStateManager = new ProducerStateManager(topicPartition, dir,
       maxTransactionTimeoutMs, producerStateManagerConfig, time)
+    val isRemoteLogEnabled = UnifiedLog.isRemoteLogEnabled(remoteStorageSystemEnable, config, topicPartition.topic)
     val offsets = new LogLoader(
       dir,
       topicPartition,
@@ -1924,7 +1931,8 @@ object UnifiedLog extends Logging {
       recoveryPoint,
       leaderEpochCache,
       producerStateManager,
-      numRemainingSegments
+      numRemainingSegments,
+      isRemoteLogEnabled,
     ).load()
     val localLog = new LocalLog(dir, config, segments, offsets.recoveryPoint,
       offsets.nextOffsetMetadata, scheduler, time, topicPartition, logDirFailureChannel)
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index 64527c70790..13870904ca6 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -37,6 +37,8 @@ import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, EpochE
 import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
 import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.CsvSource
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyLong}
 import org.mockito.Mockito.{mock, reset, times, verify, when}
@@ -1753,4 +1755,50 @@ class LogLoaderTest {
 
     log.close()
   }
+
+  @ParameterizedTest
+  @CsvSource(Array("false, 5", "true, 0"))
+  def testLogStartOffsetWhenRemoteStorageIsEnabled(isRemoteLogEnabled: Boolean,
+                                                   expectedLogStartOffset: Long): Unit = {
+    val logDirFailureChannel = null
+    val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
+    val logConfig = LogTestUtils.createLogConfig()
+    val stateManager: ProducerStateManager = mock(classOf[ProducerStateManager])
+    when(stateManager.isEmpty).thenReturn(true)
+
+    val log = createLog(logDir, logConfig)
+    // Create segments: [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-6], [7-7], [8-8], [9-]
+    //                   |---> logStartOffset                                           |---> active segment (empty)
+    //                                                                                  |---> logEndOffset
+    for (i <- 0 until 9) {
+      val record = new SimpleRecord(mockTime.milliseconds, i.toString.getBytes)
+      log.appendAsLeader(TestUtils.records(List(record)), leaderEpoch = 0)
+      log.roll()
+    }
+    log.maybeIncrementHighWatermark(new LogOffsetMetadata(9L))
+    log.maybeIncrementLogStartOffset(5L, LogStartOffsetIncrementReason.SegmentDeletion)
+    log.deleteOldSegments()
+
+    val segments = new LogSegments(topicPartition)
+    log.logSegments.foreach(segment => segments.add(segment))
+    assertEquals(5, segments.firstSegment.get.baseOffset)
+
+    val leaderEpochCache = UnifiedLog.maybeCreateLeaderEpochCache(logDir, topicPartition, logDirFailureChannel, logConfig.recordVersion, "")
+    val offsets = new LogLoader(
+      logDir,
+      topicPartition,
+      logConfig,
+      mockTime.scheduler,
+      mockTime,
+      logDirFailureChannel,
+      hadCleanShutdown = true,
+      segments,
+      0L,
+      0L,
+      leaderEpochCache,
+      stateManager,
+      isRemoteLogEnabled = isRemoteLogEnabled
+    ).load()
+    assertEquals(expectedLogStartOffset, offsets.logStartOffset)
+  }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
index 593d69cb38c..99e76293e45 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestContext.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.tiered.storage;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.tiered.storage.specs.ExpandPartitionCountSpec;
@@ -90,15 +92,21 @@ public final class TieredStorageTestContext implements AutoCloseable {
 
     @SuppressWarnings("deprecation")
     private void initClients() {
+        // rediscover the new bootstrap-server port in case of broker restarts
+        ListenerName listenerName = harness.listenerName();
+        Properties commonOverrideProps = new Properties();
+        commonOverrideProps.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, harness.bootstrapServers(listenerName));
+
         // Set a producer linger of 60 seconds, in order to optimistically generate batches of
         // records with a pre-determined size.
         Properties producerOverrideProps = new Properties();
         producerOverrideProps.put(LINGER_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(60)));
-        producer = harness.createProducer(ser, ser, producerOverrideProps);
+        producerOverrideProps.putAll(commonOverrideProps);
 
-        consumer = harness.createConsumer(de, de, new Properties(),
+        producer = harness.createProducer(ser, ser, producerOverrideProps);
+        consumer = harness.createConsumer(de, de, commonOverrideProps,
                 JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toList());
-        admin = harness.createAdminClient(harness.listenerName(), new Properties());
+        admin = harness.createAdminClient(listenerName, commonOverrideProps);
     }
 
     private void initContext() {
@@ -228,7 +236,11 @@ public final class TieredStorageTestContext implements AutoCloseable {
 
     public void bounce(int brokerId) {
         harness.killBroker(brokerId);
+        boolean allBrokersDead = harness.aliveBrokers().isEmpty();
         harness.startBroker(brokerId);
+        if (allBrokersDead) {
+            reinitClients();
+        }
         initContext();
     }
 
@@ -238,7 +250,11 @@ public final class TieredStorageTestContext implements AutoCloseable {
     }
 
     public void start(int brokerId) {
+        boolean allBrokersDead = harness.aliveBrokers().isEmpty();
         harness.startBroker(brokerId);
+        if (allBrokersDead) {
+            reinitClients();
+        }
         initContext();
     }
 
@@ -310,5 +326,16 @@ public final class TieredStorageTestContext implements AutoCloseable {
 
     @Override
     public void close() throws IOException {
+        // IntegrationTestHarness closes the clients on tearDown, so there is no need to close them explicitly.
+    }
+
+    private void reinitClients() {
+        // Broker uses a random port (TestUtils.RandomPort) for the listener. If the initial bootstrap-server config
+        // becomes invalid, then the clients won't be able to reconnect to the cluster.
+        // To avoid this, we reinitialize the clients after all the brokers are bounced.
+        Utils.closeQuietly(producer, "Producer client");
+        Utils.closeQuietly(consumer, "Consumer client");
+        Utils.closeQuietly(admin, "Admin client");
+        initClients();
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
index b5da2308d14..ffb8e666d18 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/OffloadAndConsumeFromLeaderTest.java
@@ -24,7 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Test Cases (A):
+ * Test Cases:
  *    Elementary offloads and fetches from tiered storage.
  */
 public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarness {
@@ -46,14 +46,14 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
         final Integer p0 = 0;
         final Integer partitionCount = 1;
         final Integer replicationFactor = 1;
-        final Integer maxBatchCountPerSegment = 1;
+        final Integer oneBatchPerSegment = 1;
+        final Integer twoBatchPerSegment = 2;
         final Map<Integer, List<Integer>> replicaAssignment = null;
         final boolean enableRemoteLogStorage = true;
-        final Integer batchSize = 1;
 
         builder
                 /*
-                 * (A.1) Create a topic which segments contain only one batch and produce three records
+                 * (1) Create a topic whose segments contain only one batch and produce three records
                  *       with a batch size of 1.
                  *
                  *       The topic and broker are configured so that the two rolled segments are picked from
@@ -68,23 +68,23 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
                  *           Log tA-p0                         Log tA-p0
                  *          *-------------------*             *-------------------*
                  *          | base offset = 2   |             |  base offset = 0  |
-                 *          | (k3, v3)          |             |  (k1, v1)         |
+                 *          | (k2, v2)          |             |  (k0, v0)         |
                  *          *-------------------*             *-------------------*
                  *                                            *-------------------*
                  *                                            |  base offset = 1  |
-                 *                                            |  (k2, v2)         |
+                 *                                            |  (k1, v1)         |
                  *                                            *-------------------*
                  */
-                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment, replicaAssignment,
+                .createTopic(topicA, partitionCount, replicationFactor, oneBatchPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
-                .withBatchSize(topicA, p0, batchSize)
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k1", "v1"))
-                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k2", "v2"))
-                .produce(topicA, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
-                        new KeyValueSpec("k3", "v3"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
 
                 /*
-                 * (A.2) Similar scenario as above, but with segments of two records.
+                 * (2) Similar scenario as above, but with segments of two records.
                  *
                  *       Acceptance:
                  *       -----------
@@ -95,28 +95,27 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
                  *           Log tB-p0                         Log tB-p0
                  *          *-------------------*             *-------------------*
                  *          | base offset = 4   |             |  base offset = 0  |
-                 *          | (k5, v5)          |             |  (k1, v1)         |
-                 *          *-------------------*             |  (k2, v2)         |
+                 *          | (k4, v4)          |             |  (k0, v0)         |
+                 *          *-------------------*             |  (k1, v1)         |
                  *                                            *-------------------*
                  *                                            *-------------------*
                  *                                            |  base offset = 2  |
+                 *                                            |  (k2, v2)         |
                  *                                            |  (k3, v3)         |
-                 *                                            |  (k4, v4)         |
                  *                                            *-------------------*
                  */
-                .createTopic(topicB, partitionCount, replicationFactor, 2, replicaAssignment,
+                .createTopic(topicB, partitionCount, replicationFactor, twoBatchPerSegment, replicaAssignment,
                         enableRemoteLogStorage)
-                .withBatchSize(topicB, p0, batchSize)
                 .expectEarliestLocalOffsetInLogDirectory(topicB, p0, 4L)
                 .expectSegmentToBeOffloaded(broker, topicB, p0, 0,
-                        new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"))
+                        new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"))
                 .expectSegmentToBeOffloaded(broker, topicB, p0, 2,
-                        new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
-                .produce(topicB, p0, new KeyValueSpec("k1", "v1"), new KeyValueSpec("k2", "v2"),
-                        new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"), new KeyValueSpec("k5", "v5"))
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"))
+                .produce(topicB, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"), new KeyValueSpec("k3", "v3"), new KeyValueSpec("k4", "v4"))
 
                 /*
-                 * (A.3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption
+                 * (3) Stops and restarts the broker. The purpose of this test is to a) exercise consumption
                  *       from a given offset and b) verify that upon broker start, existing remote log segments
                  *       metadata are loaded by Kafka and these log segments are available.
                  *
@@ -124,10 +123,10 @@ public final class OffloadAndConsumeFromLeaderTest extends TieredStorageTestHarn
                  *       -----------
                  *       - For topic A, this offset is defined such that only the second segment is fetched from
                  *         the tiered storage.
-                 *       - For topic B, only one segment is present in the tiered storage, as asserted by the
+                 *       - For topic B, two segments are present in the tiered storage, as asserted by the
                  *         previous sub-test-case.
                  */
-                // .bounce(broker)
+                .bounce(broker)
                 .expectFetchFromTieredStorage(broker, topicA, p0, 1)
                 .consume(topicA, p0, 1L, 2, 1)
                 .expectFetchFromTieredStorage(broker, topicB, p0, 2)