You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/09/06 00:53:45 UTC

[kafka] branch 3.6 updated: KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4) (#14329)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch 3.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.6 by this push:
     new 2be8b153230 KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4) (#14329)
2be8b153230 is described below

commit 2be8b153230450e34e34ce4f55598acb6385a2d2
Author: Kamal Chandraprakash <kc...@uber.com>
AuthorDate: Wed Sep 6 05:50:12 2023 +0530

    KAFKA-15410: Delete topic integration test with LocalTieredStorage and TBRLMM (3/4) (#14329)
    
    Added delete topic integration tests for tiered storage enabled topics with LocalTieredStorage and TBRLMM
    
    Reviewers: Satish Duggana <sa...@apache.org>, Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>
---
 .../java/kafka/log/remote/RemoteLogManager.java    | 68 +++++++++++--------
 .../main/scala/kafka/server/ReplicaManager.scala   | 14 +---
 .../kafka/log/remote/RemoteLogManagerTest.java     | 32 ++++++---
 .../unit/kafka/server/ReplicaManagerTest.scala     | 39 ++++++-----
 .../remote/metadata/storage/ConsumerManager.java   | 27 +++-----
 .../log/remote/metadata/storage/ConsumerTask.java  |  5 +-
 .../TopicBasedRemoteLogMetadataManager.java        | 26 +++++---
 ...ogMetadataManagerMultipleSubscriptionsTest.java |  4 +-
 .../storage/integration/DeleteTopicTest.java       | 77 ++++++++++++++++++++++
 9 files changed, 197 insertions(+), 95 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 8296acbc8ad..f4b20014b49 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -23,6 +23,7 @@ import kafka.log.LogSegment;
 import kafka.log.UnifiedLog;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
+import kafka.server.StopPartition;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
@@ -349,41 +350,49 @@ public class RemoteLogManager implements Closeable {
     }
 
     /**
-     * Deletes the internal topic partition info if delete flag is set as true.
+     * Stop the remote-log-manager task for the given partitions. And, calls the
+     * {@link RemoteLogMetadataManager#onStopPartitions(Set)} when {@link StopPartition#deleteLocalLog()} is true.
+     * Deletes the partitions from the remote storage when {@link StopPartition#deleteRemoteLog()} is true.
      *
-     * @param topicPartitions topic partitions that needs to be stopped.
-     * @param delete         flag to indicate whether the given topic partitions to be deleted or not.
+     * @param stopPartitions topic partitions that needs to be stopped.
      * @param errorHandler   callback to handle any errors while stopping the partitions.
      */
-    public void stopPartitions(Set<TopicPartition> topicPartitions,
-                               boolean delete,
+    public void stopPartitions(Set<StopPartition> stopPartitions,
                                BiConsumer<TopicPartition, Throwable> errorHandler) {
-        LOGGER.debug("Stopping {} partitions, delete: {}", topicPartitions.size(), delete);
-        Set<TopicIdPartition> topicIdPartitions = topicPartitions.stream()
-                .filter(topicIdByPartitionMap::containsKey)
-                .map(tp -> new TopicIdPartition(topicIdByPartitionMap.get(tp), tp))
-                .collect(Collectors.toSet());
-
-        topicIdPartitions.forEach(tpId -> {
+        LOGGER.debug("Stop partitions: {}", stopPartitions);
+        for (StopPartition stopPartition: stopPartitions) {
+            TopicPartition tp = stopPartition.topicPartition();
             try {
-                RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
-                if (task != null) {
-                    LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
-                    task.cancel();
-                }
-                if (delete) {
-                    LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
-                    deleteRemoteLogPartition(tpId);
+                // We are assuming that if the topic exists in topicIdByPartitionMap then it has active archival
+                // otherwise not. Ideally, `stopPartitions` should not be called for internal and non-tiered-storage
+                // topics. See KAFKA-15432 for more details.
+                if (topicIdByPartitionMap.containsKey(tp)) {
+                    TopicIdPartition tpId = new TopicIdPartition(topicIdByPartitionMap.get(tp), tp);
+                    RLMTaskWithFuture task = leaderOrFollowerTasks.remove(tpId);
+                    if (task != null) {
+                        LOGGER.info("Cancelling the RLM task for tpId: {}", tpId);
+                        task.cancel();
+                    }
+                    if (stopPartition.deleteRemoteLog()) {
+                        LOGGER.info("Deleting the remote log segments task for partition: {}", tpId);
+                        deleteRemoteLogPartition(tpId);
+                    }
                 }
             } catch (Exception ex) {
-                errorHandler.accept(tpId.topicPartition(), ex);
-                LOGGER.error("Error while stopping the partition: {}, delete: {}", tpId.topicPartition(), delete, ex);
+                errorHandler.accept(tp, ex);
+                LOGGER.error("Error while stopping the partition: {}", stopPartition, ex);
             }
-        });
-        remoteLogMetadataManager.onStopPartitions(topicIdPartitions);
-        if (delete) {
-            // NOTE: this#stopPartitions method is called when Replica state changes to Offline and ReplicaDeletionStarted
-            topicPartitions.forEach(topicIdByPartitionMap::remove);
+        }
+        // Note `deleteLocalLog` will always be true when `deleteRemoteLog` is true but not the other way around.
+        Set<TopicIdPartition> deleteLocalPartitions = stopPartitions.stream()
+                .filter(sp -> sp.deleteLocalLog() && topicIdByPartitionMap.containsKey(sp.topicPartition()))
+                .map(sp -> new TopicIdPartition(topicIdByPartitionMap.get(sp.topicPartition()), sp.topicPartition()))
+                .collect(Collectors.toSet());
+        if (!deleteLocalPartitions.isEmpty()) {
+            // NOTE: In ZK mode, this#stopPartitions method is called when Replica state changes to Offline and
+            // ReplicaDeletionStarted
+            remoteLogMetadataManager.onStopPartitions(deleteLocalPartitions);
+            deleteLocalPartitions.forEach(tpId -> topicIdByPartitionMap.remove(tpId.topicPartition()));
         }
     }
 
@@ -1455,6 +1464,11 @@ public class RemoteLogManager implements Closeable {
         LOGGER.info("Shutting down of thread pool {} is completed", poolName);
     }
 
+    //Visible for testing
+    RLMTaskWithFuture task(TopicIdPartition partition) {
+        return leaderOrFollowerTasks.get(partition);
+    }
+
     static class RLMScheduledThreadPool {
 
         private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 52a3cc0341b..948bd30d747 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -558,7 +558,6 @@ class ReplicaManager(val config: KafkaConfig,
     // Second remove deleted partitions from the partition map. Fetchers rely on the
     // ReplicaManager to get Partition's information so they must be stopped first.
     val partitionsToDelete = mutable.Set.empty[TopicPartition]
-    val remotePartitionsToDelete = mutable.Set.empty[TopicPartition]
     partitionsToStop.foreach { stopPartition =>
       val topicPartition = stopPartition.topicPartition
       if (stopPartition.deleteLocalLog) {
@@ -575,9 +574,6 @@ class ReplicaManager(val config: KafkaConfig,
         }
         partitionsToDelete += topicPartition
       }
-      if (stopPartition.deleteRemoteLog)
-        remotePartitionsToDelete += topicPartition
-
       // If we were the leader, we may have some operations still waiting for completion.
       // We force completion to prevent them from timing out.
       completeDelayedFetchOrProduceRequests(topicPartition)
@@ -591,13 +587,9 @@ class ReplicaManager(val config: KafkaConfig,
     }
     remoteLogManager.foreach { rlm =>
       // exclude the partitions with offline/error state
-      errorMap.keySet.foreach(remotePartitionsToDelete.remove)
-      if (remotePartitionsToDelete.nonEmpty) {
-        rlm.stopPartitions(remotePartitionsToDelete.asJava, true, (tp, e) => errorMap.put(tp, e))
-      }
-      val remotePartitionsToNotDelete = partitions.diff(remotePartitionsToDelete)
-      if (remotePartitionsToNotDelete.nonEmpty) {
-        rlm.stopPartitions(remotePartitionsToNotDelete.asJava, false, (tp, e) => errorMap.put(tp, e))
+      val partitions = partitionsToStop.filterNot(sp => errorMap.contains(sp.topicPartition)).toSet.asJava
+      if (!partitions.isEmpty) {
+        rlm.stopPartitions(partitions, (tp, e) => errorMap.put(tp, e))
       }
     }
     errorMap
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index 53456e58e70..682b58b9725 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -23,6 +23,7 @@ import kafka.log.LogSegment;
 import kafka.log.UnifiedLog;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
+import kafka.server.StopPartition;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
@@ -107,6 +108,8 @@ import static org.apache.kafka.server.log.remote.storage.RemoteStorageMetrics.RE
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
@@ -896,12 +899,12 @@ public class RemoteLogManagerTest {
         verifyInCache(followerTopicIdPartition, leaderTopicIdPartition);
 
         // Evicts from topicId cache
-        remoteLogManager.stopPartitions(Collections.singleton(leaderTopicIdPartition.topicPartition()), true, (tp, ex) -> { });
+        remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { });
         verifyNotInCache(leaderTopicIdPartition);
         verifyInCache(followerTopicIdPartition);
 
         // Evicts from topicId cache
-        remoteLogManager.stopPartitions(Collections.singleton(followerTopicIdPartition.topicPartition()), true, (tp, ex) -> { });
+        remoteLogManager.stopPartitions(Collections.singleton(new StopPartition(followerTopicIdPartition.topicPartition(), true, true)), (tp, ex) -> { });
         verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition);
     }
 
@@ -1351,13 +1354,17 @@ public class RemoteLogManagerTest {
     @Test
     public void testStopPartitionsWithoutDeletion() throws RemoteStorageException {
         BiConsumer<TopicPartition, Throwable> errorHandler = (topicPartition, throwable) -> fail("shouldn't be called");
-        Set<TopicPartition> partitions = new HashSet<>();
-        partitions.add(leaderTopicIdPartition.topicPartition());
-        partitions.add(followerTopicIdPartition.topicPartition());
+        Set<StopPartition> partitions = new HashSet<>();
+        partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, false));
+        partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, false));
         remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
                 Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+        assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.task(followerTopicIdPartition));
 
-        remoteLogManager.stopPartitions(partitions, false, errorHandler);
+        remoteLogManager.stopPartitions(partitions, errorHandler);
+        assertNull(remoteLogManager.task(leaderTopicIdPartition));
+        assertNull(remoteLogManager.task(followerTopicIdPartition));
         verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
         verify(remoteStorageManager, times(0)).deleteLogSegmentData(any());
         verify(remoteLogMetadataManager, times(0)).updateRemoteLogSegmentMetadata(any());
@@ -1367,11 +1374,14 @@ public class RemoteLogManagerTest {
     public void testStopPartitionsWithDeletion() throws RemoteStorageException {
         BiConsumer<TopicPartition, Throwable> errorHandler =
                 (topicPartition, ex) -> fail("shouldn't be called: " + ex);
-        Set<TopicPartition> partitions = new HashSet<>();
-        partitions.add(leaderTopicIdPartition.topicPartition());
-        partitions.add(followerTopicIdPartition.topicPartition());
+        Set<StopPartition> partitions = new HashSet<>();
+        partitions.add(new StopPartition(leaderTopicIdPartition.topicPartition(), true, true));
+        partitions.add(new StopPartition(followerTopicIdPartition.topicPartition(), true, true));
         remoteLogManager.onLeadershipChange(Collections.singleton(mockPartition(leaderTopicIdPartition)),
                 Collections.singleton(mockPartition(followerTopicIdPartition)), topicIds);
+        assertNotNull(remoteLogManager.task(leaderTopicIdPartition));
+        assertNotNull(remoteLogManager.task(followerTopicIdPartition));
+
         when(remoteLogMetadataManager.listRemoteLogSegments(eq(leaderTopicIdPartition)))
                 .thenReturn(listRemoteLogSegmentMetadata(leaderTopicIdPartition, 5, 100, 1024).iterator());
         when(remoteLogMetadataManager.listRemoteLogSegments(eq(followerTopicIdPartition)))
@@ -1381,7 +1391,9 @@ public class RemoteLogManagerTest {
         when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any()))
                 .thenReturn(dummyFuture);
 
-        remoteLogManager.stopPartitions(partitions, true, errorHandler);
+        remoteLogManager.stopPartitions(partitions, errorHandler);
+        assertNull(remoteLogManager.task(leaderTopicIdPartition));
+        assertNull(remoteLogManager.task(followerTopicIdPartition));
         verify(remoteLogMetadataManager, times(1)).onStopPartitions(any());
         verify(remoteStorageManager, times(8)).deleteLogSegmentData(any());
         verify(remoteLogMetadataManager, times(16)).updateRemoteLogSegmentMetadata(any());
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 783c10b3b44..b4bc4f540ad 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3386,7 +3386,7 @@ class ReplicaManagerTest {
       val (_, error) = replicaManager.stopReplicas(1, 0, 0, partitionStates)
       assertEquals(Errors.STALE_CONTROLLER_EPOCH, error)
       if (enableRemoteStorage) {
-        verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), any())
+        verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -3424,7 +3424,7 @@ class ReplicaManagerTest {
       assertEquals(Errors.NONE, error)
       assertEquals(Map(tp0 -> Errors.KAFKA_STORAGE_ERROR), result)
       if (enableRemoteStorage) {
-        verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any(), any())
+        verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -3487,8 +3487,12 @@ class ReplicaManagerTest {
         assertTrue(replicaManager.logManager.getLog(tp0).isDefined)
       }
       if (enableRemoteStorage) {
-        verify(mockRemoteLogManager, times(1))
-          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)), ArgumentMatchers.eq(false), any())
+        if (throwIOException) {
+          verify(mockRemoteLogManager, times(0)).stopPartitions(any(), any())
+        } else {
+          verify(mockRemoteLogManager, times(1))
+            .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(tp0, deleteLocalLog = deletePartitions))), any())
+        }
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -3871,14 +3875,17 @@ class ReplicaManagerTest {
         assertFalse(readRecoveryPointCheckpoint().contains(tp0))
         assertFalse(readLogStartOffsetCheckpoint().contains(tp0))
         if (enableRemoteStorage) {
-          verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
-            ArgumentMatchers.eq(leaderEpoch == LeaderAndIsr.EpochDuringDelete), any())
+          val stopPartition = StopPartition(tp0,
+            deleteLocalLog = deletePartition,
+            deleteRemoteLog = leaderEpoch == LeaderAndIsr.EpochDuringDelete)
+          verify(mockRemoteLogManager)
+            .stopPartitions(ArgumentMatchers.eq(Collections.singleton(stopPartition)), any())
         }
       }
 
       if (expectedOutput == Errors.NONE && !deletePartition && enableRemoteStorage) {
-        verify(mockRemoteLogManager).stopPartitions(ArgumentMatchers.eq(Collections.singleton(tp0)),
-          ArgumentMatchers.eq(false), any())
+        verify(mockRemoteLogManager)
+          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(tp0, deleteLocalLog = false))), any())
       }
     } finally {
       replicaManager.shutdown(checkpointHW = false)
@@ -4528,8 +4535,8 @@ class ReplicaManagerTest {
 
       if (enableRemoteStorage) {
         verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
-        verify(mockRemoteLogManager, times(1)).stopPartitions(
-          ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
+        verify(mockRemoteLogManager, times(1))
+          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
       }
 
       // Check that the partition was removed
@@ -4575,8 +4582,8 @@ class ReplicaManagerTest {
 
       if (enableRemoteStorage) {
         verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
-        verify(mockRemoteLogManager, times(1)).stopPartitions(
-          ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
+        verify(mockRemoteLogManager, times(1))
+          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
       }
 
       // Check that the partition was removed
@@ -4622,8 +4629,8 @@ class ReplicaManagerTest {
 
       if (enableRemoteStorage) {
         verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
-        verify(mockRemoteLogManager, times(1)).stopPartitions(
-          ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(false), any())
+        verify(mockRemoteLogManager, times(1))
+          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true))), any())
       }
 
       // Check that the partition was removed
@@ -4669,8 +4676,8 @@ class ReplicaManagerTest {
 
       if (enableRemoteStorage) {
         verify(mockRemoteLogManager, never()).onLeadershipChange(anySet(), anySet(), anyMap())
-        verify(mockRemoteLogManager, times(1)).stopPartitions(
-          ArgumentMatchers.eq(Collections.singleton(topicPartition)), ArgumentMatchers.eq(true), any())
+        verify(mockRemoteLogManager, times(1))
+          .stopPartitions(ArgumentMatchers.eq(Collections.singleton(StopPartition(topicPartition, deleteLocalLog = true, deleteRemoteLog = true))), any())
       }
 
       // Check that the partition was removed
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 186cbb17c56..ed310f45149 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -98,35 +98,28 @@ public class ConsumerManager implements Closeable {
      */
     public void waitTillConsumptionCatchesUp(RecordMetadata recordMetadata,
                                              long timeoutMs) throws TimeoutException {
-        final int partition = recordMetadata.partition();
-        final long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
-
-        log.info("Waiting until consumer is caught up with the target partition: [{}]", partition);
-
+        int partition = recordMetadata.partition();
         // If the current assignment does not have the subscription for this partition then return immediately.
         if (!consumerTask.isMetadataPartitionAssigned(partition)) {
-            throw new KafkaException("This consumer is not assigned to the target partition " + partition + ". " +
-                    "Partitions currently assigned: " + consumerTask.metadataPartitionsAssigned());
+            throw new KafkaException("This consumer is not assigned to the target partition " + partition +
+                    ". Currently assigned partitions: " + consumerTask.metadataPartitionsAssigned());
         }
-
-        final long offset = recordMetadata.offset();
+        long offset = recordMetadata.offset();
         long startTimeMs = time.milliseconds();
+        long consumeCheckIntervalMs = Math.min(CONSUME_RECHECK_INTERVAL_MS, timeoutMs);
+        log.info("Wait until the consumer is caught up with the target partition {} up-to offset {}", partition, offset);
         while (true) {
-            log.debug("Checking if partition [{}] is up to date with offset [{}]", partition, offset);
             long readOffset = consumerTask.readOffsetForMetadataPartition(partition).orElse(-1L);
             if (readOffset >= offset) {
                 return;
             }
-
-            log.debug("Expected offset [{}] for partition [{}], but the committed offset: [{}],  Sleeping for [{}] to retry again",
-                    offset, partition, readOffset, consumeCheckIntervalMs);
-
+            log.debug("Expected offset for partition {} is {}, but the read offset is {}. " +
+                    "Sleeping for {} ms to retry again", partition, offset, readOffset, consumeCheckIntervalMs);
             if (time.milliseconds() - startTimeMs > timeoutMs) {
-                log.warn("Expected offset for partition:[{}] is : [{}], but the committed offset: [{}] ",
-                        partition, readOffset, offset);
+                log.warn("Expected offset for partition {} is {}, but the read offset is {}",
+                        partition, offset, readOffset);
                 throw new TimeoutException("Timed out in catching up with the expected offset by consumer.");
             }
-
             time.sleep(consumeCheckIntervalMs);
         }
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index b53c4ee3374..f36909b66b6 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -157,9 +157,10 @@ class ConsumerTask implements Runnable, Closeable {
             remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
             readOffsetsByUserTopicPartition.put(remoteLogMetadata.topicIdPartition(), record.offset());
         } else {
-            log.debug("The event {} is skipped because it is either already processed or not assigned to this consumer", remoteLogMetadata);
+            log.trace("The event {} is skipped because it is either already processed or not assigned to this consumer",
+                    remoteLogMetadata);
         }
-        log.debug("Updating consumed offset: [{}] for partition [{}]", record.offset(), record.partition());
+        log.trace("Updating consumed offset: {} for partition {}", record.offset(), record.partition());
         readOffsetsByMetadataPartition.put(record.partition(), record.offset());
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index e1bf145bbd8..0d1106db05f 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.TopicDescription;
@@ -175,12 +176,10 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
     private CompletableFuture<Void> storeRemoteLogMetadata(TopicIdPartition topicIdPartition,
                                                            RemoteLogMetadata remoteLogMetadata)
             throws RemoteStorageException {
-        log.debug("Storing metadata for partition: [{}] with context: [{}]", topicIdPartition, remoteLogMetadata);
-
+        log.debug("Storing the partition: {} metadata: {}", topicIdPartition, remoteLogMetadata);
         try {
             // Publish the message to the metadata topic.
             CompletableFuture<RecordMetadata> produceFuture = producerManager.publishMessage(remoteLogMetadata);
-
             // Create and return a `CompletableFuture` instance which completes when the consumer is caught up with the produced record's offset.
             return produceFuture.thenAcceptAsync(recordMetadata -> {
                 try {
@@ -377,14 +376,16 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
         final NewTopic remoteLogMetadataTopicRequest = createRemoteLogMetadataTopicRequest();
         boolean topicCreated = false;
         long startTimeMs = time.milliseconds();
-        try (AdminClient adminClient = AdminClient.create(rlmmConfig.commonProperties())) {
+        Admin adminClient = null;
+        try {
+            adminClient = AdminClient.create(rlmmConfig.commonProperties());
             // Stop if it is already initialized or closing.
             while (!(initialized.get() || closing.get())) {
 
                 // If it is timed out then raise an error to exit.
                 if (time.milliseconds() - startTimeMs > rlmmConfig.initializationRetryMaxTimeoutMs()) {
-                    log.error("Timed out in initializing the resources, retried to initialize the resource for [{}] ms.",
-                              rlmmConfig.initializationRetryMaxTimeoutMs());
+                    log.error("Timed out in initializing the resources, retried to initialize the resource for {} ms.",
+                            rlmmConfig.initializationRetryMaxTimeoutMs());
                     initializationFailed = true;
                     return;
                 }
@@ -395,7 +396,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
 
                 if (!topicCreated) {
                     // Sleep for INITIALIZATION_RETRY_INTERVAL_MS before trying to create the topic again.
-                    log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
+                    log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
                     Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
                     continue;
                 } else {
@@ -407,7 +408,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
                             initializationFailed = true;
                         }
                     } catch (Exception e) {
-                        log.info("Sleep for : {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
+                        log.info("Sleep for {} ms before it is retried again.", rlmmConfig.initializationRetryIntervalMs());
                         Utils.sleep(rlmmConfig.initializationRetryIntervalMs());
                         continue;
                     }
@@ -438,10 +439,15 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
                     lock.writeLock().unlock();
                 }
             }
+        } catch (Exception e) {
+            log.error("Encountered error while initializing topic-based RLMM resources", e);
+            initializationFailed = true;
+        } finally {
+            Utils.closeQuietly(adminClient, "AdminClient");
         }
     }
 
-    private boolean isPartitionsCountSameAsConfigured(AdminClient adminClient,
+    private boolean isPartitionsCountSameAsConfigured(Admin adminClient,
                                                       String topicName) throws InterruptedException, ExecutionException {
         log.debug("Getting topic details to check for partition count and replication factor.");
         TopicDescription topicDescription = adminClient.describeTopics(Collections.singleton(topicName))
@@ -471,7 +477,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
      * @param topic topic to be created.
      * @return Returns true if the topic already exists, or it is created successfully.
      */
-    private boolean createTopic(AdminClient adminClient, NewTopic topic) {
+    private boolean createTopic(Admin adminClient, NewTopic topic) {
         boolean topicCreated = false;
         try {
             adminClient.createTopics(Collections.singleton(topic)).all().get();
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
index 3386b94f895..e30e7c410f3 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest.java
@@ -134,14 +134,14 @@ public class TopicBasedRemoteLogMetadataManagerMultipleSubscriptionsTest {
             0, 100, -1L, 0,
             time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get());
-        Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []",
+        Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
             exception.getMessage());
 
         RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
             0, 100, -1L, 0,
             time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
         exception = Assertions.assertThrows(ExecutionException.class, () -> rlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get());
-        Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Partitions currently assigned: []",
+        Assertions.assertEquals("org.apache.kafka.common.KafkaException: This consumer is not assigned to the target partition 0. Currently assigned partitions: []",
             exception.getMessage());
 
         // `listRemoteLogSegments` will receive an exception as these topic partitions are not yet registered.
diff --git a/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteTopicTest.java b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteTopicTest.java
new file mode 100644
index 00000000000..34c5b7a42dc
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/tiered/storage/integration/DeleteTopicTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tiered.storage.integration;
+
+import org.apache.kafka.tiered.storage.TieredStorageTestBuilder;
+import org.apache.kafka.tiered.storage.TieredStorageTestHarness;
+import org.apache.kafka.tiered.storage.specs.KeyValueSpec;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.server.log.remote.storage.LocalTieredStorageEvent.EventType.DELETE_SEGMENT;
+
+public final class DeleteTopicTest extends TieredStorageTestHarness {
+
+    @Override
+    public int brokerCount() {
+        return 2;
+    }
+
+    @Override
+    protected void writeTestSpecifications(TieredStorageTestBuilder builder) {
+        final Integer broker0 = 0;
+        final Integer broker1 = 1;
+        final String topicA = "topicA";
+        final Integer p0 = 0;
+        final Integer p1 = 1;
+        final Integer partitionCount = 2;
+        final Integer replicationFactor = 2;
+        final Integer maxBatchCountPerSegment = 1;
+        final boolean enableRemoteLogStorage = true;
+        final Map<Integer, List<Integer>> assignment = mkMap(
+                mkEntry(p0, Arrays.asList(broker0, broker1)),
+                mkEntry(p1, Arrays.asList(broker1, broker0))
+        );
+
+        builder
+                .createTopic(topicA, partitionCount, replicationFactor, maxBatchCountPerSegment,
+                        assignment, enableRemoteLogStorage)
+                // send records to partition 0
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker0, topicA, p0, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p0, 2L)
+                .produce(topicA, p0, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // send records to partition 1
+                .expectSegmentToBeOffloaded(broker1, topicA, p1, 0, new KeyValueSpec("k0", "v0"))
+                .expectSegmentToBeOffloaded(broker1, topicA, p1, 1, new KeyValueSpec("k1", "v1"))
+                .expectEarliestLocalOffsetInLogDirectory(topicA, p1, 2L)
+                .produce(topicA, p1, new KeyValueSpec("k0", "v0"), new KeyValueSpec("k1", "v1"),
+                        new KeyValueSpec("k2", "v2"))
+                // delete the topic
+                .expectDeletionInRemoteStorage(broker0, topicA, p0, DELETE_SEGMENT, 2)
+                .expectDeletionInRemoteStorage(broker1, topicA, p1, DELETE_SEGMENT, 2)
+                .deleteTopic(Collections.singletonList(topicA))
+                .expectEmptyRemoteStorage(topicA, p0)
+                .expectEmptyRemoteStorage(topicA, p1);
+    }
+}