Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/18 14:46:26 UTC

[kafka] branch trunk updated: KAFKA-14953: Add tiered storage related metrics (#13944)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd3b1137d2f KAFKA-14953: Add tiered storage related metrics (#13944)
fd3b1137d2f is described below

commit fd3b1137d2f12bd5c3717fc7ac5ba3d8a96fd951
Author: Abhijeet Kumar <ab...@gmail.com>
AuthorDate: Tue Jul 18 20:16:19 2023 +0530

    KAFKA-14953: Add tiered storage related metrics (#13944)
    
    * KAFKA-14953: Adding RemoteLogManager metrics
    In this PR, I have added the following metrics related to tiered storage, as described in [KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage).
    | Metric                              | Description                                                   |
    |-------------------------------------|---------------------------------------------------------------|
    | RemoteReadRequestsPerSec            | Number of remote storage read requests per second             |
    | RemoteWriteRequestsPerSec           | Number of remote storage write requests per second            |
    | RemoteBytesInPerSec                 | Number of bytes read from remote storage per second           |
    | RemoteReadErrorsPerSec              | Number of remote storage read errors per second               |
    | RemoteBytesOutPerSec                | Number of bytes copied to remote storage per second           |
    | RemoteWriteErrorsPerSec             | Number of remote storage write errors per second              |
    | RemoteLogReaderTaskQueueSize        | Number of remote storage read tasks pending execution         |
    | RemoteLogReaderAvgIdlePercent       | Average idle percent of the remote storage reader thread pool |
    | RemoteLogManagerTasksAvgIdlePercent | Average idle percent of the RemoteLogManager thread pool      |
    
    Added unit tests for all the rate metrics.
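    
    These meters are registered through the broker's Yammer metrics registry, so they are also visible over JMX. As a quick illustration (not part of this commit, and assuming the usual `kafka.server:type=BrokerTopicMetrics` object naming that the existing per-topic meters use), the new broker-wide meter could be read like this:
    
    ```java
    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;
    
    public class RemoteMetricsProbe {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            // Broker-wide meter; append ",topic=<topic>" for the per-topic variant.
            ObjectName name = new ObjectName(
                "kafka.server:type=BrokerTopicMetrics,name=RemoteBytesInPerSec");
            // Yammer meters expose Count plus one/five/fifteen-minute rate attributes.
            System.out.println("count = " + server.getAttribute(name, "Count"));
            System.out.println("1m rate = " + server.getAttribute(name, "OneMinuteRate"));
        }
    }
    ```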
    
    Reviewers: Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Kamal Chandraprakash <ka...@gmail.com>, Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Staniel Yao <ya...@gmail.com>, hudeqi <12...@qq.com>, Satish Duggana <sa...@apache.org>
---
 build.gradle                                       |   1 +
 checkstyle/import-control-core.xml                 |   1 +
 checkstyle/import-control.xml                      |   1 +
 checkstyle/suppressions.xml                        |   2 +-
 .../java/kafka/log/remote/RemoteLogManager.java    |  51 ++++++-
 .../java/kafka/log/remote/RemoteLogReader.java     |  12 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   2 +-
 .../scala/kafka/server/KafkaRequestHandler.scala   |  30 ++++
 core/src/main/scala/kafka/server/KafkaServer.scala |   2 +-
 .../kafka/log/remote/RemoteLogManagerTest.java     | 158 ++++++++++++++++++++-
 .../java/kafka/log/remote/RemoteLogReaderTest.java |  40 +++++-
 .../internals/log/RemoteStorageThreadPool.java     |  32 ++++-
 13 files changed, 313 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index 7dfea2bf516..630271f11ad 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1705,6 +1705,7 @@ project(':storage') {
     implementation libs.caffeine
     implementation libs.slf4jApi
     implementation libs.jacksonDatabind
+    implementation libs.metrics
 
     testImplementation project(':clients')
     testImplementation project(':clients').sourceSets.test.output
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index f97dda31461..6136ff5928d 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -76,6 +76,7 @@
   <subpackage name="log.remote">
     <allow pkg="org.apache.kafka.server.common" />
     <allow pkg="org.apache.kafka.server.log.remote" />
+    <allow pkg="org.apache.kafka.server.metrics" />
     <allow pkg="org.apache.kafka.storage.internals" />
     <allow pkg="kafka.log" />
     <allow pkg="kafka.cluster" />
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 67b8ea08d84..e472dec90b4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -266,6 +266,7 @@
   </subpackage>
 
   <subpackage name="storage.internals">
+    <allow pkg="com.yammer.metrics.core" />
     <allow pkg="org.apache.kafka.server"/>
     <allow pkg="org.apache.kafka.storage.internals"/>
     <allow pkg="org.apache.kafka.common" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3f2df5a32b6..78e2183dbf0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,7 +39,7 @@
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
-    <suppress checks="NPathComplexity|ClassFanOutComplexity" files="RemoteLogManager.java"/>
+    <suppress checks="NPathComplexity|ClassFanOutComplexity" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
 
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6831230287b..ea065b8c812 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -16,10 +16,12 @@
  */
 package kafka.log.remote;
 
+import com.yammer.metrics.core.Gauge;
 import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.LogSegment;
 import kafka.log.UnifiedLog;
+import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
@@ -49,6 +51,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
 import org.apache.kafka.storage.internals.log.AbortedTxn;
@@ -120,12 +123,14 @@ public class RemoteLogManager implements Closeable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
     private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
-
+    public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
+    public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
     private final RemoteLogManagerConfig rlmConfig;
     private final int brokerId;
     private final String logDir;
     private final Time time;
     private final Function<TopicPartition, Optional<UnifiedLog>> fetchLog;
+    private final BrokerTopicStats brokerTopicStats;
 
     private final RemoteStorageManager remoteLogStorageManager;
 
@@ -147,6 +152,8 @@ public class RemoteLogManager implements Closeable {
     private Optional<EndPoint> endpoint = Optional.empty();
     private boolean closed = false;
 
+    private KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
+
     /**
      * Creates RemoteLogManager instance with the given arguments.
      *
@@ -162,26 +169,42 @@ public class RemoteLogManager implements Closeable {
                             String logDir,
                             String clusterId,
                             Time time,
-                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog) throws IOException {
+                            Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+                            BrokerTopicStats brokerTopicStats) throws IOException {
         this.rlmConfig = rlmConfig;
         this.brokerId = brokerId;
         this.logDir = logDir;
         this.clusterId = clusterId;
         this.time = time;
         this.fetchLog = fetchLog;
+        this.brokerTopicStats = brokerTopicStats;
 
         remoteLogStorageManager = createRemoteStorageManager();
         remoteLogMetadataManager = createRemoteLogMetadataManager();
         indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+
+        metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new Gauge<Double>() {
+            @Override
+            public Double value() {
+                return rlmScheduledThreadPool.getIdlePercent();
+            }
+        });
+
         remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
                 REMOTE_LOG_READER_THREAD_NAME_PREFIX,
                 rlmConfig.remoteLogReaderThreads(),
-                rlmConfig.remoteLogReaderMaxPendingTasks()
+                rlmConfig.remoteLogReaderMaxPendingTasks(),
+                REMOTE_LOG_READER_METRICS_NAME_PREFIX
         );
     }
 
+    private void removeMetrics() {
+        metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+        remoteStorageReaderThreadPool.removeMetrics();
+    }
+
     private <T> T createDelegate(ClassLoader classLoader, String className) {
         try {
             return (T) classLoader.loadClass(className)
@@ -553,6 +576,8 @@ public class RemoteLogManager implements Closeable {
                 throw ex;
             } catch (Exception ex) {
                 if (!isCancelled()) {
+                    brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteWriteRequestRate().mark();
+                    brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().mark();
                     logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex);
                 }
             }
@@ -594,13 +619,17 @@ public class RemoteLogManager implements Closeable {
             LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.lazyOffsetIndex().get().file()),
                     toPathIfExists(segment.lazyTimeIndex().get().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
                     producerStateSnapshotFile.toPath(), leaderEpochsIndex);
+            brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
+            brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
             remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
 
             RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
                     RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
 
             remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
-
+            brokerTopicStats.topicStats(log.topicPartition().topic())
+                .remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
+            brokerTopicStats.allTopicsStats().remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
             copiedOffsetOption = OptionalLong.of(endOffset);
             log.updateHighestOffsetInRemoteStorage(endOffset);
             logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
@@ -845,7 +874,7 @@ public class RemoteLogManager implements Closeable {
      * @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
      */
     public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
-        return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback));
+        return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats));
     }
 
     void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
@@ -896,7 +925,11 @@ public class RemoteLogManager implements Closeable {
                 Utils.closeQuietly(indexCache, "RemoteIndexCache");
 
                 rlmScheduledThreadPool.close();
-                shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
+                try {
+                    shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
+                } finally {
+                    removeMetrics();
+                }
 
                 leaderOrFollowerTasks.clear();
                 closed = true;
@@ -955,6 +988,10 @@ public class RemoteLogManager implements Closeable {
             return threadPool;
         }
 
+        public Double getIdlePercent() {
+            return 1 - (double) scheduledThreadPool.getActiveCount() / (double) scheduledThreadPool.getCorePoolSize();
+        }
+
         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
             LOGGER.info("Scheduling runnable {} with initial delay: {}, fixed delay: {}", runnable, initialDelay, delay);
             return scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit);
@@ -965,4 +1002,4 @@ public class RemoteLogManager implements Closeable {
         }
     }
 
-}
\ No newline at end of file
+}
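
The RemoteLogManager hunks above follow a register-in-constructor, remove-on-close pattern for the new gauge. A minimal standalone sketch of the same idea against the plain Yammer 2.x API (the class and registry wiring here are illustrative, not from the patch; Kafka routes this through KafkaMetricsGroup):

```java
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import java.util.concurrent.ThreadPoolExecutor;

// Illustrative only: a gauge that recomputes pool idleness on every
// metrics poll, removed on close so restarts do not leak MBeans.
public class PoolIdleGauge implements AutoCloseable {
    private final MetricName name = new MetricName(
            "kafka.log.remote", "RemoteLogManager", "RemoteLogManagerTasksAvgIdlePercent");

    public PoolIdleGauge(ThreadPoolExecutor pool) {
        Metrics.defaultRegistry().newGauge(name, new Gauge<Double>() {
            @Override
            public Double value() {
                return 1 - (double) pool.getActiveCount() / pool.getCorePoolSize();
            }
        });
    }

    @Override
    public void close() {
        Metrics.defaultRegistry().removeMetric(name);
    }
}
```
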
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
index 0ed7f722d5b..b4cea4fa81e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
  */
 package kafka.log.remote;
 
+import kafka.server.BrokerTopicStats;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
@@ -31,14 +32,19 @@ public class RemoteLogReader implements Callable<Void> {
     private final Logger logger;
     private final RemoteStorageFetchInfo fetchInfo;
     private final RemoteLogManager rlm;
+    private final BrokerTopicStats brokerTopicStats;
     private final Consumer<RemoteLogReadResult> callback;
 
     public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
                            RemoteLogManager rlm,
-                           Consumer<RemoteLogReadResult> callback) {
+                           Consumer<RemoteLogReadResult> callback,
+                           BrokerTopicStats brokerTopicStats) {
         this.fetchInfo = fetchInfo;
         this.rlm = rlm;
+        this.brokerTopicStats = brokerTopicStats;
         this.callback = callback;
+        this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteReadRequestRate().mark();
+        this.brokerTopicStats.allTopicsStats().remoteReadRequestRate().mark();
         logger = new LogContext() {
             @Override
             public String logPrefix() {
@@ -54,10 +60,14 @@ public class RemoteLogReader implements Callable<Void> {
             logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
 
             FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
+            brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
             result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
         } catch (OffsetOutOfRangeException e) {
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
         } catch (Exception e) {
+            brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteReadRequestRate().mark();
+            brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().mark();
             logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
             result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
         }
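
For context on where these marks fire: the read-request meter is marked when the reader is constructed (i.e., on submission via RemoteLogManager#asyncRead), bytes-in is marked on a successful read, and the failure meter on any exception other than OffsetOutOfRangeException. A hedged caller sketch follows; the class wrapper, topic name, and fetch parameters are made up, while the types and asyncRead signature come from the patch:

```java
import java.util.concurrent.Future;
import kafka.log.remote.RemoteLogManager;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

public class AsyncReadExample {
    // Assumes an already-configured RemoteLogManager instance.
    static Future<Void> readRemote(RemoteLogManager rlm) {
        RemoteStorageFetchInfo fetchInfo = new RemoteStorageFetchInfo(
                1024, true, new TopicPartition("payments", 0), null, null, false);
        return rlm.asyncRead(fetchInfo, result -> {
            if (result.error.isPresent())   // failedRemoteReadRequestRate was marked
                System.err.println("remote read failed: " + result.error.get());
            else                            // remoteBytesInRate was marked with the size read
                System.out.println("read " + result.fetchDataInfo.get().records.sizeInBytes() + " bytes");
        });
    }
}
```
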
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 9eb1af6d56f..37a6d7be460 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -97,7 +97,7 @@ import scala.jdk.CollectionConverters._
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
                  private val localLog: LocalLog,
-                 brokerTopicStats: BrokerTopicStats,
+                 val brokerTopicStats: BrokerTopicStats,
                  val producerIdExpirationCheckIntervalMs: Int,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                  val producerStateManager: ProducerStateManager,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8641a74824b..a7397774bba 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -571,7 +571,7 @@ class BrokerServer(
       }
 
       Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
-        (tp: TopicPartition) => logManager.getLog(tp).asJava));
+        (tp: TopicPartition) => logManager.getLog(tp).asJava, brokerTopicStats));
     } else {
       None
     }
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 325c288c58e..f0de624562b 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
     BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
     BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
     BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+    BrokerTopicStats.RemoteBytesOutPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"),
+    BrokerTopicStats.RemoteBytesInPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesInPerSec, "bytes"),
+    BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+    BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+    BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+    BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),
     BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
     BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
     BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
@@ -336,6 +342,18 @@ class BrokerTopicMetrics(name: Option[String]) {
 
   def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
 
+  def remoteBytesOutRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesOutPerSec).meter()
+
+  def remoteBytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesInPerSec).meter()
+
+  def remoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteReadRequestsPerSec).meter()
+
+  def remoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteWriteRequestsPerSec).meter()
+
+  def failedRemoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteReadRequestsPerSec).meter()
+
+  def failedRemoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteWriteRequestsPerSec).meter()
+
   def closeMetric(metricType: String): Unit = {
     val meter = metricTypeMap.get(metricType)
     if (meter != null)
@@ -360,6 +378,12 @@ object BrokerTopicStats {
   val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
   val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
   val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
+  val RemoteBytesOutPerSec = "RemoteBytesOutPerSec"
+  val RemoteBytesInPerSec = "RemoteBytesInPerSec"
+  val RemoteReadRequestsPerSec = "RemoteReadRequestsPerSec"
+  val RemoteWriteRequestsPerSec = "RemoteWriteRequestsPerSec"
+  val FailedRemoteReadRequestsPerSec = "RemoteReadErrorsPerSec"
+  val FailedRemoteWriteRequestsPerSec = "RemoteWriteErrorsPerSec"
 
   // These following topics are for LogValidator for better debugging on failed records
   val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
@@ -415,6 +439,12 @@ class BrokerTopicStats extends Logging {
       topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
       topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
       topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
+      topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesOutPerSec)
+      topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesInPerSec)
+      topicMetrics.closeMetric(BrokerTopicStats.RemoteReadRequestsPerSec)
+      topicMetrics.closeMetric(BrokerTopicStats.RemoteWriteRequestsPerSec)
+      topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteReadRequestsPerSec)
+      topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteWriteRequestsPerSec)
     }
   }
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 28c07840973..0d10f1cdde3 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -614,7 +614,7 @@ class KafkaServer(
       }
 
       Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
-        (tp: TopicPartition) => logManager.getLog(tp).asJava));
+        (tp: TopicPartition) => logManager.getLog(tp).asJava, brokerTopicStats));
     } else {
       None
     }
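
The KafkaRequestHandler.scala hunks above register the six new names in BrokerTopicMetrics, and the BrokerServer/KafkaServer hunks hand BrokerTopicStats to the RemoteLogManager. The marking convention throughout is to bump both the per-topic meter and the all-topics aggregate. A minimal sketch of that double-mark pattern (illustrative class using the same Yammer meter API; the scope string is simplified relative to Kafka's tag encoding):

```java
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.core.MetricName;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

// Illustrative only: a per-topic meter plus an all-topics aggregate,
// mirroring how remoteBytesInRate is marked twice in RemoteLogReader.
public class TopicMeters {
    private final ConcurrentMap<String, Meter> perTopic = new ConcurrentHashMap<>();
    private final Meter allTopics = meter(null);

    private static Meter meter(String topic) {
        String scope = topic == null ? null : "topic." + topic;
        MetricName name = new MetricName(
                "kafka.server", "BrokerTopicMetrics", "RemoteBytesInPerSec", scope);
        return Metrics.defaultRegistry().newMeter(name, "bytes", TimeUnit.SECONDS);
    }

    public void markBytesIn(String topic, long bytes) {
        perTopic.computeIfAbsent(topic, TopicMeters::meter).mark(bytes);
        allTopics.mark(bytes); // aggregate across topics
    }
}
```
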
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index f7a6f1c4a9e..3bdbe4e6c76 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -20,6 +20,7 @@ import kafka.cluster.EndPoint;
 import kafka.cluster.Partition;
 import kafka.log.LogSegment;
 import kafka.log.UnifiedLog;
+import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
@@ -47,6 +48,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -54,6 +56,7 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.LazyIndex;
 import org.apache.kafka.storage.internals.log.OffsetIndex;
 import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
 import org.apache.kafka.storage.internals.log.TimeIndex;
 import org.apache.kafka.storage.internals.log.TransactionIndex;
 import org.apache.kafka.test.TestUtils;
@@ -61,6 +64,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
+import org.mockito.MockedConstruction;
 import org.mockito.Mockito;
 import scala.Option;
 import scala.collection.JavaConverters;
@@ -83,7 +87,10 @@ import java.util.Optional;
 import java.util.Properties;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
+import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
+import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -93,14 +100,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 public class RemoteLogManagerTest {
@@ -112,6 +123,7 @@ public class RemoteLogManagerTest {
     RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
     RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
     RemoteLogManagerConfig remoteLogManagerConfig = null;
+    BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
     RemoteLogManager remoteLogManager = null;
 
     TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
@@ -143,7 +155,10 @@ public class RemoteLogManagerTest {
         topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
         Properties props = new Properties();
         remoteLogManagerConfig = createRLMConfig(props);
-        remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog)) {
+
+        kafka.utils.TestUtils.clearYammerMetrics();
+
+        remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), brokerTopicStats) {
             public RemoteStorageManager createRemoteStorageManager() {
                 return remoteStorageManager;
             }
@@ -252,6 +267,8 @@ public class RemoteLogManagerTest {
         long nextSegmentStartOffset = 150L;
         long oldSegmentEndOffset = nextSegmentStartOffset - 1;
 
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
         // leader epoch preparation
         checkpoint.write(totalEpochEntries);
         LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
@@ -271,6 +288,7 @@ public class RemoteLogManagerTest {
         FileRecords fileRecords = mock(FileRecords.class);
         when(oldSegment.log()).thenReturn(fileRecords);
         when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
         when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
 
         when(mockLog.activeSegment()).thenReturn(activeSegment);
@@ -297,6 +315,15 @@ public class RemoteLogManagerTest {
         when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
         doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
 
+        // Verify the metrics for remote writes and failures are zero before attempting to copy a log segment
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+
         RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
         task.convertToLeader(2);
         task.copyLogSegmentsToRemote(mockLog);
@@ -329,6 +356,92 @@ public class RemoteLogManagerTest {
         ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
         verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
         assertEquals(oldSegmentEndOffset, argument.getValue());
+
+        // Verify the metrics for remote writes are updated correctly
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+        assertEquals(10, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count());
+        // Verify we did not report any failure for remote writes
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+        assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+
+    }
+
+    @Test
+    void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
+        long oldSegmentStartOffset = 0L;
+        long nextSegmentStartOffset = 150L;
+
+        when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+        // leader epoch preparation
+        checkpoint.write(totalEpochEntries);
+        LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+        when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+        when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+        File tempFile = TestUtils.tempFile();
+        File mockProducerSnapshotIndex = TestUtils.tempFile();
+        File tempDir = TestUtils.tempDirectory();
+        // create 2 log segments, with 0 and 150 as log start offset
+        LogSegment oldSegment = mock(LogSegment.class);
+        LogSegment activeSegment = mock(LogSegment.class);
+
+        when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+        when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+        FileRecords fileRecords = mock(FileRecords.class);
+        when(oldSegment.log()).thenReturn(fileRecords);
+        when(fileRecords.file()).thenReturn(tempFile);
+        when(fileRecords.sizeInBytes()).thenReturn(10);
+        when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+        when(mockLog.activeSegment()).thenReturn(activeSegment);
+        when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+        when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+        ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+        when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+        when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+        when(mockLog.lastStableOffset()).thenReturn(250L);
+
+        LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
+        LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
+        File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+        txnFile.createNewFile();
+        TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+        when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+        when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+        when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+        CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+        dummyFuture.complete(null);
+        when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+        doThrow(new RuntimeException()).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+        // Verify the metrics for remote write requests/failures are zero before attempting to copy a log segment
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+        RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+        task.convertToLeader(2);
+        task.copyLogSegmentsToRemote(mockLog);
+
+        // Verify we attempted to copy log segment data to remote storage
+        verify(remoteStorageManager, times(1)).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+        // Verify we did not update the highest offset because of the write failure
+        verify(mockLog, times(0)).updateHighestOffsetInRemoteStorage(anyLong());
+        // Verify the metrics for remote write requests/failures were updated.
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+        assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+        assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
     }
 
     @Test
@@ -412,7 +525,7 @@ public class RemoteLogManagerTest {
     void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
         ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
         RemoteLogManager remoteLogManager =
-            new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty()) {
+            new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), brokerTopicStats) {
                 public RemoteStorageManager createRemoteStorageManager() {
                     return rsmManager;
                 }
@@ -589,6 +702,47 @@ public class RemoteLogManagerTest {
         inorder.verify(remoteLogMetadataManager, times(1)).close();
     }
 
+    @Test
+    public void testRemoveMetricsOnClose() throws IOException {
+        MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class);
+        try {
+            RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
+                time, tp -> Optional.of(mockLog), brokerTopicStats) {
+                public RemoteStorageManager createRemoteStorageManager() {
+                    return remoteStorageManager;
+                }
+
+                public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+                    return remoteLogMetadataManager;
+                }
+            };
+            // Close RemoteLogManager so that metrics are removed
+            remoteLogManager.close();
+
+            KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
+            KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);
+
+            List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+            List<String> remoteStorageThreadPoolMetricNames = RemoteStorageThreadPool.METRIC_SUFFIXES
+                .stream()
+                .map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
+                .collect(Collectors.toList());
+
+            verify(mockRlmMetricsGroup, times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
+            // Verify that the RemoteLogManager metrics are removed
+            remoteLogManagerMetricNames.forEach(metricName -> verify(mockRlmMetricsGroup).removeMetric(metricName));
+
+            verify(mockThreadPoolMetricsGroup, times(remoteStorageThreadPoolMetricNames.size())).newGauge(anyString(), any());
+            // Verify that the RemoteStorageThreadPool metrics are removed
+            remoteStorageThreadPoolMetricNames.forEach(metricName -> verify(mockThreadPoolMetricsGroup).removeMetric(metricName));
+
+            verifyNoMoreInteractions(mockRlmMetricsGroup);
+            verifyNoMoreInteractions(mockThreadPoolMetricsGroup);
+        } finally {
+            mockMetricsGroupCtor.close();
+        }
+    }
+
     private Partition mockPartition(TopicIdPartition topicIdPartition) {
         TopicPartition tp = topicIdPartition.topicPartition();
         Partition partition = mock(Partition.class);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index aa8dd042a13..36533fed22d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -16,14 +16,16 @@
  */
 package kafka.log.remote;
 
+import kafka.server.BrokerTopicStats;
+import kafka.utils.TestUtils;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.storage.internals.log.FetchDataInfo;
 import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
 import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
 import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -40,18 +42,26 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class RemoteLogReaderTest {
+    public static final String TOPIC = "test";
     RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+    BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
     LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
     Records records = mock(Records.class);
 
+    @BeforeEach
+    public void setUp() {
+        TestUtils.clearYammerMetrics();
+    }
+
     @Test
     public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException {
         FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records);
+        when(records.sizeInBytes()).thenReturn(100);
         when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
-        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
+        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
         remoteLogReader.call();
 
         // verify the callback did get invoked with the expected remoteLogReadResult
@@ -61,15 +71,24 @@ public class RemoteLogReaderTest {
         assertFalse(actualRemoteLogReadResult.error.isPresent());
         assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
         assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+
+        // Verify metrics for remote reads are updated correctly
+        assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+        assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count());
+        assertEquals(100, brokerTopicStats.allTopicsStats().remoteBytesInRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count());
     }
 
     @Test
     public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {
-        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new OffsetOutOfRangeException("error"));
+        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));
 
         Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
-        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
-        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
+        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
         remoteLogReader.call();
 
         // verify the callback did get invoked with the expected remoteLogReadResult
@@ -78,5 +97,14 @@ public class RemoteLogReaderTest {
         RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
         assertTrue(actualRemoteLogReadResult.error.isPresent());
         assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+
+        // Verify metrics for remote reads are updated correctly
+        assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+        assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+        assertEquals(1, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
+        // Verify aggregate metrics
+        assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count());
+        assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesInRate().count());
+        assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count());
     }
 }
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 2b7ae15b154..22807a0271b 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -16,11 +16,17 @@
  */
 package org.apache.kafka.storage.internals.log;
 
+import com.yammer.metrics.core.Gauge;
 import org.apache.kafka.common.internals.FatalExitError;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
 import org.slf4j.Logger;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -28,11 +34,17 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+    public static final String TASK_QUEUE_SIZE = "TaskQueueSize";
+    public static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
+    public static final Set<String> METRIC_SUFFIXES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TASK_QUEUE_SIZE, AVG_IDLE_PERCENT)));
     private final Logger logger;
+    private final String metricsNamePrefix;
+    private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
 
     public RemoteStorageThreadPool(String threadNamePrefix,
                                    int numThreads,
-                                   int maxPendingTasks) {
+                                   int maxPendingTasks,
+                                   String metricsNamePrefix) {
         super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
                 new RemoteStorageThreadFactory(threadNamePrefix));
         logger = new LogContext() {
@@ -41,6 +53,20 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
                 return "[" + Thread.currentThread().getName() + "]";
             }
         }.logger(RemoteStorageThreadPool.class);
+
+        this.metricsNamePrefix = metricsNamePrefix;
+        metricsGroup.newGauge(metricsNamePrefix.concat(TASK_QUEUE_SIZE), new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return RemoteStorageThreadPool.this.getQueue().size();
+            }
+        });
+        metricsGroup.newGauge(metricsNamePrefix.concat(AVG_IDLE_PERCENT), new Gauge<Double>() {
+            @Override
+            public Double value() {
+                return 1 - (double) RemoteStorageThreadPool.this.getActiveCount() / (double) RemoteStorageThreadPool.this.getCorePoolSize();
+            }
+        });
     }
 
     @Override
@@ -70,4 +96,8 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
         }
 
     }
+
+    public void removeMetrics() {
+        METRIC_SUFFIXES.forEach(metric -> metricsGroup.removeMetric(metricsNamePrefix.concat(metric)));
+    }
 }
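
A quick sanity check of the AvgIdlePercent arithmetic used by both gauges above: with a core pool size of 10 and 3 active workers, the gauge reads 1 - 3/10 = 0.7, and it bottoms out at 0.0 when every thread is busy (the pool's maximum size equals its core size, so the ratio never exceeds 1). A tiny standalone demo of the two gauge computations (not Kafka code):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GaugeMathDemo {
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 10, 0L,
                TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));
        for (int i = 0; i < 3; i++) {
            pool.submit(() -> {
                try { Thread.sleep(1000); } catch (InterruptedException ignored) { }
            });
        }
        Thread.sleep(100); // let the workers pick the tasks up
        double idle = 1 - (double) pool.getActiveCount() / pool.getCorePoolSize();
        System.out.println("TaskQueueSize = " + pool.getQueue().size()); // 0: nothing queued
        System.out.println("AvgIdlePercent = " + idle);                  // typically 0.7
        pool.shutdownNow();
    }
}
```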