Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/06 00:53:45 UTC
[kafka] branch 3.6 updated: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4) (#14329)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.6 by this push:
new 2be8b153230 KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4) (#14329)
2be8b153230 is described below
commit 2be8b153230450e34e34ce4f55598acb6385a2d2
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Wed Sep 6 05:50:12 2023 +0530
KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4) (#14329)
Added delete-topic integration tests for tiered-storage-enabled topics with LocalTieredStorage and TBRLMM
Reviewers: Satish Duggana <sa...@apache.org>, Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>
---
.../java/kafka/log/remote/RemoteLogManager.java | 68 +++++++++++--------
.../main/scala/kafka/server/ReplicaManager.scala | 14 +---
.../kafka/log/remote/RemoteLogManagerTest.java | 32 ++++++---
.../unit/kafka/server/ReplicaManagerTest.scala | 39 ++++++-----
.../remote/metadata/storage/ConsumerManager.java | 27 +++-----
.../log/remote/metadata/storage/ConsumerTask.java | 5 +-
.../TopicBasedRemoteLogMetadataManager.java | 26 +++++---
...ogMetadataManagerMultipleSubscriptionsTest.java | 4 +-
.../storage/integration/DeleteTopicTest.java | 77 ++++++++++++++++++++++
9 files changed, 197 insertions(+), 95 deletions(-)
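The central API change in this commit replaces the (Set<TopicPartition>, boolean delete) pair on RemoteLogManager#stopPartitions with a single Set<StopPartition>, so each partition carries its own delete flags. The call sites below (Java: new StopPartition(tp, true, true); Scala: StopPartition(tp0, deleteLocalLog = ...)) suggest a small case class in kafka.server; the following Java rendering is only an illustrative sketch of that shape, not the committed class:

    import org.apache.kafka.common.TopicPartition;

    // Illustrative sketch only: mirrors the accessors used in this diff
    // (topicPartition(), deleteLocalLog(), deleteRemoteLog()).
    public final class StopPartitionSketch {
        private final TopicPartition topicPartition;
        private final boolean deleteLocalLog;   // stop and delete the local log
        private final boolean deleteRemoteLog;  // additionally delete the remote segments

        public StopPartitionSketch(TopicPartition topicPartition,
                                   boolean deleteLocalLog,
                                   boolean deleteRemoteLog) {
            this.topicPartition = topicPartition;
            this.deleteLocalLog = deleteLocalLog;
            this.deleteRemoteLog = deleteRemoteLog;
        }

        public TopicPartition topicPartition() { return topicPartition; }
        public boolean deleteLocalLog() { return deleteLocalLog; }
        public boolean deleteRemoteLog() { return deleteRemoteLog; }
    }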
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 8296acbc8ad..f4b20014b49 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -23,6 +23,7 @@ import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
+import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -349,41 +350,49 @@ public class RemoteLogManager implements Closeable {
}
/**
- * Deletes the internal topic partition info if delete flag is set as true.
+ * Stops the remote-log-manager tasks for the given partitions. Calls
+ * {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog()} is true,
+ * and deletes the partitions from remote storage when {@link StopPartition#deleteRemoteLog()} is true.
*
- * @param topicPartitions topic partitions that needs to be stopped.
- * @param delete flag to indicate whether the given topic partitions to be deleted or not.
+ * @param stopPartitions topic partitions that need to be stopped.
* @param errorHandler callback to handle any errors while stopping the partitions.
*/
- public void stopPartitions(Set<TopicPartition> topicPartitions,
- boolean delete,
+ public void stopPartitions(Set<StopPartition> stopPartitions,
BiConsumer<TopicPartition, Throwable> errorHandler) {
- LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
- Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
- .filter(topicIdByPartitionMap::containsKey)
- .map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
- .collect(Collectors.toSet());
-
- topicIdPartitions.forEach(tpId -> {
+ LOGGER.debug("Stop partitions: {}", stopPartitions);
+ for (StopPartition stopPartition: stopPartitions) {
+ TopicPartition tp = stopPartition.topicPartition();
try {
- RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
- if (task != null) {
- LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
- task.cancel();
- }
- if (delete) {
- LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
- deleteRemoteLogPartition(tpId);
+ // We assume that a topic present in topicIdByPartitionMap has active archival,
+ // and otherwise does not. Ideally, `stopPartitions` should not be called for internal and
+ // non-tiered-storage topics. See KAFKA-15432 for more details.
+ if (topicIdByPartitionMap.containsKey(tp)) {
+ TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+ RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+ if (task != null) {
+ LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+ task.cancel();
+ }
+ if (stopPartition.deleteRemoteLog()) {
+ LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
+ deleteRemoteLogPartition(tpId);
+ }
}
} catch (Exception ex) {
- errorHandler.accept(tpId.topicPartition(), ex);
- LOGGER.error("Error while stopping the partition: {}, delete: {}", tpId.topicPartition(), delete, ex);
+ errorHandler.accept(tp, ex);
+ LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
}
- });
- remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
- if (delete) {
- // NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted
- topicPartitions.forEach(topicIdByPartitionMap::remove);
+ }
+ // Note: `deleteLocalLog` will always be true when `deleteRemoteLog` is true, but not the other way around.
+ Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
+ .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition()))
+ .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
+ .collect(Collectors.toSet());
+ if (!deleteLocalPartitions.isEmpty()) {
+ // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
+ // ReplicaDeletionStarted
+ remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
+ deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition()));
}
}
@@ -1455,6 +1464,11 @@ public class RemoteLogManager implements Closeable {
LOGGER.info("Shutting down of thread pool {} is completed", poolName);
}
+ // Visible for testing
+ RLMTaskWithFuture task(TopicIdPartition partition) {
+ return leaderOrFollowerTasks.get(partition);
+ }
+
static class RLMScheduledThreadPool {
private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
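With the new signature, a caller builds the per-partition flags and supplies one error callback for all of them. A minimal, hypothetical invocation (a fragment, assuming an initialized remoteLogManager; the constructor order (topicPartition, deleteLocalLog, deleteRemoteLog) matches the test changes below):

    // Stop one partition and delete both its local and remote state;
    // failures are reported per partition through the BiConsumer.
    Set<StopPartition> toStop = Collections.singleton(
        new StopPartition(new TopicPartition("topicA", 0), true, true));
    remoteLogManager.stopPartitions(toStop,
        (tp, ex) -> System.err.println("Failed to stop " + tp + ": " + ex));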
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 52a3cc0341b..948bd30d747 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -558,7 +558,6 @@ class ReplicaManager(val config: KafkaConfig,
// Second remove deleted partitions from the partition map. Fetchers rely on the
// ReplicaManager to get Partition's information so they must be stopped first.
val partitionsToDelete = mutable.Set.empty[TopicPartition]
- val remotePartitionsToDelete = mutable.Set.empty[TopicPartition]
partitionsToStop.foreach { stopPartition =>
val topicPartition = stopPartition.topicPartition
if (stopPartition.deleteLocalLog) {
@@ -575,9 +574,6 @@ class ReplicaManager(val config: KafkaConfig,
}
partitionsToDelete += topicPartition
}
- if (stopPartition.deleteRemoteLog)
- remotePartitionsToDelete += topicPartition
-
// If we were the leader, we may have some operations still waiting for completion.
// We force completion to prevent them from timing out.
completeDelayedFetchOrProduceRequests(topicPartition)
@@ -591,13 +587,9 @@ class ReplicaManager(val config: KafkaConfig,
}
remoteLogManager.foreach { rlm =>
// exclude the partitions with offline/error state
- errorMap.keySet.foreach(remotePartitionsToDelete.remove)
- if (remotePartitionsToDelete.nonEmpty) {
- rlm.stopPartitions(remotePartitionsToDelete.asJava, true, (tp, e) => errorMap.put(tp, e))
- }
- val remotePartitionsToNotDelete = partitions.diff(remotePartitionsToDelete)
- if (remotePartitionsToNotDelete.nonEmpty) {
- rlm.stopPartitions(remotePartitionsToNotDelete.asJava, false, (tp, e) => errorMap.put(tp, e))
+ val partitions = partitionsToStop.filterNot(sp => errorMap.contains(sp.topicPartition)).toSet.asJava
+ if (!partitions.isEmpty) {
+ rlm.stopPartitions(partitions, (tp, e) => errorMap.put(tp, e))
}
}
errorMap
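The two stopPartitions calls in the old code (one with delete=true for the remote-delete set, one with delete=false for the rest) collapse into a single call; only partitions that already hit an error are excluded. A hedged Java equivalent of the Scala filter above:

    // Skip partitions that already failed, then stop the rest in one call,
    // recording any new failures back into errorMap.
    Set<StopPartition> partitions = partitionsToStop.stream()
        .filter(sp -> !errorMap.containsKey(sp.topicPartition()))
        .collect(Collectors.toSet());
    if (!partitions.isEmpty()) {
        rlm.stopPartitions(partitions, (tp, e) -> errorMap.put(tp, e));
    }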
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 53456e58e70..682b58b9725 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -23,6 +23,7 @@ import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
+import kafka.server.StopPartition;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
@@ -107,6 +108,8 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -896,12 +899,12 @@ public class RemoteLogManagerTest {
verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
// Evicts from topicId cache
- remoteLogManager.stopPartitions(Collections.singleton(leaderTopicIdPartition.topicPartition()), true, (tp, ex) -> { });
+ remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition);
verifyInCache(followerTopicIdPartition);
// Evicts from topicId cache
- remoteLogManager.stopPartitions(Collections.singleton(followerTopicIdPartition.topicPartition()), true, (tp, ex) -> { });
+ remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { });
verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
}
@@ -1351,13 +1354,17 @@ public class RemoteLogManagerTest {
@Test
public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called");
- Set<TopicPartition> partitions = new HashSet<>();
- partitions.add(leaderTopicIdPartition.topicPartition());
- partitions.add(followerTopicIdPartition.topicPartition());
+ Set<StopPartition> partitions = new HashSet<>();
+ partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false));
+ partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+ assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
+ assertNotNull(remoteLogManager.task(followerTopicIdPartition));
- remoteLogManager.stopPartitions(partitions, false, errorHandler);
+ remoteLogManager.stopPartitions(partitions, errorHandler);
+ assertNull(remoteLogManager.task(leaderTopicIdPartition));
+ assertNull(remoteLogManager.task(followerTopicIdPartition));
verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
verify(remoteLogMetadataManager, times(0)).updateRemoteLogSegmentMetadata(any());
@@ -1367,11 +1374,14 @@ public class RemoteLogManagerTest {
public void testStopPartitionsWithDeletion() throws RemoteStorageException {
BiConsumer<TopicPartition, Throwable> errorHandler =
(topicPartition, ex) -> fail("shouldn't be called: " + ex);
- Set<TopicPartition> partitions = new HashSet<>();
- partitions.add(leaderTopicIdPartition.topicPartition());
- partitions.add(followerTopicIdPartition.topicPartition());
+ Set<StopPartition> partitions = new HashSet<>();
+ partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true));
+ partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true));
remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+ assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
+ assertNotNull(remoteLogManager.task(followerTopicIdPartition));
+
when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
.thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024).iterator());
when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
@@ -1381,7 +1391,9 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
.thenReturn(dummyFuture);
- remoteLogManager.stopPartitions(partitions, true, errorHandler);
+ remoteLogManager.stopPartitions(partitions, errorHandler);
+ assertNull(remoteLogManager.task(leaderTopicIdPartition));
+ assertNull(remoteLogManager.task(followerTopicIdPartition));
verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
verify(remoteLogMetadataManager, times(16)).updateRemoteLogSegmentMetadata(any());
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 783c10b3b44..b4bc4f540ad 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3386,7 +3386,7 @@ class ReplicaManagerTest {
val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
if (enableRemoteStorage) {
- verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), any())
+ verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -3424,7 +3424,7 @@ class ReplicaManagerTest {
assertEquals(Errors.NONE, error)
assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
if (enableRemoteStorage) {
- verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), any())
+ verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -3487,8 +3487,12 @@ class ReplicaManagerTest {
assertTrue(replicaManager.logManager.getLog(tp0).isDefined)
}
if (enableRemoteStorage) {
- verify(mockRemoteLogManager, times(1))
- .stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)), ArgumentMatchers.eq(false), any())
+ if (throwIOException) {
+ verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
+ } else {
+ verify(mockRemoteLogManager, times(1))
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(tp0, deleteLocalLog = deletePartitions))), any())
+ }
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -3871,14 +3875,17 @@ class ReplicaManagerTest {
assertFalse(readRecoveryPointCheckpoint().contains(tp0))
assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
if (enableRemoteStorage) {
- verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
- ArgumentMatchers.eq(leaderEpoch == LeaderAndIsr.EpochDuringDelete), any())
+ val stopPartition = StopPartition(tp0,
+ deleteLocalLog = deletePartition,
+ deleteRemoteLog = leaderEpoch == LeaderAndIsr.EpochDuringDelete)
+ verify(mockRemoteLogManager)
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(stopPartition)), any())
}
}
if (expectedOutput == Errors.NONE && !deletePartition && enableRemoteStorage) {
- verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
- ArgumentMatchers.eq(false), any())
+ verify(mockRemoteLogManager)
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(tp0, deleteLocalLog = false))), any())
}
} finally {
replicaManager.shutdown(checkpointHW = false)
@@ -4528,8 +4535,8 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
- verify(mockRemoteLogManager, times(1)).stopPartitions(
- ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
+ verify(mockRemoteLogManager, times(1))
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
}
// Check that the partition was removed
@@ -4575,8 +4582,8 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
- verify(mockRemoteLogManager, times(1)).stopPartitions(
- ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
+ verify(mockRemoteLogManager, times(1))
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
}
// Check that the partition was removed
@@ -4622,8 +4629,8 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
- verify(mockRemoteLogManager, times(1)).stopPartitions(
- ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
+ verify(mockRemoteLogManager, times(1))
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
}
// Check that the partition was removed
@@ -4669,8 +4676,8 @@ class ReplicaManagerTest {
if (enableRemoteStorage) {
verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
- verify(mockRemoteLogManager, times(1)).stopPartitions(
- ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(true), any())
+ verify(mockRemoteLogManager, times(1))
+ .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true, deleteRemoteLog = true))), any())
}
// Check that the partition was removed
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 186cbb17c56..ed310f45149 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -98,35 +98,28 @@ public class ConsumerManager implements Closeable {
*/
public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
long timeoutMs) throws TimeoutException {
- final int partition = recordMetadata.partition();
- final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
-
- log.info("Waiting until consumer is caught up with the target partition: [{}]", partition);
-
+ int partition = recordMetadata.partition();
// If the current assignment does not have the subscription for this partition then return immediately.
if (!consumerTask.isMetadataPartitionAssigned(partition)) {
- throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " +
- "Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned());
+ throw new KafkaException("This consumer is not assigned to the target partition " + partition +
+ ". Currently assigned partitions: " + consumerTask.metadataPartitionsAssigned());
}
-
- final long offset = recordMetadata.offset();
+ long offset = recordMetadata.offset();
long startTimeMs = time.milliseconds();
+ long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
+ log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset);
while (true) {
- log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset);
long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
if (readOffset >= offset) {
return;
}
-
- log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}], Sleeping for [{}] to retry again",
- offset, partition, readOffset, consumeCheckIntervalMs);
-
+ log.debug("Expected offset for partition {} is {}, but the read offset is {}. " +
+ "Sleeping for {} ms to retry again", partition, offset, readOffset, consumeCheckIntervalMs);
if (time.milliseconds() - startTimeMs > timeoutMs) {
- log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ",
- partition, readOffset, offset);
+ log.warn("Expected offset for partition {} is {}, but the read offset is {}",
+ partition, offset, readOffset);
throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
}
-
time.sleep(consumeCheckIntervalMs);
}
}
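The rewritten wait loop is a deadline-bounded polling pattern: re-read a monotonically advancing offset, sleep between checks, and fail once the deadline passes. A standalone sketch of the same idea (all names hypothetical, not from this commit):

    import java.util.concurrent.TimeoutException;
    import java.util.function.LongSupplier;

    final class OffsetWait {
        // Poll readOffset until it reaches targetOffset, sleeping checkIntervalMs
        // between attempts and timing out after timeoutMs.
        static void awaitOffset(LongSupplier readOffset, long targetOffset,
                                long timeoutMs, long checkIntervalMs)
                throws TimeoutException, InterruptedException {
            long deadlineMs = System.currentTimeMillis() + timeoutMs;
            while (readOffset.getAsLong() < targetOffset) {
                if (System.currentTimeMillis() >= deadlineMs) {
                    throw new TimeoutException("Timed out waiting for offset " + targetOffset);
                }
                Thread.sleep(checkIntervalMs);
            }
        }
    }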
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index b53c4ee3374..f36909b66b6 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -157,9 +157,10 @@ class ConsumerTask implements Runnable, Closeable {
remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
} else {
- log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata);
+ log.trace("The event {} is skipped because it is either already processed or not assigned to this consumer",
+ remoteLogMetadata);
}
- log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
+ log.trace("Updating consumed offset: {} for partition {}", record.offset(), record.partition());
readOffsetsByMetadataPartition.put(record.partition(), record.offset());
}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index e1bf145bbd8..0d1106db05f 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.server.log.remote.metadata.storage;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
@@ -175,12 +176,10 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition,
RemoteLogMetadata remoteLogMetadata)
throws RemoteStorageException {
- log.debug("Storing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
-
+ log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata);
try {
// Publish the message to the metadata topic.
CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);
-
// Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset.
return produceFuture.thenAcceptAsync(recordMetadata -> {
try {
@@ -377,14 +376,16 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest();
boolean topicCreated = false;
long startTimeMs = time.milliseconds();
- try (AdminClient adminClient = AdminClient.create(rlmmConfig.commonProperties())) {
+ Admin adminClient = null;
+ try {
+ adminClient = AdminClient.create(rlmmConfig.commonProperties());
// Stop if it is already initialized or closing.
while (!(initialized.get() || closing.get())) {
// If it is timed out then raise an error to exit.
if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) {
- log.error("Timed out in initializing the resources, retried to initialize the resource for [{}] ms.",
- rlmmConfig.initializationRetryMaxTimeoutMs());
+ log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.",
+ rlmmConfig.initializationRetryMaxTimeoutMs());
initializationFailed = true;
return;
}
@@ -395,7 +396,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
if (!topicCreated) {
// Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again.
- log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
+ log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
continue;
} else {
@@ -407,7 +408,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
initializationFailed = true;
}
} catch (Exception e) {
- log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
+ log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
continue;
}
@@ -438,10 +439,15 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
lock.writeLock().unlock();
}
}
+ } catch (Exception e) {
+ log.error("Encountered error while initializing topic-based RLMM resources", e);
+ initializationFailed = true;
+ } finally {
+ Utils.closeQuietly(adminClient, "AdminClient");
}
}
- private boolean isPartitionsCountSameAsConfigured(AdminClient adminClient,
+ private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
String topicName) throws InterruptedException, ExecutionException {
log.debug("Getting topic details to check for partition count and replication factor.");
TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topicName))
@@ -471,7 +477,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
* @param topic topic to be created.
* @return Returns true if the topic already exists, or it is created successfully.
*/
- private boolean createTopic(AdminClient adminClient, NewTopic topic) {
+ private boolean createTopic(Admin adminClient, NewTopic topic) {
boolean topicCreated = false;
try {
adminClient.createTopics(Collections.singleton(topic)).all().get();
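Replacing try-with-resources on the concrete AdminClient with a nullable Admin reference closed via Utils.closeQuietly appears to serve two purposes: the new catch-all also covers a failure thrown by AdminClient.create itself, and programming against the Admin interface keeps these helpers mockable. A condensed sketch of the resulting lifecycle (retry loop elided):

    Admin adminClient = null;
    try {
        adminClient = AdminClient.create(rlmmConfig.commonProperties());
        // ... create or verify the remote log metadata topic, retrying until timeout ...
    } catch (Exception e) {
        log.error("Encountered error while initializing topic-based RLMM resources", e);
        initializationFailed = true;
    } finally {
        Utils.closeQuietly(adminClient, "AdminClient");
    }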
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index 3386b94f895..e30e7c410f3 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -134,14 +134,14 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
- Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []",
+ Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());
RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
0, 100, -1L, 0,
time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
- Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []",
+ Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
exception.getMessage());
// `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteTopicTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteTopicTest.java
new file mode 100644
index 00000000000..34c5b7a42dc
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteTopicTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public final class DeleteTopicTest extends TieredStorageTestHarness {
+
+ @Override
+ public int brokerCount() {
+ return 2;
+ }
+
+ @Override
+ protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+ final Integer broker0 = 0;
+ final Integer broker1 = 1;
+ final String topicA = "topicA";
+ final Integer p0 = 0;
+ final Integer p1 = 1;
+ final Integer partitionCount = 2;
+ final Integer replicationFactor = 2;
+ final Integer maxBatchCountPerSegment = 1;
+ final boolean enableRemoteLogStorage = true;
+ final Map<Integer, List<Integer>> assignment = mkMap(
+ mkEntry(p0, Arrays.asList(broker0, broker1)),
+ mkEntry(p1, Arrays.asList(broker1, broker0))
+ );
+
+ builder
+ .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
+ assignment, enableRemoteLogStorage)
+ // send records to partition 0
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+ .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // send records to partition 1
+ .expectSegmentToBeOffloaded(broker1, topicA, p1, 0, new KeyValueSpec("k0", "v0"))
+ .expectSegmentToBeOffloaded(broker1, topicA, p1, 1, new KeyValueSpec("k1", "v1"))
+ .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+ .produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+ new KeyValueSpec("k2", "v2"))
+ // delete the topic
+ .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 2)
+ .expectDeletionInRemoteStorage(broker1, topicA, p1, DELETE_SEGMENT, 2)
+ .deleteTopic(Collections.singletonList(topicA))
+ .expectEmptyRemoteStorage(topicA, p0)
+ .expectEmptyRemoteStorage(topicA, p1);
+ }
+}
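The expected counts follow from the builder spec: with maxBatchCountPerSegment = 1, producing three records to a partition rolls two full single-record segments (base offsets 0 and 1), which are offloaded, leaving offset 2 as the earliest local offset. Deleting the topic must therefore emit exactly two DELETE_SEGMENT events per partition and leave the remote storage empty for both partitions, which is what the final three expectations assert.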