You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sa...@apache.org on 2023/07/18 14:46:26 UTC
[kafka] branch trunk updated: KAFKA-14953: Add tiered storage related metrics (#13944)
This is an automated email from the ASF dual-hosted git repository.
satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new fd3b1137d2f KAFKA-14953: Add tiered storage related metrics (#13944)
fd3b1137d2f is described below
commit fd3b1137d2f12bd5c3717fc7ac5ba3d8a96fd951
Author: Abhijeet Kumar <ab...@gmail.com>
AuthorDate: Tue Jul 18 20:16:19 2023 +0530
KAFKA-14953: Add tiered storage related metrics (#13944)
* KAFKA-14953: Adding RemoteLogManager metrics
In this PR, I have added the following metrics that are related to tiered storage mentioned in [KIP-405](https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage).
|Metric|Description|
|-----------------------------------------|--------------------------------------------------------------|
| RemoteReadRequestsPerSec | Number of remote storage read requests per second |
| RemoteWriteRequestsPerSec | Number of remote storage write requests per second |
| RemoteBytesInPerSec | Number of bytes read from remote storage per second |
| RemoteReadErrorsPerSec | Number of remote storage read errors per second |
| RemoteBytesOutPerSec | Number of bytes copied to remote storage per second |
| RemoteWriteErrorsPerSec | Number of remote storage write errors per second |
| RemoteLogReaderTaskQueueSize | Number of remote storage read tasks pending for execution |
| RemoteLogReaderAvgIdlePercent | Average idle percent of the remote storage reader thread pool |
| RemoteLogManagerTasksAvgIdlePercent | Average idle percent of RemoteLogManager thread pool |
Added unit tests for all the rate metrics.
Reviewers: Luke Chen <sh...@gmail.com>, Divij Vaidya <di...@amazon.com>, Kamal Chandraprakash <ka...@gmail.com>, Jorge Esteban Quilcate Otoya <qu...@gmail.com>, Staniel Yao <ya...@gmail.com>, hudeqi <12...@qq.com>, Satish Duggana <sa...@apache.org>
---
build.gradle | 1 +
checkstyle/import-control-core.xml | 1 +
checkstyle/import-control.xml | 1 +
checkstyle/suppressions.xml | 2 +-
.../java/kafka/log/remote/RemoteLogManager.java | 51 ++++++-
.../java/kafka/log/remote/RemoteLogReader.java | 12 +-
core/src/main/scala/kafka/log/UnifiedLog.scala | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../scala/kafka/server/KafkaRequestHandler.scala | 30 ++++
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../kafka/log/remote/RemoteLogManagerTest.java | 158 ++++++++++++++++++++-
.../java/kafka/log/remote/RemoteLogReaderTest.java | 40 +++++-
.../internals/log/RemoteStorageThreadPool.java | 32 ++++-
13 files changed, 313 insertions(+), 21 deletions(-)
diff --git a/build.gradle b/build.gradle
index 7dfea2bf516..630271f11ad 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1705,6 +1705,7 @@ project(':storage') {
implementation libs.caffeine
implementation libs.slf4jApi
implementation libs.jacksonDatabind
+ implementation libs.metrics
testImplementation project(':clients')
testImplementation project(':clients').sourceSets.test.output
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index f97dda31461..6136ff5928d 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -76,6 +76,7 @@
<subpackage name="log.remote">
<allow pkg="org.apache.kafka.server.common" />
<allow pkg="org.apache.kafka.server.log.remote" />
+ <allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.storage.internals" />
<allow pkg="kafka.log" />
<allow pkg="kafka.cluster" />
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 67b8ea08d84..e472dec90b4 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -266,6 +266,7 @@
</subpackage>
<subpackage name="storage.internals">
+ <allow pkg="com.yammer.metrics.core" />
<allow pkg="org.apache.kafka.server"/>
<allow pkg="org.apache.kafka.storage.internals"/>
<allow pkg="org.apache.kafka.common" />
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3f2df5a32b6..78e2183dbf0 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,7 +39,7 @@
<suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
<suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
- <suppress checks="NPathComplexity|ClassFanOutComplexity" files="RemoteLogManager.java"/>
+ <suppress checks="NPathComplexity|ClassFanOutComplexity" files="(RemoteLogManager|RemoteLogManagerTest).java"/>
<suppress checks="MethodLength"
files="(KafkaClusterTestKit).java"/>
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 6831230287b..ea065b8c812 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -16,10 +16,12 @@
*/
package kafka.log.remote;
+import com.yammer.metrics.core.Gauge;
import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
+import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@@ -49,6 +51,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AbortedTxn;
@@ -120,12 +123,14 @@ public class RemoteLogManager implements Closeable {
private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
-
+ public static final String REMOTE_LOG_READER_METRICS_NAME_PREFIX = "RemoteLogReader";
+ public static final String REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT = "RemoteLogManagerTasksAvgIdlePercent";
private final RemoteLogManagerConfig rlmConfig;
private final int brokerId;
private final String logDir;
private final Time time;
private final Function<TopicPartition, Optional<UnifiedLog>> fetchLog;
+ private final BrokerTopicStats brokerTopicStats;
private final RemoteStorageManager remoteLogStorageManager;
@@ -147,6 +152,8 @@ public class RemoteLogManager implements Closeable {
private Optional<EndPoint> endpoint = Optional.empty();
private boolean closed = false;
+ private KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
+
/**
* Creates RemoteLogManager instance with the given arguments.
*
@@ -162,26 +169,42 @@ public class RemoteLogManager implements Closeable {
String logDir,
String clusterId,
Time time,
- Function<TopicPartition, Optional<UnifiedLog>> fetchLog) throws IOException {
+ Function<TopicPartition, Optional<UnifiedLog>> fetchLog,
+ BrokerTopicStats brokerTopicStats) throws IOException {
this.rlmConfig = rlmConfig;
this.brokerId = brokerId;
this.logDir = logDir;
this.clusterId = clusterId;
this.time = time;
this.fetchLog = fetchLog;
+ this.brokerTopicStats = brokerTopicStats;
remoteLogStorageManager = createRemoteStorageManager();
remoteLogMetadataManager = createRemoteLogMetadataManager();
indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+
+ metricsGroup.newGauge(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT, new Gauge<Double>() {
+ @Override
+ public Double value() {
+ return rlmScheduledThreadPool.getIdlePercent();
+ }
+ });
+
remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
REMOTE_LOG_READER_THREAD_NAME_PREFIX,
rlmConfig.remoteLogReaderThreads(),
- rlmConfig.remoteLogReaderMaxPendingTasks()
+ rlmConfig.remoteLogReaderMaxPendingTasks(),
+ REMOTE_LOG_READER_METRICS_NAME_PREFIX
);
}
+ private void removeMetrics() {
+ metricsGroup.removeMetric(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+ remoteStorageReaderThreadPool.removeMetrics();
+ }
+
private <T> T createDelegate(ClassLoader classLoader, String className) {
try {
return (T) classLoader.loadClass(className)
@@ -553,6 +576,8 @@ public class RemoteLogManager implements Closeable {
throw ex;
} catch (Exception ex) {
if (!isCancelled()) {
+ brokerTopicStats.topicStats(log.topicPartition().topic()).failedRemoteWriteRequestRate().mark();
+ brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().mark();
logger.error("Error occurred while copying log segments of partition: {}", topicIdPartition, ex);
}
}
@@ -594,13 +619,17 @@ public class RemoteLogManager implements Closeable {
LogSegmentData segmentData = new LogSegmentData(logFile.toPath(), toPathIfExists(segment.lazyOffsetIndex().get().file()),
toPathIfExists(segment.lazyTimeIndex().get().file()), Optional.ofNullable(toPathIfExists(segment.txnIndex().file())),
producerStateSnapshotFile.toPath(), leaderEpochsIndex);
+ brokerTopicStats.topicStats(log.topicPartition().topic()).remoteWriteRequestRate().mark();
+ brokerTopicStats.allTopicsStats().remoteWriteRequestRate().mark();
remoteLogStorageManager.copyLogSegmentData(copySegmentStartedRlsm, segmentData);
RemoteLogSegmentMetadataUpdate copySegmentFinishedRlsm = new RemoteLogSegmentMetadataUpdate(id, time.milliseconds(),
RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
remoteLogMetadataManager.updateRemoteLogSegmentMetadata(copySegmentFinishedRlsm).get();
-
+ brokerTopicStats.topicStats(log.topicPartition().topic())
+ .remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
+ brokerTopicStats.allTopicsStats().remoteBytesOutRate().mark(copySegmentStartedRlsm.segmentSizeInBytes());
copiedOffsetOption = OptionalLong.of(endOffset);
log.updateHighestOffsetInRemoteStorage(endOffset);
logger.info("Copied {} to remote storage with segment-id: {}", logFileName, copySegmentFinishedRlsm.remoteLogSegmentId());
@@ -845,7 +874,7 @@ public class RemoteLogManager implements Closeable {
* @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
*/
public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
- return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback));
+ return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback, brokerTopicStats));
}
void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
@@ -896,7 +925,11 @@ public class RemoteLogManager implements Closeable {
Utils.closeQuietly(indexCache, "RemoteIndexCache");
rlmScheduledThreadPool.close();
- shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
+ try {
+ shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
+ } finally {
+ removeMetrics();
+ }
leaderOrFollowerTasks.clear();
closed = true;
@@ -955,6 +988,10 @@ public class RemoteLogManager implements Closeable {
return threadPool;
}
+ public Double getIdlePercent() {
+ return 1 - (double) scheduledThreadPool.getActiveCount() / (double) scheduledThreadPool.getCorePoolSize();
+ }
+
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit timeUnit) {
LOGGER.info("Scheduling runnable {} with initial delay: {}, fixed delay: {}", runnable, initialDelay, delay);
return scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit);
@@ -965,4 +1002,4 @@ public class RemoteLogManager implements Closeable {
}
}
-}
\ No newline at end of file
+}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
index 0ed7f722d5b..b4cea4fa81e 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogReader.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -16,6 +16,7 @@
*/
package kafka.log.remote;
+import kafka.server.BrokerTopicStats;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
@@ -31,14 +32,19 @@ public class RemoteLogReader implements Callable<Void> {
private final Logger logger;
private final RemoteStorageFetchInfo fetchInfo;
private final RemoteLogManager rlm;
+ private final BrokerTopicStats brokerTopicStats;
private final Consumer<RemoteLogReadResult> callback;
public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
RemoteLogManager rlm,
- Consumer<RemoteLogReadResult> callback) {
+ Consumer<RemoteLogReadResult> callback,
+ BrokerTopicStats brokerTopicStats) {
this.fetchInfo = fetchInfo;
this.rlm = rlm;
+ this.brokerTopicStats = brokerTopicStats;
this.callback = callback;
+ this.brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteReadRequestRate().mark();
+ this.brokerTopicStats.allTopicsStats().remoteReadRequestRate().mark();
logger = new LogContext() {
@Override
public String logPrefix() {
@@ -54,10 +60,14 @@ public class RemoteLogReader implements Callable<Void> {
logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+ brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
+ brokerTopicStats.allTopicsStats().remoteBytesInRate().mark(fetchDataInfo.records.sizeInBytes());
result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
} catch (OffsetOutOfRangeException e) {
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
} catch (Exception e) {
+ brokerTopicStats.topicStats(fetchInfo.topicPartition.topic()).failedRemoteReadRequestRate().mark();
+ brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().mark();
logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
}
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 9eb1af6d56f..37a6d7be460 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -97,7 +97,7 @@ import scala.jdk.CollectionConverters._
@threadsafe
class UnifiedLog(@volatile var logStartOffset: Long,
private val localLog: LocalLog,
- brokerTopicStats: BrokerTopicStats,
+ val brokerTopicStats: BrokerTopicStats,
val producerIdExpirationCheckIntervalMs: Int,
@volatile var leaderEpochCache: Option[LeaderEpochFileCache],
val producerStateManager: ProducerStateManager,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 8641a74824b..a7397774bba 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -571,7 +571,7 @@ class BrokerServer(
}
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
- (tp: TopicPartition) => logManager.getLog(tp).asJava));
+ (tp: TopicPartition) => logManager.getLog(tp).asJava, brokerTopicStats));
} else {
None
}
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index 325c288c58e..f0de624562b 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -277,6 +277,12 @@ class BrokerTopicMetrics(name: Option[String]) {
BrokerTopicStats.TotalFetchRequestsPerSec -> MeterWrapper(BrokerTopicStats.TotalFetchRequestsPerSec, "requests"),
BrokerTopicStats.FetchMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.FetchMessageConversionsPerSec, "requests"),
BrokerTopicStats.ProduceMessageConversionsPerSec -> MeterWrapper(BrokerTopicStats.ProduceMessageConversionsPerSec, "requests"),
+ BrokerTopicStats.RemoteBytesOutPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesOutPerSec, "bytes"),
+ BrokerTopicStats.RemoteBytesInPerSec -> MeterWrapper(BrokerTopicStats.RemoteBytesInPerSec, "bytes"),
+ BrokerTopicStats.RemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteReadRequestsPerSec, "requests"),
+ BrokerTopicStats.RemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.RemoteWriteRequestsPerSec, "requests"),
+ BrokerTopicStats.FailedRemoteReadRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteReadRequestsPerSec, "requests"),
+ BrokerTopicStats.FailedRemoteWriteRequestsPerSec -> MeterWrapper(BrokerTopicStats.FailedRemoteWriteRequestsPerSec, "requests"),
BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec -> MeterWrapper(BrokerTopicStats.NoKeyCompactedTopicRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMagicNumberRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMagicNumberRecordsPerSec, "requests"),
BrokerTopicStats.InvalidMessageCrcRecordsPerSec -> MeterWrapper(BrokerTopicStats.InvalidMessageCrcRecordsPerSec, "requests"),
@@ -336,6 +342,18 @@ class BrokerTopicMetrics(name: Option[String]) {
def invalidOffsetOrSequenceRecordsPerSec: Meter = metricTypeMap.get(BrokerTopicStats.InvalidOffsetOrSequenceRecordsPerSec).meter()
+ def remoteBytesOutRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesOutPerSec).meter()
+
+ def remoteBytesInRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteBytesInPerSec).meter()
+
+ def remoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteReadRequestsPerSec).meter()
+
+ def remoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.RemoteWriteRequestsPerSec).meter()
+
+ def failedRemoteReadRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteReadRequestsPerSec).meter()
+
+ def failedRemoteWriteRequestRate: Meter = metricTypeMap.get(BrokerTopicStats.FailedRemoteWriteRequestsPerSec).meter()
+
def closeMetric(metricType: String): Unit = {
val meter = metricTypeMap.get(metricType)
if (meter != null)
@@ -360,6 +378,12 @@ object BrokerTopicStats {
val ProduceMessageConversionsPerSec = "ProduceMessageConversionsPerSec"
val ReassignmentBytesInPerSec = "ReassignmentBytesInPerSec"
val ReassignmentBytesOutPerSec = "ReassignmentBytesOutPerSec"
+ val RemoteBytesOutPerSec = "RemoteBytesOutPerSec"
+ val RemoteBytesInPerSec = "RemoteBytesInPerSec"
+ val RemoteReadRequestsPerSec = "RemoteReadRequestsPerSec"
+ val RemoteWriteRequestsPerSec = "RemoteWriteRequestsPerSec"
+ val FailedRemoteReadRequestsPerSec = "RemoteReadErrorsPerSec"
+ val FailedRemoteWriteRequestsPerSec = "RemoteWriteErrorsPerSec"
// These following topics are for LogValidator for better debugging on failed records
val NoKeyCompactedTopicRecordsPerSec = "NoKeyCompactedTopicRecordsPerSec"
@@ -415,6 +439,12 @@ class BrokerTopicStats extends Logging {
topicMetrics.closeMetric(BrokerTopicStats.ProduceMessageConversionsPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReplicationBytesOutPerSec)
topicMetrics.closeMetric(BrokerTopicStats.ReassignmentBytesOutPerSec)
+ topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesOutPerSec)
+ topicMetrics.closeMetric(BrokerTopicStats.RemoteBytesInPerSec)
+ topicMetrics.closeMetric(BrokerTopicStats.RemoteReadRequestsPerSec)
+ topicMetrics.closeMetric(BrokerTopicStats.RemoteWriteRequestsPerSec)
+ topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteReadRequestsPerSec)
+ topicMetrics.closeMetric(BrokerTopicStats.FailedRemoteWriteRequestsPerSec)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 28c07840973..0d10f1cdde3 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -614,7 +614,7 @@ class KafkaServer(
}
Some(new RemoteLogManager(config.remoteLogManagerConfig, config.brokerId, config.logDirs.head, clusterId, time,
- (tp: TopicPartition) => logManager.getLog(tp).asJava));
+ (tp: TopicPartition) => logManager.getLog(tp).asJava, brokerTopicStats));
} else {
None
}
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index f7a6f1c4a9e..3bdbe4e6c76 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -20,6 +20,7 @@ import kafka.cluster.EndPoint;
import kafka.cluster.Partition;
import kafka.log.LogSegment;
import kafka.log.UnifiedLog;
+import kafka.server.BrokerTopicStats;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicIdPartition;
@@ -47,6 +48,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
@@ -54,6 +56,7 @@ import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LazyIndex;
import org.apache.kafka.storage.internals.log.OffsetIndex;
import org.apache.kafka.storage.internals.log.ProducerStateManager;
+import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
import org.apache.kafka.storage.internals.log.TimeIndex;
import org.apache.kafka.storage.internals.log.TransactionIndex;
import org.apache.kafka.test.TestUtils;
@@ -61,6 +64,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
+import org.mockito.MockedConstruction;
import org.mockito.Mockito;
import scala.Option;
import scala.collection.JavaConverters;
@@ -83,7 +87,10 @@ import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT;
+import static kafka.log.remote.RemoteLogManager.REMOTE_LOG_READER_METRICS_NAME_PREFIX;
import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -93,14 +100,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
public class RemoteLogManagerTest {
@@ -112,6 +123,7 @@ public class RemoteLogManagerTest {
RemoteStorageManager remoteStorageManager = mock(RemoteStorageManager.class);
RemoteLogMetadataManager remoteLogMetadataManager = mock(RemoteLogMetadataManager.class);
RemoteLogManagerConfig remoteLogManagerConfig = null;
+ BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
RemoteLogManager remoteLogManager = null;
TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0));
@@ -143,7 +155,10 @@ public class RemoteLogManagerTest {
topicIds.put(followerTopicIdPartition.topicPartition().topic(), followerTopicIdPartition.topicId());
Properties props = new Properties();
remoteLogManagerConfig = createRLMConfig(props);
- remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog)) {
+
+ kafka.utils.TestUtils.clearYammerMetrics();
+
+ remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, tp -> Optional.of(mockLog), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return remoteStorageManager;
}
@@ -252,6 +267,8 @@ public class RemoteLogManagerTest {
long nextSegmentStartOffset = 150L;
long oldSegmentEndOffset = nextSegmentStartOffset - 1;
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
// leader epoch preparation
checkpoint.write(totalEpochEntries);
LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
@@ -271,6 +288,7 @@ public class RemoteLogManagerTest {
FileRecords fileRecords = mock(FileRecords.class);
when(oldSegment.log()).thenReturn(fileRecords);
when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
when(mockLog.activeSegment()).thenReturn(activeSegment);
@@ -297,6 +315,15 @@ public class RemoteLogManagerTest {
when(remoteLogMetadataManager.updateRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadataUpdate.class))).thenReturn(dummyFuture);
doNothing().when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+ // Verify the metrics for remote writes and for failures is zero before attempt to copy log segment
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+
RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
task.convertToLeader(2);
task.copyLogSegmentsToRemote(mockLog);
@@ -329,6 +356,92 @@ public class RemoteLogManagerTest {
ArgumentCaptor<Long> argument = ArgumentCaptor.forClass(Long.class);
verify(mockLog, times(1)).updateHighestOffsetInRemoteStorage(argument.capture());
assertEquals(oldSegmentEndOffset, argument.getValue());
+
+ // Verify the metric for remote writes is updated correctly
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+ assertEquals(10, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteBytesOutRate().count());
+ // Verify we did not report any failure for remote writes
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+ assertEquals(10, brokerTopicStats.allTopicsStats().remoteBytesOutRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+
+ }
+
+ @Test
+ void testMetricsUpdateOnCopyLogSegmentsFailure() throws Exception {
+ long oldSegmentStartOffset = 0L;
+ long nextSegmentStartOffset = 150L;
+
+ when(mockLog.topicPartition()).thenReturn(leaderTopicIdPartition.topicPartition());
+
+ // leader epoch preparation
+ checkpoint.write(totalEpochEntries);
+ LeaderEpochFileCache cache = new LeaderEpochFileCache(leaderTopicIdPartition.topicPartition(), checkpoint);
+ when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
+ when(remoteLogMetadataManager.highestOffsetForEpoch(any(TopicIdPartition.class), anyInt())).thenReturn(Optional.of(0L));
+
+ File tempFile = TestUtils.tempFile();
+ File mockProducerSnapshotIndex = TestUtils.tempFile();
+ File tempDir = TestUtils.tempDirectory();
+ // create 2 log segments, with 0 and 150 as log start offset
+ LogSegment oldSegment = mock(LogSegment.class);
+ LogSegment activeSegment = mock(LogSegment.class);
+
+ when(oldSegment.baseOffset()).thenReturn(oldSegmentStartOffset);
+ when(activeSegment.baseOffset()).thenReturn(nextSegmentStartOffset);
+
+ FileRecords fileRecords = mock(FileRecords.class);
+ when(oldSegment.log()).thenReturn(fileRecords);
+ when(fileRecords.file()).thenReturn(tempFile);
+ when(fileRecords.sizeInBytes()).thenReturn(10);
+ when(oldSegment.readNextOffset()).thenReturn(nextSegmentStartOffset);
+
+ when(mockLog.activeSegment()).thenReturn(activeSegment);
+ when(mockLog.logStartOffset()).thenReturn(oldSegmentStartOffset);
+ when(mockLog.logSegments(anyLong(), anyLong())).thenReturn(JavaConverters.collectionAsScalaIterable(Arrays.asList(oldSegment, activeSegment)));
+
+ ProducerStateManager mockStateManager = mock(ProducerStateManager.class);
+ when(mockLog.producerStateManager()).thenReturn(mockStateManager);
+ when(mockStateManager.fetchSnapshot(anyLong())).thenReturn(Optional.of(mockProducerSnapshotIndex));
+ when(mockLog.lastStableOffset()).thenReturn(250L);
+
+ LazyIndex idx = LazyIndex.forOffset(UnifiedLog.offsetIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1000);
+ LazyIndex timeIdx = LazyIndex.forTime(UnifiedLog.timeIndexFile(tempDir, oldSegmentStartOffset, ""), oldSegmentStartOffset, 1500);
+ File txnFile = UnifiedLog.transactionIndexFile(tempDir, oldSegmentStartOffset, "");
+ txnFile.createNewFile();
+ TransactionIndex txnIndex = new TransactionIndex(oldSegmentStartOffset, txnFile);
+ when(oldSegment.lazyTimeIndex()).thenReturn(timeIdx);
+ when(oldSegment.lazyOffsetIndex()).thenReturn(idx);
+ when(oldSegment.txnIndex()).thenReturn(txnIndex);
+
+ CompletableFuture<Void> dummyFuture = new CompletableFuture<>();
+ dummyFuture.complete(null);
+ when(remoteLogMetadataManager.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenReturn(dummyFuture);
+ doThrow(new RuntimeException()).when(remoteStorageManager).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+ // Verify the metrics for remote write requests/failures is zero before attempt to copy log segment
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
+ RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
+ task.convertToLeader(2);
+ task.copyLogSegmentsToRemote(mockLog);
+
+ // Verify we attempted to copy log segment metadata to remote storage
+ verify(remoteStorageManager, times(1)).copyLogSegmentData(any(RemoteLogSegmentMetadata.class), any(LogSegmentData.class));
+
+ // Verify we should not have updated the highest offset because of write failure
+ verify(mockLog, times(0)).updateHighestOffsetInRemoteStorage(anyLong());
+ // Verify the metric for remote write requests/failures was updated.
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).remoteWriteRequestRate().count());
+ assertEquals(1, brokerTopicStats.topicStats(leaderTopicIdPartition.topic()).failedRemoteWriteRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteWriteRequestRate().count());
+ assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteWriteRequestRate().count());
}
@Test
@@ -412,7 +525,7 @@ public class RemoteLogManagerTest {
void testGetClassLoaderAwareRemoteStorageManager() throws Exception {
ClassLoaderAwareRemoteStorageManager rsmManager = mock(ClassLoaderAwareRemoteStorageManager.class);
RemoteLogManager remoteLogManager =
- new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty()) {
+ new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId, time, t -> Optional.empty(), brokerTopicStats) {
public RemoteStorageManager createRemoteStorageManager() {
return rsmManager;
}
@@ -589,6 +702,47 @@ public class RemoteLogManagerTest {
inorder.verify(remoteLogMetadataManager, times(1)).close();
}
+ @Test
+ public void testRemoveMetricsOnClose() throws IOException {
+ MockedConstruction<KafkaMetricsGroup> mockMetricsGroupCtor = mockConstruction(KafkaMetricsGroup.class);
+ try {
+ RemoteLogManager remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir, clusterId,
+ time, tp -> Optional.of(mockLog), brokerTopicStats) {
+ public RemoteStorageManager createRemoteStorageManager() {
+ return remoteStorageManager;
+ }
+
+ public RemoteLogMetadataManager createRemoteLogMetadataManager() {
+ return remoteLogMetadataManager;
+ }
+ };
+ // Close RemoteLogManager so that metrics are removed
+ remoteLogManager.close();
+
+ KafkaMetricsGroup mockRlmMetricsGroup = mockMetricsGroupCtor.constructed().get(0);
+ KafkaMetricsGroup mockThreadPoolMetricsGroup = mockMetricsGroupCtor.constructed().get(1);
+
+ List<String> remoteLogManagerMetricNames = Collections.singletonList(REMOTE_LOG_MANAGER_TASKS_AVG_IDLE_PERCENT);
+ List<String> remoteStorageThreadPoolMetricNames = RemoteStorageThreadPool.METRIC_SUFFIXES
+ .stream()
+ .map(suffix -> REMOTE_LOG_READER_METRICS_NAME_PREFIX + suffix)
+ .collect(Collectors.toList());
+
+ verify(mockRlmMetricsGroup, times(remoteLogManagerMetricNames.size())).newGauge(anyString(), any());
+ // Verify that the RemoteLogManager metrics are removed
+ remoteLogManagerMetricNames.forEach(metricName -> verify(mockRlmMetricsGroup).removeMetric(metricName));
+
+ verify(mockThreadPoolMetricsGroup, times(remoteStorageThreadPoolMetricNames.size())).newGauge(anyString(), any());
+ // Verify that the RemoteStorageThreadPool metrics are removed
+ remoteStorageThreadPoolMetricNames.forEach(metricName -> verify(mockThreadPoolMetricsGroup).removeMetric(metricName));
+
+ verifyNoMoreInteractions(mockRlmMetricsGroup);
+ verifyNoMoreInteractions(mockThreadPoolMetricsGroup);
+ } finally {
+ mockMetricsGroupCtor.close();
+ }
+ }
+
private Partition mockPartition(TopicIdPartition topicIdPartition) {
TopicPartition tp = topicIdPartition.topicPartition();
Partition partition = mock(Partition.class);
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
index aa8dd042a13..36533fed22d 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -16,14 +16,16 @@
*/
package kafka.log.remote;
+import kafka.server.BrokerTopicStats;
+import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.storage.internals.log.FetchDataInfo;
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -40,18 +42,26 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class RemoteLogReaderTest {
+ public static final String TOPIC = "test";
RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+ BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
Records records = mock(Records.class);
+ @BeforeEach
+ public void setUp() {
+ TestUtils.clearYammerMetrics();
+ }
+
@Test
public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException {
FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records);
+ when(records.sizeInBytes()).thenReturn(100);
when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
- RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
- RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
+ RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
remoteLogReader.call();
// verify the callback did get invoked with the expected remoteLogReadResult
@@ -61,15 +71,24 @@ public class RemoteLogReaderTest {
assertFalse(actualRemoteLogReadResult.error.isPresent());
assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+
+ // Verify metrics for remote reads are updated correctly
+ assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+ assertEquals(100, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count());
+ assertEquals(100, brokerTopicStats.allTopicsStats().remoteBytesInRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count());
}
@Test
public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {
- when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new OffsetOutOfRangeException("error"));
+ when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new RuntimeException("error"));
Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
- RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
- RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+ RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition(TOPIC, 0), null, null, false);
+ RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback, brokerTopicStats);
remoteLogReader.call();
// verify the callback did get invoked with the expected remoteLogReadResult
@@ -78,5 +97,14 @@ public class RemoteLogReaderTest {
RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
assertTrue(actualRemoteLogReadResult.error.isPresent());
assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+
+ // Verify metrics for remote reads are updated correctly
+ assertEquals(1, brokerTopicStats.topicStats(TOPIC).remoteReadRequestRate().count());
+ assertEquals(0, brokerTopicStats.topicStats(TOPIC).remoteBytesInRate().count());
+ assertEquals(1, brokerTopicStats.topicStats(TOPIC).failedRemoteReadRequestRate().count());
+ // Verify aggregate metrics
+ assertEquals(1, brokerTopicStats.allTopicsStats().remoteReadRequestRate().count());
+ assertEquals(0, brokerTopicStats.allTopicsStats().remoteBytesInRate().count());
+ assertEquals(1, brokerTopicStats.allTopicsStats().failedRemoteReadRequestRate().count());
}
}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
index 2b7ae15b154..22807a0271b 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -16,11 +16,17 @@
*/
package org.apache.kafka.storage.internals.log;
+import com.yammer.metrics.core.Gauge;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
import org.slf4j.Logger;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
@@ -28,11 +34,17 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+ public static final String TASK_QUEUE_SIZE = "TaskQueueSize";
+ public static final String AVG_IDLE_PERCENT = "AvgIdlePercent";
+ public static final Set<String> METRIC_SUFFIXES = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(TASK_QUEUE_SIZE, AVG_IDLE_PERCENT)));
private final Logger logger;
+ private final String metricsNamePrefix;
+ private final KafkaMetricsGroup metricsGroup = new KafkaMetricsGroup(this.getClass());
public RemoteStorageThreadPool(String threadNamePrefix,
int numThreads,
- int maxPendingTasks) {
+ int maxPendingTasks,
+ String metricsNamePrefix) {
super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
new RemoteStorageThreadFactory(threadNamePrefix));
logger = new LogContext() {
@@ -41,6 +53,20 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
return "[" + Thread.currentThread().getName() + "]";
}
}.logger(RemoteStorageThreadPool.class);
+
+ this.metricsNamePrefix = metricsNamePrefix;
+ metricsGroup.newGauge(metricsNamePrefix.concat(TASK_QUEUE_SIZE), new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return RemoteStorageThreadPool.this.getQueue().size();
+ }
+ });
+ metricsGroup.newGauge(metricsNamePrefix.concat(AVG_IDLE_PERCENT), new Gauge<Double>() {
+ @Override
+ public Double value() {
+ return 1 - (double) RemoteStorageThreadPool.this.getActiveCount() / (double) RemoteStorageThreadPool.this.getCorePoolSize();
+ }
+ });
}
@Override
@@ -70,4 +96,8 @@ public class RemoteStorageThreadPool extends ThreadPoolExecutor {
}
}
+
+ public void removeMetrics() {
+ METRIC_SUFFIXES.forEach(metric -> metricsGroup.removeMetric(metricsNamePrefix.concat(metric)));
+ }
}