You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2023/09/14 01:53:01 UTC
[kafka] branch 3.6 updated: KAFKA-15439: Transactions test with tiered storage (#14347)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 2508e306709 KAFKA-15439: Transactions test with tiered storage (#14347)
2508e306709 is described below
commit 2508e306709943ba333b58b8913e1951f2df29f5
Author: Kamal Chandraprakash <ka...@gmail.com>
AuthorDate: Thu Sep 14 07:22:13 2023 +0530
KAFKA-15439: Transactions test with tiered storage (#14347)
This test extends the existing TransactionsTest. It configures the broker and topic with tiered storage and expects at-least one log segment to be uploaded to the remote storage.
Reviewers: Luke Chen <sh...@gmail.com>, Satish Duggana <sa...@apache.org>, Divij Vaidya <di...@amazon.com>
---
.../integration/kafka/api/TransactionsTest.scala | 83 ++++++---
.../tiered/storage/TieredStorageTestHarness.java | 89 +---------
.../tiered/storage/actions/ConsumeAction.java | 2 +-
.../tiered/storage/actions/CreateTopicAction.java | 28 ++--
.../storage/actions/ExpectBrokerInISRAction.java | 2 +-
.../tiered/storage/actions/ExpectLeaderAction.java | 2 +-
.../ExpectTopicIdToMatchInRemoteStorageAction.java | 2 +-
...tUserTopicMappedToMetadataPartitionsAction.java | 2 +-
.../tiered/storage/actions/ProduceAction.java | 2 +-
.../storage/actions/ReassignReplicaAction.java | 2 +-
.../storage/actions/ShrinkReplicaAction.java | 2 +-
.../TransactionsWithTieredStoreTest.java | 108 ++++++++++++
.../kafka/tiered/storage/utils/ActionUtils.java | 72 --------
.../storage/utils/TieredStorageTestUtils.java | 185 +++++++++++++++++++++
14 files changed, 377 insertions(+), 204 deletions(-)
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 8d849136dd3..5936414958c 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -35,8 +35,9 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
+import scala.collection.Seq
import scala.jdk.CollectionConverters._
-import scala.collection.mutable.Buffer
+import scala.collection.mutable.{Buffer, ListBuffer}
import scala.concurrent.ExecutionException
class TransactionsTest extends IntegrationTestHarness {
@@ -54,26 +55,43 @@ class TransactionsTest extends IntegrationTestHarness {
val transactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]()
val nonTransactionalConsumers = Buffer[Consumer[Array[Byte], Array[Byte]]]()
- serverConfig.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
- // Set a smaller value for the number of partitions for the __consumer_offsets topic
- // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
- serverConfig.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
- serverConfig.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
- serverConfig.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
- serverConfig.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
- serverConfig.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
- serverConfig.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
- serverConfig.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
- serverConfig.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
- serverConfig.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
+ def overridingProps(): Properties = {
+ val props = new Properties()
+ props.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+ // Set a smaller value for the number of partitions for the __consumer_offsets topic + // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+ props.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+ props.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
+ props.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
+ props.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
+ props.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+ props.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+ props.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+ props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+ props.put(KafkaConfig.TransactionsAbortTimedOutTransactionCleanupIntervalMsProp, "200")
+ props
+
+ }
+
+ override protected def modifyConfigs(props: Seq[Properties]): Unit = {
+ props.foreach(p => p.putAll(overridingProps()))
+ }
+
+ override protected def kraftControllerConfigs(): Seq[Properties] = {
+ Seq(overridingProps())
+
+ }
+
+ def topicConfig(): Properties = {
+ val topicConfig = new Properties()
+ topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+ topicConfig
+ }
@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
super.setUp(testInfo)
- val topicConfig = new Properties()
- topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- createTopic(topic1, numPartitions, brokerCount, topicConfig)
- createTopic(topic2, numPartitions, brokerCount, topicConfig)
+ createTopic(topic1, numPartitions, brokerCount, topicConfig())
+ createTopic(topic2, numPartitions, brokerCount, topicConfig())
for (_ <- 0 until transactionalProducerCount)
createTransactionalProducer("transactional-producer")
@@ -97,20 +115,25 @@ class TransactionsTest extends IntegrationTestHarness {
val producer = transactionalProducers.head
val consumer = transactionalConsumers.head
val unCommittedConsumer = nonTransactionalConsumers.head
+ val tp11 = new TopicPartition(topic1, 1)
+ val tp22 = new TopicPartition(topic2, 2)
producer.initTransactions()
producer.beginTransaction()
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "2", willBeCommitted = false))
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "4", "4", willBeCommitted = false))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "2", "2", willBeCommitted = false))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "4", "4", willBeCommitted = false))
producer.flush()
+
producer.abortTransaction()
producer.beginTransaction()
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "1", willBeCommitted = true))
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "3", "3", willBeCommitted = true))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, 1, "1", "1", willBeCommitted = true))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, 2, "3", "3", willBeCommitted = true))
producer.commitTransaction()
+ maybeWaitForAtLeastOneSegmentUpload(tp11, tp22)
+
consumer.subscribe(List(topic1, topic2).asJava)
unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
@@ -199,6 +222,7 @@ class TransactionsTest extends IntegrationTestHarness {
def testDelayedFetchIncludesAbortedTransaction(quorum: String): Unit = {
val producer1 = transactionalProducers.head
val producer2 = createTransactionalProducer("other")
+ val tp10 = new TopicPartition(topic1, 0)
producer1.initTransactions()
producer2.initTransactions()
@@ -218,13 +242,15 @@ class TransactionsTest extends IntegrationTestHarness {
producer1.abortTransaction()
producer2.commitTransaction()
+ maybeWaitForAtLeastOneSegmentUpload(tp10)
+
// ensure that the consumer's fetch will sit in purgatory
val consumerProps = new Properties()
consumerProps.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "100000")
consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "100")
val readCommittedConsumer = createReadCommittedConsumer(props = consumerProps)
- readCommittedConsumer.assign(Set(new TopicPartition(topic1, 0)).asJava)
+ readCommittedConsumer.assign(Set(tp10).asJava)
val records = consumeRecords(readCommittedConsumer, numRecords = 2)
assertEquals(2, records.size)
@@ -309,6 +335,12 @@ class TransactionsTest extends IntegrationTestHarness {
consumer.close()
}
+ val partitions = ListBuffer.empty[TopicPartition]
+ for (partition <- 0 until numPartitions) {
+ partitions += new TopicPartition(topic2, partition)
+ }
+ maybeWaitForAtLeastOneSegmentUpload(partitions.toSeq: _*)
+
// In spite of random aborts, we should still have exactly 500 messages in topic2. I.e. we should not
// re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
val verifyingConsumer = transactionalConsumers(0)
@@ -592,10 +624,8 @@ class TransactionsTest extends IntegrationTestHarness {
val unCommittedConsumer = nonTransactionalConsumers.head
val topicWith10Partitions = "largeTopic"
val topicWith10PartitionsAndOneReplica = "largeTopicOneReplica"
- val topicConfig = new Properties()
- topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
- createTopic(topicWith10Partitions, 10, brokerCount, topicConfig)
+ createTopic(topicWith10Partitions, 10, brokerCount, topicConfig())
createTopic(topicWith10PartitionsAndOneReplica, 10, 1, new Properties())
firstProducer.initTransactions()
@@ -802,4 +832,7 @@ class TransactionsTest extends IntegrationTestHarness {
transactionalProducers += producer
producer
}
+
+ def maybeWaitForAtLeastOneSegmentUpload(topicPartitions: TopicPartition*): Unit = {
+ }
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
index 34ff17f9d57..f91a542768b 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/TieredStorageTestHarness.java
@@ -16,26 +16,25 @@
*/
package org.apache.kafka.tiered.storage;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.test.TestUtils;
-import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import kafka.api.IntegrationTestHarness;
import kafka.log.remote.RemoteLogManager;
import kafka.server.KafkaBroker;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.replica.ReplicaSelector;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
-import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import scala.collection.JavaConverters;
import scala.collection.Seq;
import java.util.ArrayList;
@@ -44,22 +43,10 @@ import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import scala.collection.JavaConverters;
-
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
-
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
-import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
-
-import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
-import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
/**
* Base class for integration tests exercising the tiered storage functionality in Apache Kafka.
@@ -69,16 +56,6 @@ import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STOR
@Tag("integration")
public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
- /**
- * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
- * Hence, we need to wait at least that amount of time before segments eligible for deletion
- * gets physically removed.
- */
- private static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
- // The default value of log cleanup interval is 30 secs, and it increases the test execution time.
- private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
- private static final Integer RLM_TASK_INTERVAL_MS = 500;
-
private TieredStorageTestContext context;
private String testClassName = "";
private String storageDirPath = "";
@@ -102,52 +79,10 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
}
public Properties overridingProps() {
- Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
- "STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS");
-
- Properties overridingProps = new Properties();
- // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
- // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
- //
- // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
- // tests, metadata can survive the loss of one replica for its topic-partitions.
- //
- // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
- // data files on the local file system.
- overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
- overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
- overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
- TopicBasedRemoteLogMetadataManager.class.getName());
- overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
- overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
-
- overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(""));
- overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(""));
-
- overridingProps.setProperty(
- metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
- String.valueOf(numRemoteLogMetadataPartitions()));
- overridingProps.setProperty(
- metadataConfigPrefix(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
- String.valueOf(brokerCount()));
- // This configuration ensures inactive log segments are deleted fast enough so that
- // the integration tests can confirm a given log segment is present only in the second-tier storage.
- // Note that this does not impact the eligibility of a log segment to be offloaded to the
- // second-tier storage.
- overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), LOG_CLEANUP_INTERVAL_MS.toString());
- // This can be customized to read remote log segments from followers.
+ Properties overridingProps = createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(),
+ numRemoteLogMetadataPartitions(), new Properties());
readReplicaSelectorClass()
.ifPresent(c -> overridingProps.put(KafkaConfig.ReplicaSelectorClassProp(), c.getName()));
- // The directory of the second-tier storage needs to be constant across all instances of storage managers
- // in every broker and throughout the test. Indeed, as brokers are restarted during the test.
- // You can override this property with a fixed path of your choice if you wish to use a non-temporary
- // directory to access its content after a test terminated.
- overridingProps.setProperty(storageConfigPrefix(STORAGE_DIR_CONFIG), storageDirPath);
- // This configuration will remove all the remote files when close is called in remote storage manager.
- // Storage manager close is being called while the server is actively processing the socket requests,
- // so enabling this config can break the existing tests.
- // NOTE: When using TestUtils#tempDir(), the folder gets deleted when VM terminates.
- overridingProps.setProperty(storageConfigPrefix(DELETE_ON_CLOSE_CONFIG), "false");
return overridingProps;
}
@@ -193,14 +128,6 @@ public abstract class TieredStorageTestHarness extends IntegrationTestHarness {
}
}
- private String storageConfigPrefix(String key) {
- return "rsm.config." + testClassName + "." + key;
- }
-
- private String metadataConfigPrefix(String key) {
- return "rlmm.config." + testClassName + "." + key;
- }
-
@SuppressWarnings("deprecation")
public static List<LocalTieredStorage> remoteStorageManagers(Seq<KafkaBroker> brokers) {
List<LocalTieredStorage> storages = new ArrayList<>();
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
index 7980a7c5e07..0fd7ceed657 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ConsumeAction.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.FETCH_SEGMENT;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.tieredStorageRecords;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java
index 746b9610e4f..c9d9b1b8959 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/CreateTopicAction.java
@@ -19,11 +19,13 @@ package org.apache.kafka.tiered.storage.actions;
import org.apache.kafka.tiered.storage.TieredStorageTestAction;
import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import org.apache.kafka.tiered.storage.specs.TopicSpec;
-import org.apache.kafka.common.config.TopicConfig;
import java.io.PrintStream;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
+
public final class CreateTopicAction implements TieredStorageTestAction {
private final TopicSpec spec;
@@ -34,23 +36,13 @@ public final class CreateTopicAction implements TieredStorageTestAction {
@Override
public void doExecute(TieredStorageTestContext context) throws ExecutionException, InterruptedException {
- // Ensure offset and time indexes are generated for every record.
- spec.getProperties().put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
- // Leverage the use of the segment index size to create a log-segment accepting one and only one record.
- // The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the
- // time index. Hence, since the topic is configured to generate index entries for every record with, for
- // a "small" number of records (i.e. such that the average record size times the number of records is
- // much less than the segment size), the number of records which hold in a segment is the multiple of 12
- // defined below.
- if (spec.getMaxBatchCountPerSegment() != -1) {
- spec.getProperties().put(
- TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * spec.getMaxBatchCountPerSegment()));
- }
- // To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
- // want to delete log segments as soon as possible. When tiered storage is active, an inactive log
- // segment is not eligible for deletion until it has been offloaded, which guarantees all segments
- // should be offloaded before deletion, and their consumption is possible thereafter.
- spec.getProperties().put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+ boolean enableRemoteStorage = true;
+ Map<String, String> topicConfigs = createTopicConfigForRemoteStorage(
+ enableRemoteStorage, spec.getMaxBatchCountPerSegment());
+ topicConfigs.putAll(spec.getProperties());
+
+ spec.getProperties().clear();
+ spec.getProperties().putAll(topicConfigs);
context.createTopic(spec);
}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java
index cabe4c2d7b0..0c4a8b926d2 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectBrokerInISRAction.java
@@ -25,7 +25,7 @@ import org.apache.kafka.tiered.storage.TieredStorageTestContext;
import java.io.PrintStream;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ExpectBrokerInISRAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
index 8f41f578f32..f81cfcb5cf8 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectLeaderAction.java
@@ -36,7 +36,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ExpectLeaderAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java
index 9f404f99500..1749ffc94d0 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectTopicIdToMatchInRemoteStorageAction.java
@@ -27,7 +27,7 @@ import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
import static org.junit.jupiter.api.Assertions.assertEquals;
public final class ExpectTopicIdToMatchInRemoteStorageAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java
index 72630fad3a0..ef7066ffe04 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ExpectUserTopicMappedToMetadataPartitionsAction.java
@@ -34,7 +34,7 @@ import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopics;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopics;
public final class ExpectUserTopicMappedToMetadataPartitionsAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
index 56287cebdcb..344d8c4afbd 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ProduceAction.java
@@ -39,7 +39,7 @@ import java.util.stream.Collectors;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageCondition.expectEvent;
import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.COPY_SEGMENT;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.tieredStorageRecords;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.tieredStorageRecords;
import static org.apache.kafka.tiered.storage.utils.RecordsKeyValueMatcher.correspondTo;
import static org.hamcrest.MatcherAssert.assertThat;
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java
index da989cbd6db..06d9013649b 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ReassignReplicaAction.java
@@ -33,7 +33,7 @@ import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ReassignReplicaAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java
index f15a0ae26ca..fc840d04853 100644
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/actions/ShrinkReplicaAction.java
@@ -33,7 +33,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import static org.apache.kafka.tiered.storage.utils.ActionUtils.describeTopic;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.describeTopic;
public final class ShrinkReplicaAction implements TieredStorageTestAction {
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
new file mode 100644
index 00000000000..61e68d65f83
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/TransactionsWithTieredStoreTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import kafka.api.TransactionsTest;
+import kafka.server.HostedPartition;
+import kafka.server.KafkaBroker;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.tiered.storage.utils.BrokerLocalStorage;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestInfo;
+import scala.collection.JavaConverters;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createPropsForRemoteStorage;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.createTopicConfigForRemoteStorage;
+import static org.apache.kafka.tiered.storage.utils.TieredStorageTestUtils.STORAGE_WAIT_TIMEOUT_SEC;
+
+public class TransactionsWithTieredStoreTest extends TransactionsTest {
+
+ private String testClassName;
+ private String storageDirPath;
+
+ @BeforeEach
+ @Override
+ public void setUp(TestInfo testInfo) {
+ testClassName = testInfo.getTestClass().get().getSimpleName().toLowerCase(Locale.getDefault());
+ storageDirPath = TestUtils.tempDirectory("kafka-remote-tier-" + testClassName).getAbsolutePath();
+ super.setUp(testInfo);
+ }
+
+ @Override
+ public Properties overridingProps() {
+ Properties props = super.overridingProps();
+ int numRemoteLogMetadataPartitions = 3;
+ return createPropsForRemoteStorage(testClassName, storageDirPath, brokerCount(),
+ numRemoteLogMetadataPartitions, props);
+ }
+
+ @Override
+ public Properties topicConfig() {
+ boolean enableRemoteStorage = true;
+ int maxBatchCountPerSegment = 1;
+ Properties overridingTopicProps = super.topicConfig();
+ overridingTopicProps.putAll(createTopicConfigForRemoteStorage(
+ enableRemoteStorage, maxBatchCountPerSegment));
+ return overridingTopicProps;
+ }
+
+ @SuppressWarnings("deprecation")
+ public void maybeWaitForAtLeastOneSegmentUpload(scala.collection.immutable.Seq<TopicPartition> topicPartitions) {
+ JavaConverters.seqAsJavaList(topicPartitions).forEach(topicPartition -> {
+ List<BrokerLocalStorage> localStorages = JavaConverters.bufferAsJavaList(brokers()).stream()
+ .map(b -> new BrokerLocalStorage(b.config().brokerId(), b.config().logDirs().head(), STORAGE_WAIT_TIMEOUT_SEC))
+ .collect(Collectors.toList());
+ localStorages
+ .stream()
+ // Select brokers which are assigned a replica of the topic-partition
+ .filter(s -> isAssignedReplica(topicPartition, s.getBrokerId()))
+ // Filter out inactive brokers, which may still contain log segments we would expect
+ // to be deleted based on the retention configuration.
+ .filter(s -> isAlive(s.getBrokerId()))
+ .forEach(localStorage ->
+ // Wait until the brokers local storage have been cleared from the inactive log segments.
+ localStorage.waitForAtLeastEarliestLocalOffset(topicPartition, 1L));
+ });
+ }
+
+ @SuppressWarnings("deprecation")
+ private boolean isAssignedReplica(TopicPartition topicPartition,
+ Integer replicaId) {
+ Optional<KafkaBroker> brokerOpt = JavaConverters.seqAsJavaList(brokers())
+ .stream()
+ .filter(b -> b.config().brokerId() == replicaId).findFirst();
+ boolean isAssigned = false;
+ if (brokerOpt.isPresent()) {
+ HostedPartition hostedPartition = brokerOpt.get().replicaManager().getPartition(topicPartition);
+ if (hostedPartition instanceof HostedPartition.Online) {
+ isAssigned = true;
+ }
+ }
+ return isAssigned;
+ }
+
+ private boolean isAlive(Integer brokerId) {
+ return aliveBrokers().exists(b -> b.config().brokerId() == brokerId);
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/ActionUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/ActionUtils.java
deleted file mode 100644
index f410c8fbe82..00000000000
--- a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/ActionUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.tiered.storage.utils;
-
-import org.apache.kafka.clients.admin.TopicDescription;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.record.Record;
-import org.apache.kafka.tiered.storage.TieredStorageTestContext;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-public class ActionUtils {
-
- public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
- throws ExecutionException, InterruptedException {
- return describeTopics(context, Collections.singletonList(topic)).get(topic);
- }
-
- public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
- List<String> topics)
- throws ExecutionException, InterruptedException {
- return context.admin()
- .describeTopics(topics)
- .allTopicNames()
- .get();
- }
-
- /**
- * Get the records found in the local tiered storage.
- * Snapshot does not sort the filesets by base offset.
- * @param context The test context.
- * @param topicPartition The topic-partition of the records.
- * @return The records found in the local tiered storage.
- */
- public static List<Record> tieredStorageRecords(TieredStorageTestContext context,
- TopicPartition topicPartition) {
- return context.takeTieredStorageSnapshot()
- .getFilesets(topicPartition)
- .stream()
- .map(fileset -> {
- try {
- return fileset.getRecords();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- })
- .sorted(Comparator.comparingLong(records -> records.get(0).offset()))
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
- }
-}
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
new file mode 100644
index 00000000000..f3fb081dd4d
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/utils/TieredStorageTestUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.utils;
+
+import kafka.server.KafkaConfig;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig;
+import org.apache.kafka.server.log.remote.storage.LocalTieredStorage;
+import org.apache.kafka.tiered.storage.TieredStorageTestContext;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.DELETE_ON_CLOSE_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorage.STORAGE_DIR_CONFIG;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP;
+import static org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP;
+
+public class TieredStorageTestUtils {
+
+ /**
+ * InitialTaskDelayMs is set to 30 seconds for the delete-segment scheduler in Apache Kafka.
+ * Hence, we need to wait at least that amount of time before segments eligible for deletion
+ * gets physically removed.
+ */
+ public static final Integer STORAGE_WAIT_TIMEOUT_SEC = 35;
+ // The default value of log cleanup interval is 30 secs, and it increases the test execution time.
+ private static final Integer LOG_CLEANUP_INTERVAL_MS = 500;
+ private static final Integer RLM_TASK_INTERVAL_MS = 500;
+ private static final Integer RLMM_INIT_RETRY_INTERVAL_MS = 300;
+
+ public static TopicDescription describeTopic(TieredStorageTestContext context, String topic)
+ throws ExecutionException, InterruptedException {
+ return describeTopics(context, Collections.singletonList(topic)).get(topic);
+ }
+
+ public static Map<String, TopicDescription> describeTopics(TieredStorageTestContext context,
+ List<String> topics)
+ throws ExecutionException, InterruptedException {
+ return context.admin()
+ .describeTopics(topics)
+ .allTopicNames()
+ .get();
+ }
+
+ /**
+ * Get the records found in the local tiered storage.
+ * Snapshot does not sort the filesets by base offset.
+ * @param context The test context.
+ * @param topicPartition The topic-partition of the records.
+ * @return The records found in the local tiered storage.
+ */
+ public static List<Record> tieredStorageRecords(TieredStorageTestContext context,
+ TopicPartition topicPartition) {
+ return context.takeTieredStorageSnapshot()
+ .getFilesets(topicPartition)
+ .stream()
+ .map(fileset -> {
+ try {
+ return fileset.getRecords();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .sorted(Comparator.comparingLong(records -> records.get(0).offset()))
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+ }
+
+ public static Properties createPropsForRemoteStorage(String testClassName,
+ String storageDirPath,
+ int brokerCount,
+ int numRemoteLogMetadataPartitions,
+ Properties overridingProps) {
+ Assertions.assertTrue(STORAGE_WAIT_TIMEOUT_SEC > TimeUnit.MILLISECONDS.toSeconds(RLM_TASK_INTERVAL_MS),
+ "STORAGE_WAIT_TIMEOUT_SEC should be greater than RLM_TASK_INTERVAL_MS");
+
+ // Configure the tiered storage in Kafka. Set an interval of 1 second for the remote log manager background
+ // activity to ensure the tiered storage has enough room to be exercised within the lifetime of a test.
+ //
+ // The replication factor of the remote log metadata topic needs to be chosen so that in resiliency
+ // tests, metadata can survive the loss of one replica for its topic-partitions.
+ //
+ // The second-tier storage system is mocked via the LocalTieredStorage instance which persists transferred
+ // data files on the local file system.
+ overridingProps.setProperty(REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true");
+ overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, LocalTieredStorage.class.getName());
+ overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+ TopicBasedRemoteLogMetadataManager.class.getName());
+ overridingProps.setProperty(REMOTE_LOG_MANAGER_TASK_INTERVAL_MS_PROP, RLM_TASK_INTERVAL_MS.toString());
+ overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_LISTENER_NAME_PROP, "PLAINTEXT");
+
+ overridingProps.setProperty(REMOTE_STORAGE_MANAGER_CONFIG_PREFIX_PROP, storageConfigPrefix(testClassName, ""));
+ overridingProps.setProperty(REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, metadataConfigPrefix(testClassName, ""));
+
+ overridingProps.setProperty(
+ metadataConfigPrefix(testClassName, TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP),
+ String.valueOf(numRemoteLogMetadataPartitions));
+ overridingProps.setProperty(
+ metadataConfigPrefix(testClassName, TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP),
+ String.valueOf(brokerCount));
+ // This configuration ensures inactive log segments are deleted fast enough so that
+ // the integration tests can confirm a given log segment is present only in the second-tier storage.
+ // Note that this does not impact the eligibility of a log segment to be offloaded to the
+ // second-tier storage.
+ overridingProps.setProperty(KafkaConfig.LogCleanupIntervalMsProp(), LOG_CLEANUP_INTERVAL_MS.toString());
+ // The directory of the second-tier storage needs to be constant across all instances of storage managers
+ // in every broker and throughout the test. Indeed, as brokers are restarted during the test.
+ // You can override this property with a fixed path of your choice if you wish to use a non-temporary
+ // directory to access its content after a test terminated.
+ overridingProps.setProperty(storageConfigPrefix(testClassName, STORAGE_DIR_CONFIG), storageDirPath);
+ // This configuration will remove all the remote files when close is called in remote storage manager.
+ // Storage manager close is being called while the server is actively processing the socket requests,
+ // so enabling this config can break the existing tests.
+ // NOTE: When using TestUtils#tempDir(), the folder gets deleted when VM terminates.
+ overridingProps.setProperty(storageConfigPrefix(testClassName, DELETE_ON_CLOSE_CONFIG), "false");
+ // Set a small number of retry interval for retrying RemoteLogMetadataManager resources initialization to speed up the test
+ overridingProps.setProperty(metadataConfigPrefix(testClassName, REMOTE_LOG_METADATA_INITIALIZATION_RETRY_INTERVAL_MS_PROP), RLMM_INIT_RETRY_INTERVAL_MS.toString());
+ return overridingProps;
+ }
+
+ public static Map<String, String> createTopicConfigForRemoteStorage(boolean enableRemoteStorage,
+ int maxRecordBatchPerSegment) {
+ Map<String, String> topicProps = new HashMap<>();
+ // Enables remote log storage for this topic.
+ topicProps.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, String.valueOf(enableRemoteStorage));
+ // Ensure offset and time indexes are generated for every record.
+ topicProps.put(TopicConfig.INDEX_INTERVAL_BYTES_CONFIG, "1");
+ // Leverage the use of the segment index size to create a log-segment accepting one and only one record.
+ // The minimum size of the indexes is that of an entry, which is 8 for the offset index and 12 for the
+ // time index. Hence, since the topic is configured to generate index entries for every record with, for
+ // a "small" number of records (i.e. such that the average record size times the number of records is
+ // much less than the segment size), the number of records which hold in a segment is the multiple of 12
+ // defined below.
+ topicProps.put(TopicConfig.SEGMENT_INDEX_BYTES_CONFIG, String.valueOf(12 * maxRecordBatchPerSegment));
+ // To verify records physically absent from Kafka's storage can be consumed via the second tier storage, we
+ // want to delete log segments as soon as possible. When tiered storage is active, an inactive log
+ // segment is not eligible for deletion until it has been offloaded, which guarantees all segments
+ // should be offloaded before deletion, and their consumption is possible thereafter.
+ topicProps.put(TopicConfig.LOCAL_LOG_RETENTION_BYTES_CONFIG, "1");
+ return topicProps;
+ }
+
+ private static String storageConfigPrefix(String testClassName, String key) {
+ return "rsm.config." + testClassName + "." + key;
+ }
+
+ private static String metadataConfigPrefix(String testClassName, String key) {
+ return "rlmm.config." + testClassName + "." + key;
+ }
+}