You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/09/14 01:53:01 UTC

[kafka] branch 3.6 updated: KAFKA-15439: Transactions test with tiered storage (#14347)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 2508e306709 KAFKA-15439: Transactions test with tiered storage (#14347)
2508e306709 is described below

commit 2508e306709943ba333b58b8913e1951f2df29f5
Author: Kamal Chandraprakash <ka...@gmail.com>
AuthorDate: Thu Sep 14 07:22:13 2023 +0530

    KAFKA-15439: Transactions test with tiered storage (#14347)
    
    This test extends the existing TransactionsTest. It configures the broker and topic with tiered storage and expects at least one log segment to be uploaded to the remote storage.
    
    Reviewers: Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>,  Divij Vaidya <di...@amazon.com>
---
 .../integration/kafka/api/TransactionsTest.scala   |  83 ++++++---
 .../tiered/storage/TieredStorageTestHarness.java   |  89 +---------
 .../tiered/storage/actions/ConsumeAction.java      |   2 +-
 .../tiered/storage/actions/CreateTopicAction.java  |  28 ++--
 .../storage/actions/ExpectBrokerInISRAction.java   |   2 +-
 .../tiered/storage/actions/ExpectLeaderAction.java |   2 +-
 .../ExpectTopicIdToMatchInRemoteStorageAction.java |   2 +-
 ...tUserTopicMappedToMetadataPartitionsAction.java |   2 +-
 .../tiered/storage/actions/ProduceAction.java      |   2 +-
 .../storage/actions/ReassignReplicaAction.java     |   2 +-
 .../storage/actions/ShrinkReplicaAction.java       |   2 +-
 .../TransactionsWithTieredStoreTest.java           | 108 ++++++++++++
 .../kafka/tiered/storage/utils/ActionUtils.java    |  72 --------
 .../storage/utils/TieredStorageTestUtils.java      | 185 +++++++++++++++++++++
 14 files changed, 377 insertions(+), 204 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 8d849136dd3..5936414958c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -35,8 +35,9 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
+import scala.collection.mutable.{Buffer, ListBuffer}
 import scala.concurrent.ExecutionException
 
 class TransactionsTest extends IntegrationTestHarness {
@@ -54,26 +55,43 @@ class TransactionsTest extends IntegrationTestHarness {
   val transactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]()
   val nonTransactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]()
 
-  serverConfig.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
-  // Set a smaller value for the number of partitions for the __consumer_offsets topic
-  // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
-  serverConfig.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
-  serverConfig.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
-  serverConfig.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
-  serverConfig.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
-  serverConfig.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
-  serverConfig.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
-  serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
-  serverConfig.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
-  serverConfig.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
+  def overridingProps(): Properties = {
+    val props = new Properties()
+    props.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+    // Set a smaller value for the number of partitions for the __consumer_offsets topic so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+    props.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+    props.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
+    props.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
+    props.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
+    props.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+    props.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+    props.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+    props.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
+    props
+
+  }
+
+  override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+    props.foreach(p => p.putAll(overridingProps()))
+  }
+
+  override protected def kraftControllerConfigs(): Seq[Properties] = {
+    Seq(overridingProps())
+
+  }
+
+  def topicConfig(): Properties = {
+    val topicConfig = new Properties()
+    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+    topicConfig
+  }
 
   @BeforeEach
   override def setUp(testInfo: TestInfo): Unit = {
     super.setUp(testInfo)
-    val topicConfig = new Properties()
-    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
-    createTopic(topic1, numPartitions, brokerCount, topicConfig)
-    createTopic(topic2, numPartitions, brokerCount, topicConfig)
+    createTopic(topic1, numPartitions, brokerCount, topicConfig())
+    createTopic(topic2, numPartitions, brokerCount, topicConfig())
 
     for (_ <- 0 until transactionalProducerCount)
       createTransactionalProducer("transactional-producer")
@@ -97,20 +115,25 @@ class TransactionsTest extends IntegrationTestHarness {
     val producer = transactionalProducers.head
     val consumer = transactionalConsumers.head
     val unCommittedConsumer = nonTransactionalConsumers.head
+    val tp11 = new TopicPartition(topic1, 1)
+    val tp22 = new TopicPartition(topic2, 2)
 
     producer.initTransactions()
 
     producer.beginTransaction()
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = false))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "4", "4", willBeCommitted = false))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "2", "2", willBeCommitted = false))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false))
     producer.flush()
+
     producer.abortTransaction()
 
     producer.beginTransaction()
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = true))
-    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "1", "1", willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "3", "3", willBeCommitted = true))
     producer.commitTransaction()
 
+    maybeWaitForAtLeastOneSegmentUpload(tp11, tp22)
+
     consumer.subscribe(List(topic1, topic2).asJava)
     unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
 
@@ -199,6 +222,7 @@ class TransactionsTest extends IntegrationTestHarness {
   def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
     val producer1 = transactionalProducers.head
     val producer2 = createTransactionalProducer("other")
+    val tp10 = new TopicPartition(topic1, 0)
 
     producer1.initTransactions()
     producer2.initTransactions()
@@ -218,13 +242,15 @@ class TransactionsTest extends IntegrationTestHarness {
     producer1.abortTransaction()
     producer2.commitTransaction()
 
+    maybeWaitForAtLeastOneSegmentUpload(tp10)
+
     // ensure that the consumer's fetch will sit in purgatory
     val consumerProps = new Properties()
     consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000")
     consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100")
     val readCommittedConsumer = createReadCommittedConsumer(props = consumerProps)
 
-    readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0)).asJava)
+    readCommittedConsumer.assign(Set(tp10).asJava)
     val records = consumeRecords(readCommittedConsumer, numRecords = 2)
     assertEquals(2, records.size)
 
@@ -309,6 +335,12 @@ class TransactionsTest extends IntegrationTestHarness {
       consumer.close()
     }
 
+    val partitions = ListBuffer.empty[TopicPartition]
+    for (partition <- 0 until numPartitions) {
+      partitions += new TopicPartition(topic2, partition)
+    }
+    maybeWaitForAtLeastOneSegmentUpload(partitions.toSeq: _*)
+
     // In spite of random aborts, we should still have exactly 500 messages in topic2. I.e. we should not
     // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
     val verifyingConsumer = transactionalConsumers(0)
@@ -592,10 +624,8 @@ class TransactionsTest extends IntegrationTestHarness {
     val unCommittedConsumer = nonTransactionalConsumers.head
     val topicWith10Partitions = "largeTopic"
     val topicWith10PartitionsAndOneReplica = "largeTopicOneReplica"
-    val topicConfig = new Properties()
-    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
 
-    createTopic(topicWith10Partitions, 10, brokerCount, topicConfig)
+    createTopic(topicWith10Partitions, 10, brokerCount, topicConfig())
     createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties())
 
     firstProducer.initTransactions()
@@ -802,4 +832,7 @@ class TransactionsTest extends IntegrationTestHarness {
     transactionalProducers += producer
     producer
   }
+
+  def maybeWaitForAtLeastOneSegmentUpload(topicPartitions: TopicPartition*): Unit = {
+  }
 }
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 34ff17f9d57..f91a542768b 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -16,26 +16,25 @@
  */
 package org.apache.kafka.tiered.storage;
 
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
 import kafka.api.IntegrationTestHarness;
 import kafka.log.remote.RemoteLogManager;
 import kafka.server.KafkaBroker;
 import kafka.server.KafkaConfig;
 import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
-import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
 import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
 import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
 import scala.collection.Seq;
 
 import java.util.ArrayList;
@@ -44,22 +43,10 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import scala.collection.JavaConverters;
-
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
-
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
-
-import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
-import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
 
 /**
  * Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
@@ -69,16 +56,6 @@ import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STOR
 @Tag("integration")
 public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
 
-    /**
-     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
-     * Hence, we need to wait at least that amount of time before segments eligible for deletion
-     * gets physically removed.
-     */
-    private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
-    // The default value of log cleanup interval is 30 secs, and it increases the test execution time.
-    private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
-    private static final Integer RLM_TASK_INTERVAL_MS = 500;
-
     private TieredStorageTestContext context;
     private String testClassName = "";
     private String storageDirPath = "";
@@ -102,52 +79,10 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
     }
 
     public Properties overridingProps() {
-        Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
-                "STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS");
-
-        Properties overridingProps = new Properties();
-        // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
-        // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
-        //
-        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
-        // tests, metadata can survive the loss of one replica for its topic-partitions.
-        //
-        // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
-        // data files on the local file system.
-        overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
-        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
-        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
-                TopicBasedRemoteLogMetadataManager.class.getName());
-        overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
-        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
-
-        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
-        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
-
-        overridingProps.setProperty(
-                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
-                String.valueOf(numRemoteLogMetadataPartitions()));
-        overridingProps.setProperty(
-                metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
-                String.valueOf(brokerCount()));
-        // This configuration ensures inactive log segments are deleted fast enough so that
-        // the integration tests can confirm a given log segment is present only in the second-tier storage.
-        // Note that this does not impact the eligibility of a log segment to be offloaded to the
-        // second-tier storage.
-        overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), LOG_CLEANUP_INTERVAL_MS.toString());
-        // This can be customized to read remote log segments from followers.
+        Properties overridingProps = createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(),
+                numRemoteLogMetadataPartitions(), new Properties());
         readReplicaSelectorClass()
                 .ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName()));
-        // The directory of the second-tier storage needs to be constant across all instances of storage managers
-        // in every broker and throughout the test. Indeed, as brokers are restarted during the test.
-        // You can override this property with a fixed path of your choice if you wish to use a non-temporary
-        // directory to access its content after a test terminated.
-        overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), storageDirPath);
-        // This configuration will remove all the remote files when close is called in remote storage manager.
-        // Storage manager close is being called while the server is actively processing the socket requests,
-        // so enabling this config can break the existing tests.
-        // NOTE: When using TestUtils#tempDir(), the folder gets deleted when VM terminates.
-        overridingProps.setProperty(storageConfigPrefix(DELETE_ON_CLOSE_CONFIG), "false");
         return overridingProps;
     }
 
@@ -193,14 +128,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
         }
     }
 
-    private String storageConfigPrefix(String key) {
-        return "rsm.config." + testClassName + "." + key;
-    }
-
-    private String metadataConfigPrefix(String key) {
-        return "rlmm.config." + testClassName + "." + key;
-    }
-
     @SuppressWarnings("deprecation")
     public static List<LocalTieredStorage> remoteStorageManagers(Seq<KafkaBroker> brokers) {
         List<LocalTieredStorage> storages = new ArrayList<>();
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index 7980a7c5e07..0fd7ceed657 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.tieredStorageRecords;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
 import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java
index 746b9610e4f..c9d9b1b8959 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java
@@ -19,11 +19,13 @@ package org.apache.kafka.tiered.storage.actions;
 import org.apache.kafka.tiered.storage.TieredStorageTestAction;
 import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 import org.apache.kafka.tiered.storage.specs.TopicSpec;
-import org.apache.kafka.common.config.TopicConfig;
 
 import java.io.PrintStream;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
+
 public final class CreateTopicAction implements TieredStorageTestAction {
 
     private final TopicSpec spec;
@@ -34,23 +36,13 @@ public final class CreateTopicAction implements TieredStorageTestAction {
 
     @Override
     public void doExecute(TieredStorageTestContext context) throws ExecutionException, InterruptedException {
-        // Ensure offset and time indexes are generated for every record.
-        spec.getProperties().put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
-        // Leverage the use of the segment index size to create a log-segment accepting one and only one record.
-        // The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the
-        // time index. Hence, since the topic is configured to generate index entries for every record with, for
-        // a "small" number of records (i.e. such that the average record size times the number of records is
-        // much less than the segment size), the number of records which hold in a segment is the multiple of 12
-        // defined below.
-        if (spec.getMaxBatchCountPerSegment() != -1) {
-            spec.getProperties().put(
-                    TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * spec.getMaxBatchCountPerSegment()));
-        }
-        // To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
-        // want to delete log segments as soon as possible. When tiered storage is active, an inactive log
-        // segment is not eligible for deletion until it has been offloaded, which guarantees all segments
-        // should be offloaded before deletion, and their consumption is possible thereafter.
-        spec.getProperties().put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        boolean enableRemoteStorage = true;
+        Map<String, String> topicConfigs = createTopicConfigForRemoteStorage(
+                enableRemoteStorage, spec.getMaxBatchCountPerSegment());
+        topicConfigs.putAll(spec.getProperties());
+
+        spec.getProperties().clear();
+        spec.getProperties().putAll(topicConfigs);
         context.createTopic(spec);
     }
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java
index cabe4c2d7b0..0c4a8b926d2 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java
@@ -25,7 +25,7 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
 
 import java.io.PrintStream;
 
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
 
 public final class ExpectBrokerInISRAction implements TieredStorageTestAction {
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
index 8f41f578f32..f81cfcb5cf8 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
 
 public final class ExpectLeaderAction implements TieredStorageTestAction {
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java
index 9f404f99500..1749ffc94d0 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java
@@ -27,7 +27,7 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public final class ExpectTopicIdToMatchInRemoteStorageAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java
index 72630fad3a0..ef7066ffe04 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopics;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopics;
 
 public final class ExpectUserTopicMappedToMetadataPartitionsAction implements TieredStorageTestAction {
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
index 56287cebdcb..344d8c4afbd 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
 import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.tieredStorageRecords;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
 import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java
index da989cbd6db..06d9013649b 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java
@@ -33,7 +33,7 @@ import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
 
 public final class ReassignReplicaAction implements TieredStorageTestAction {
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java
index f15a0ae26ca..fc840d04853 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java
@@ -33,7 +33,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
 
 public final class ShrinkReplicaAction implements TieredStorageTestAction {
 
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
new file mode 100644
index 00000000000..61e68d65f83
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import kafka.api.TransactionsTest;
+import kafka.server.HostedPartition;
+import kafka.server.KafkaBroker;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.JavaConverters;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
+
+/** Runs the inherited {@link TransactionsTest} cases against brokers and topics configured for tiered storage. */
+public class TransactionsWithTieredStoreTest extends TransactionsTest {
+
+    private String testClassName;
+    private String storageDirPath;
+
+    @BeforeEach
+    @Override
+    public void setUp(TestInfo testInfo) {
+        // Locale.ROOT keeps the lower-cased class name, used in config keys and the temp-dir prefix, locale-independent.
+        testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.ROOT);
+        storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
+        super.setUp(testInfo);
+    }
+
+    /** Returns the parent's broker overrides augmented with the remote-storage settings. */
+    @Override
+    public Properties overridingProps() {
+        Properties props = super.overridingProps();
+        int numRemoteLogMetadataPartitions = 3;
+        return createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(),
+                numRemoteLogMetadataPartitions, props);
+    }
+
+    /** Returns the parent's topic configuration augmented with the remote-storage settings. */
+    @Override
+    public Properties topicConfig() {
+        boolean enableRemoteStorage = true;
+        int maxBatchCountPerSegment = 1;
+        Properties overridingTopicProps = super.topicConfig();
+        overridingTopicProps.putAll(createTopicConfigForRemoteStorage(enableRemoteStorage, maxBatchCountPerSegment));
+        return overridingTopicProps;
+    }
+
+    /**
+     * For each given topic-partition, calls waitForAtLeastEarliestLocalOffset(topicPartition, 1L) on the
+     * local storage of every alive broker that hosts the partition.
+     */
+    @SuppressWarnings("deprecation")
+    public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.immutable.Seq<TopicPartition> topicPartitions) {
+        // The broker list does not depend on the partition, so build the local-storage handles only once.
+        List<BrokerLocalStorage> localStorages = JavaConverters.bufferAsJavaList(brokers()).stream()
+                .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+                .collect(Collectors.toList());
+        JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition ->
+                localStorages.stream()
+                        // Select brokers which are assigned a replica of the topic-partition
+                        .filter(s -> isAssignedReplica(topicPartition, s.getBrokerId()))
+                        // Filter out inactive brokers, which may still contain log segments we would expect
+                        // to be deleted based on the retention configuration.
+                        .filter(s -> isAlive(s.getBrokerId()))
+                        // Wait until the broker's local storage has been cleared of the inactive log segments.
+                        .forEach(localStorage -> localStorage.waitForAtLeastEarliestLocalOffset(topicPartition, 1L)));
+    }
+
+    /** Returns true when a broker with the given id exists and its partition is a HostedPartition.Online. */
+    @SuppressWarnings("deprecation")
+    private boolean isAssignedReplica(TopicPartition topicPartition, Integer replicaId) {
+        Optional<KafkaBroker> brokerOpt = JavaConverters.seqAsJavaList(brokers()).stream()
+                .filter(b -> b.config().brokerId() == replicaId).findFirst();
+        return brokerOpt
+                .map(b -> b.replicaManager().getPartition(topicPartition) instanceof HostedPartition.Online)
+                .orElse(false);
+    }
+
+    /** Returns true when the given broker id is among aliveBrokers(). */
+    private boolean isAlive(Integer brokerId) {
+        return aliveBrokers().exists(b -> b.config().brokerId() == brokerId);
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/ActionUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/ActionUtils.java
deleted file mode 100644
index f410c8fbe82..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/ActionUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.utils;
-
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.tiered.storage.TieredStorageTestContext;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-public class ActionUtils {
-
-    public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
-            throws ExecutionException, InterruptedException {
-        return describeTopics(context, Collections.singletonList(topic)).get(topic);
-    }
-
-    public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
-                                                                List<String> topics)
-            throws ExecutionException, InterruptedException {
-        return context.admin()
-                .describeTopics(topics)
-                .allTopicNames()
-                .get();
-    }
-
-    /**
-     * Get the records found in the local tiered storage.
-     * Snapshot does not sort the filesets by base offset.
-     * @param context The test context.
-     * @param topicPartition The topic-partition of the records.
-     * @return The records found in the local tiered storage.
-     */
-    public static List<Record> tieredStorageRecords(TieredStorageTestContext context,
-                                                    TopicPartition topicPartition) {
-        return context.takeTieredStorageSnapshot()
-                .getFilesets(topicPartition)
-                .stream()
-                .map(fileset -> {
-                    try {
-                        return fileset.getRecords();
-                    } catch (IOException e) {
-                        throw new RuntimeException(e);
-                    }
-                })
-                .sorted(Comparator.comparingLong(records -> records.get(0).offset()))
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList());
-    }
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
new file mode 100644
index 00000000000..f3fb081dd4d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.utils;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+/** Helpers shared by the tiered storage integration tests: topic lookups, remote-tier reads and test configs. */
+public class TieredStorageTestUtils {
+
+    /**
+     * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+     * Hence, we need to wait at least that amount of time before segments eligible for deletion
+     * get physically removed.
+     */
+    public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+    // The default value of log cleanup interval is 30 secs, and it increases the test execution time.
+    private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
+    private static final Integer RLM_TASK_INTERVAL_MS = 500;
+    private static final Integer RLMM_INIT_RETRY_INTERVAL_MS = 300;
+
+    /** Describes a single topic through the admin client of the given test context. */
+    public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
+            throws ExecutionException, InterruptedException {
+        return describeTopics(context, Collections.singletonList(topic)).get(topic);
+    }
+
+    /** Describes the given topics through the admin client of the test context, keyed by topic name. */
+    public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
+                                                                List<String> topics)
+            throws ExecutionException, InterruptedException {
+        return context.admin().describeTopics(topics).allTopicNames().get();
+    }
+
+    /**
+     * Get the records found in the local tiered storage, ordered by the first offset of each fileset.
+     * Snapshot does not sort the filesets by base offset. Filesets without records are skipped.
+     * @param context The test context.
+     * @param topicPartition The topic-partition of the records.
+     * @return The records found in the local tiered storage.
+     */
+    public static List<Record> tieredStorageRecords(TieredStorageTestContext context,
+                                                    TopicPartition topicPartition) {
+        return context.takeTieredStorageSnapshot().getFilesets(topicPartition).stream()
+                .map(fileset -> {
+                    try {
+                        return fileset.getRecords();
+                    } catch (IOException e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                // An empty fileset has no first record to sort by and contributes nothing to the result.
+                .filter(records -> !records.isEmpty())
+                .sorted(Comparator.comparingLong(records -> records.get(0).offset()))
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Adds the broker-side tiered storage settings to {@code overridingProps} and returns that same instance;
+     * {@code testClassName} becomes part of the RSM and RLMM config prefixes.
+     */
+    public static Properties createPropsForRemoteStorage(String testClassName, String storageDirPath,
+                                                         int brokerCount, int numRemoteLogMetadataPartitions,
+                                                         Properties overridingProps) {
+        Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
+                "STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS");
+
+        // Configure the tiered storage in Kafka. Set a sub-second interval for the remote log manager background
+        // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+        //
+        // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+        // tests, metadata can survive the loss of one replica for its topic-partitions.
+        //
+        // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
+        // data files on the local file system.
+        overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, TopicBasedRemoteLogMetadataManager.class.getName());
+        overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
+
+        overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(testClassName, ""));
+        overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(testClassName, ""));
+
+        String partitionsKey = metadataConfigPrefix(testClassName, TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP);
+        overridingProps.setProperty(partitionsKey, String.valueOf(numRemoteLogMetadataPartitions));
+        String replicationFactorKey = metadataConfigPrefix(testClassName, TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP);
+        overridingProps.setProperty(replicationFactorKey, String.valueOf(brokerCount));
+        // This configuration ensures inactive log segments are deleted fast enough so that
+        // the integration tests can confirm a given log segment is present only in the second-tier storage.
+        // Note that this does not impact the eligibility of a log segment to be offloaded to the
+        // second-tier storage.
+        overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), LOG_CLEANUP_INTERVAL_MS.toString());
+        // The directory of the second-tier storage needs to be constant across all instances of storage managers
+        // in every broker and throughout the test, as brokers may be restarted during the test.
+        // You can override this property with a fixed path of your choice if you wish to use a non-temporary
+        // directory to access its content after a test terminated.
+        overridingProps.setProperty(storageConfigPrefix(testClassName, STORAGE_DIR_CONFIG), storageDirPath);
+        // This configuration will remove all the remote files when close is called in remote storage manager.
+        // Storage manager close is being called while the server is actively processing the socket requests,
+        // so enabling this config can break the existing tests.
+        // NOTE: When using TestUtils#tempDir(), the folder gets deleted when VM terminates.
+        overridingProps.setProperty(storageConfigPrefix(testClassName, DELETE_ON_CLOSE_CONFIG), "false");
+        // Set a small retry interval for RemoteLogMetadataManager resources initialization to speed up the test
+        overridingProps.setProperty(metadataConfigPrefix(testClassName, REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP), RLMM_INIT_RETRY_INTERVAL_MS.toString());
+        return overridingProps;
+    }
+
+    /** Builds the topic configs for tiered storage tests: remote storage flag, per-record indexing, small segments. */
+    public static Map<String, String> createTopicConfigForRemoteStorage(boolean enableRemoteStorage, int maxRecordBatchPerSegment) {
+        Map<String, String> topicProps = new HashMap<>();
+        // Enables remote log storage for this topic.
+        topicProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(enableRemoteStorage));
+        // Ensure offset and time indexes are generated for every record.
+        topicProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+        // Leverage the use of the segment index size to create a log-segment accepting one and only one record.
+        // The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the
+        // time index. Since the topic generates an index entry for every record, for a "small" number of records
+        // (i.e. such that the average record size times the number of records is much less than the segment
+        // size), the number of records which fit in a segment is the multiple of 12 defined below.
+        topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * maxRecordBatchPerSegment));
+        // To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
+        // want to delete log segments as soon as possible. When tiered storage is active, an inactive log
+        // segment is not eligible for deletion until it has been offloaded, which guarantees all segments
+        // should be offloaded before deletion, and their consumption is possible thereafter.
+        topicProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+        return topicProps;
+    }
+
+    /** Builds the RSM config key {@code rsm.config.<testClassName>.<key>}. */
+    private static String storageConfigPrefix(String testClassName, String key) {
+        return "rsm.config." + testClassName + "." + key;
+    }
+
+    /** Builds the RLMM config key {@code rlmm.config.<testClassName>.<key>}. */
+    private static String metadataConfigPrefix(String testClassName, String key) {
+        return "rlmm.config." + testClassName + "." + key;
+    }
+}