You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2019/09/27 15:37:01 UTC
[incubator-ratis] branch master updated: RATIS-647. Create metrics
associated with RaftLog for RaftServer. Contributed by Aravindan Vijayan.
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new cff085c RATIS-647. Create metrics associated with RaftLog for RaftServer. Contributed by Aravindan Vijayan.
cff085c is described below
commit cff085c4ff7ad49dd65f8e77549494584b3ebc51
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Fri Sep 27 21:05:01 2019 +0530
RATIS-647. Create metrics associated with RaftLog for RaftServer. Contributed by Aravindan Vijayan.
---
.../ratis/server/metrics/RaftLogMetrics.java | 115 +++++++++++++++++++++
.../ratis/server/metrics/RatisMetricNames.java | 59 ++++++++++-
.../apache/ratis/server/metrics/RatisMetrics.java | 29 +++---
.../ratis/server/raftlog/segmented/LogSegment.java | 42 +++++---
.../server/raftlog/segmented/SegmentedRaftLog.java | 28 ++++-
.../raftlog/segmented/SegmentedRaftLogCache.java | 12 ++-
.../segmented/SegmentedRaftLogInputStream.java | 12 ++-
.../raftlog/segmented/SegmentedRaftLogReader.java | 15 ++-
.../raftlog/segmented/SegmentedRaftLogWorker.java | 43 ++++----
.../java/org/apache/ratis/LogAppenderTests.java | 2 +-
.../ratis/server/storage/RaftStorageTestUtils.java | 8 +-
.../ratis/TestRaftServerSlownessDetection.java | 2 +-
.../ratis/server/raftlog/TestRaftLogMetrics.java | 58 ++++++++++-
.../raftlog/segmented/TestCacheEviction.java | 2 +-
.../server/raftlog/segmented/TestLogSegment.java | 34 ++++--
.../raftlog/segmented/TestSegmentedRaftLog.java | 13 +++
.../segmented/TestSegmentedRaftLogCache.java | 2 +-
17 files changed, 402 insertions(+), 74 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
new file mode 100644
index 0000000..dde0d2b
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_APPEND_ENTRY_LATENCY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_CACHE_HIT_COUNT;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_CACHE_MISS_COUNT;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_DATA_QUEUE_SIZE;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_FLUSH_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_LOAD_SEGMENT_LATENCY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_READ_ENTRY_LATENCY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_SYNC_BATCH_SIZE;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_SYNC_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_TASK_ENQUEUE_DELAY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_TASK_EXECUTION_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_TASK_QUEUE_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_WORKER_QUEUE_SIZE;
+
+import java.util.Queue;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.util.DataQueue;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+public class RaftLogMetrics {
+
+ private RatisMetricRegistry registry = null;
+
+ RaftLogMetrics(RatisMetricRegistry ratisMetricRegistry) {
+ this.registry = ratisMetricRegistry;
+ }
+
+ public RatisMetricRegistry getRegistry() {
+ return registry;
+ }
+
+ public void addDataQueueSizeGauge(DataQueue queue) {
+ registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> () -> {
+ // queue.size() is an O(1) operation
+ return queue.size();
+ });
+ }
+
+ public void addLogWorkerQueueSizeGauge(Queue queue) {
+ registry.gauge(RAFT_LOG_WORKER_QUEUE_SIZE, () -> () -> queue.size());
+ }
+
+ public void addFlushBatchSizeGauge(MetricRegistry.MetricSupplier<Gauge> supplier) {
+ registry.gauge(RAFT_LOG_SYNC_BATCH_SIZE, supplier);
+ }
+
+ private Timer getTimer(String timerName) {
+ return registry.timer(timerName);
+ }
+
+ public Timer getFlushTimer() {
+ return getTimer(RAFT_LOG_FLUSH_TIME);
+ }
+
+ public Timer getRaftLogSyncTimer() {
+ return getTimer(RAFT_LOG_SYNC_TIME);
+ }
+
+ public void onRaftLogCacheHit() {
+ registry.counter(RAFT_LOG_CACHE_HIT_COUNT).inc();
+ }
+
+ public void onRaftLogCacheMiss() {
+ registry.counter(RAFT_LOG_CACHE_MISS_COUNT).inc();
+ }
+
+ public Timer getRaftLogAppendEntryTimer() {
+ return getTimer(RAFT_LOG_APPEND_ENTRY_LATENCY);
+ }
+
+ public Timer getRaftLogQueueTimer() {
+ return getTimer(RAFT_LOG_TASK_QUEUE_TIME);
+ }
+
+ public Timer getRaftLogEnqueueDelayTimer() {
+ return getTimer(RAFT_LOG_TASK_ENQUEUE_DELAY);
+ }
+
+ public Timer getRaftLogTaskExecutionTimer(String taskName) {
+ return getTimer(taskName + RAFT_LOG_TASK_EXECUTION_TIME);
+ }
+
+ public Timer getRaftLogReadEntryTimer() {
+ return getTimer(RAFT_LOG_READ_ENTRY_LATENCY);
+ }
+
+ public Timer getRaftLogLoadSegmentTimer() {
+ return getTimer(RAFT_LOG_LOAD_SEGMENT_LATENCY);
+ }
+
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index c8da330..65b8298 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -23,15 +23,64 @@ public final class RatisMetricNames {
private RatisMetricNames() {
}
- public static final String LEADER_ELECTION_COUNT_METRIC = "leader_election_count";
- public static final String LEADER_ELECTION_TIMEOUT_COUNT_METRIC = "leader_election_timeout_count";
- public static final String LEADER_ELECTION_LATENCY = "leader_election_latency";
- public static final String LAST_LEADER_ELAPSED_TIME = "last_leader_elapsed_time";
+ public static final String LEADER_ELECTION_COUNT_METRIC = "leaderElectionCount";
+ public static final String LEADER_ELECTION_TIMEOUT_COUNT_METRIC = "leaderElectionTimeoutCount";
+ public static final String LEADER_ELECTION_LATENCY = "leaderElectionLatency";
+ public static final String LAST_LEADER_ELAPSED_TIME = "lastLeaderElapsedTime";
- public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC = "follower_%s_last_heartbeat_elapsed_time";
+ public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC = "follower-%s-lastHeartbeatElapsedTime";
public static final String STATEMACHINE_APPLIED_INDEX_GAUGE =
"statemachine_applied_index";
public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE =
"statemachine_apply_completed_index";
+
+ //////////////////////////////
+ // Raft Log Write Path Metrics
+ /////////////////////////////
+
+ // Time taken to flush log.
+ public static final String RAFT_LOG_FLUSH_TIME = "flushTime";
+
+ // Time taken to sync raft log.
+ public static final String RAFT_LOG_SYNC_TIME = "syncTime";
+
+ // Raft log data queue size which at any time gives the number of raft log related operations in the queue.
+ public static final String RAFT_LOG_DATA_QUEUE_SIZE = "dataQueueSize";
+
+ // Raft log worker queue size which at any time gives the number of committed entries that are to be synced.
+ public static final String RAFT_LOG_WORKER_QUEUE_SIZE = "workerQueueSize";
+
+ // No. of raft log entries synced with each flush call
+ public static final String RAFT_LOG_SYNC_BATCH_SIZE = "syncBatchSize";
+
+ // Count of RaftLogCache Misses
+ public static final String RAFT_LOG_CACHE_MISS_COUNT = "cacheMissCount";
+
+ // Count of RaftLogCache Hits
+ public static final String RAFT_LOG_CACHE_HIT_COUNT = "cacheHitCount";
+
+ // Total time taken to append a raft log entry
+ public static final String RAFT_LOG_APPEND_ENTRY_LATENCY = "appendEntryLatency";
+
+ // Time spent by a Raft log operation in the queue.
+ public static final String RAFT_LOG_TASK_QUEUE_TIME = "enqueuedTime";
+
+ // Time taken for a Raft log operation to get into the queue after being requested. This will be the time it has to
+ // wait for the queue to be non-full.
+ public static final String RAFT_LOG_TASK_ENQUEUE_DELAY = "queueingDelay";
+
+ // Suffix appended to a task name: time taken for that Raft log operation to complete execution.
+ public static final String RAFT_LOG_TASK_EXECUTION_TIME = "ExecutionTime";
+
+ //////////////////////////////
+ // Raft Log Read Path Metrics
+ /////////////////////////////
+
+ // Time required to read a raft log entry from the actual raft log file and create a raft log entry
+ public static final String RAFT_LOG_READ_ENTRY_LATENCY = "readEntryLatency";
+
+ // Time required to load and process raft log segments during restart
+ public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY = "segmentLoadLatency";
+
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 674a732..950f43c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -40,18 +40,6 @@ public class RatisMetrics {
static MetricsReporting metricsReporting = new MetricsReporting(500, TimeUnit.MILLISECONDS);
- public static RatisMetricRegistry createMetricRegistryForLogWorker(String name) {
- return create(
- new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS,
- RATIS_LOG_WORKER_METRICS_DESC));
- }
-
- public static RatisMetricRegistry getMetricRegistryForLogWorker(String name) {
- return MetricRegistries.global().get(
- new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS,
- RATIS_LOG_WORKER_METRICS_DESC)).get();
- }
-
private static RatisMetricRegistry create(MetricRegistryInfo info) {
Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);
if (metricRegistry.isPresent()) {
@@ -82,4 +70,21 @@ public class RatisMetrics {
return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS,
RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC));
}
+
+ public static RaftLogMetrics createMetricRegistryForLogWorker(String name) {
+ RatisMetricRegistry ratisMetricRegistry = getMetricRegistryForLogWorker(name);
+ if (ratisMetricRegistry == null) {
+ ratisMetricRegistry = create(new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS,
+ RATIS_LOG_WORKER_METRICS, RATIS_LOG_WORKER_METRICS_DESC));
+ }
+ return new RaftLogMetrics(ratisMetricRegistry);
+ }
+
+ public static RatisMetricRegistry getMetricRegistryForLogWorker(String name) {
+ Optional<RatisMetricRegistry> ratisMetricRegistry = MetricRegistries.global().get(
+ new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS,
+ RATIS_LOG_WORKER_METRICS_DESC));
+ return ratisMetricRegistry.orElse(null);
+ }
+
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 12bd195..26b070a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
@@ -76,22 +77,24 @@ class LogSegment implements Comparable<Long> {
}
}
- static LogSegment newOpenSegment(RaftStorage storage, long start) {
+ static LogSegment newOpenSegment(RaftStorage storage, long start, RaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
- return new LogSegment(storage, true, start, start - 1);
+ return new LogSegment(storage, true, start, start - 1, raftLogMetrics);
}
@VisibleForTesting
- static LogSegment newCloseSegment(RaftStorage storage, long start, long end) {
+ static LogSegment newCloseSegment(RaftStorage storage,
+ long start, long end, RaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0 && end >= start);
- return new LogSegment(storage, false, start, end);
+ return new LogSegment(storage, false, start, end, raftLogMetrics);
}
- private static int readSegmentFile(File file, long start, long end, boolean isOpen,
- CorruptionPolicy corruptionPolicy, Consumer<LogEntryProto> entryConsumer)
- throws IOException {
+ private static int readSegmentFile(File file, long start, long end,
+ boolean isOpen, CorruptionPolicy corruptionPolicy,
+ RaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer) throws
+ IOException {
int count = 0;
- try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen)) {
+ try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) {
for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
if (prev != null) {
Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
@@ -119,15 +122,16 @@ class LogSegment implements Comparable<Long> {
return count;
}
+ @SuppressWarnings("parameternumber")
static LogSegment loadSegment(RaftStorage storage, File file, long start, long end, boolean isOpen,
- boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer)
+ boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, RaftLogMetrics raftLogMetrics)
throws IOException {
final LogSegment segment = isOpen ?
- LogSegment.newOpenSegment(storage, start) :
- LogSegment.newCloseSegment(storage, start, end);
+ LogSegment.newOpenSegment(storage, start, raftLogMetrics) :
+ LogSegment.newCloseSegment(storage, start, end, raftLogMetrics);
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
- final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, entry -> {
+ final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, raftLogMetrics, entry -> {
segment.append(keepEntryInCache || isOpen, entry);
if (logConsumer != null) {
logConsumer.accept(entry);
@@ -162,12 +166,18 @@ class LogSegment implements Comparable<Long> {
* In the future we can make the cache loader configurable if necessary.
*/
class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> {
+ private RaftLogMetrics raftLogMetrics;
+
+ LogEntryLoader(RaftLogMetrics raftLogMetrics) {
+ this.raftLogMetrics = raftLogMetrics;
+ }
+
@Override
public LogEntryProto load(LogRecord key) throws IOException {
final File file = getSegmentFile();
// note the loading should not exceed the endIndex: it is possible that
// the on-disk log file should be truncated but has not been done yet.
- readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(),
+ readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics,
entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
loadingTimes.incrementAndGet();
return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
@@ -185,7 +195,8 @@ class LogSegment implements Comparable<Long> {
private final long startIndex;
private volatile long endIndex;
private final RaftStorage storage;
- private final LogEntryLoader cacheLoader = new LogEntryLoader();
+ private RaftLogMetrics raftLogMetrics;
+ private final LogEntryLoader cacheLoader = new LogEntryLoader(raftLogMetrics);
/** later replace it with a metric */
private final AtomicInteger loadingTimes = new AtomicInteger();
@@ -199,11 +210,12 @@ class LogSegment implements Comparable<Long> {
private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
private final Set<TermIndex> configEntries = new HashSet<>();
- private LogSegment(RaftStorage storage, boolean isOpen, long start, long end) {
+ private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, RaftLogMetrics raftLogMetrics) {
this.storage = storage;
this.isOpen = isOpen;
this.startIndex = start;
this.endIndex = end;
+ this.raftLogMetrics = raftLogMetrics;
}
long getStartIndex() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index fbe8529..a8c12c8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -23,6 +23,8 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -47,6 +49,8 @@ import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import com.codahale.metrics.Timer;
+
/**
* The RaftLog implementation that writes log entries into segmented files in
* local disk.
@@ -79,6 +83,7 @@ public class SegmentedRaftLog extends RaftLog {
*/
abstract static class Task {
private final CompletableFuture<Long> future = new CompletableFuture<>();
+ private Timer.Context queueTimerContext;
CompletableFuture<Long> getFuture() {
return future;
@@ -102,6 +107,16 @@ public class SegmentedRaftLog extends RaftLog {
abstract long getEndIndex();
+ void startTimerOnEnqueue(Timer queueTimer) {
+ queueTimerContext = queueTimer.time();
+ }
+
+ void stopTimerOnDequeue() {
+ if (queueTimerContext != null) {
+ queueTimerContext.stop();
+ }
+ }
+
int getSerializedSize() {
return 0;
}
@@ -177,6 +192,7 @@ public class SegmentedRaftLog extends RaftLog {
private final SegmentedRaftLogWorker fileLogWorker;
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
+ private final RaftLogMetrics metricRegistry;
public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
@@ -193,9 +209,10 @@ public class SegmentedRaftLog extends RaftLog {
this.storage = storage;
this.stateMachine = stateMachine;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- this.cache = new SegmentedRaftLogCache(memberId, storage, properties);
+ this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
+ this.cache = new SegmentedRaftLogCache(memberId, storage, properties, metricRegistry);
this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
- submitUpdateCommitEvent, server, storage, properties);
+ submitUpdateCommitEvent, server, storage, properties, metricRegistry);
stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
}
@@ -231,7 +248,9 @@ public class SegmentedRaftLog extends RaftLog {
// so that during the initial loading we can apply part of the log
// entries to the state machine
boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
+ final Timer.Context loadSegmentContext = metricRegistry.getRaftLogLoadSegmentTimer().time();
cache.loadSegment(pi, keepEntryInCache, logConsumer);
+ loadSegmentContext.stop();
}
// if the largest index is smaller than the last index in snapshot, we do
@@ -263,11 +282,13 @@ public class SegmentedRaftLog extends RaftLog {
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
+ metricRegistry.onRaftLogCacheHit();
return entry;
}
}
// the entry is not in the segment's cache. Load the cache without holding the lock.
+ metricRegistry.onRaftLogCacheMiss();
checkAndEvictCache();
return segment.loadCache(record);
}
@@ -356,6 +377,7 @@ public class SegmentedRaftLog extends RaftLog {
@Override
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+ final Timer.Context context = metricRegistry.getRaftLogAppendEntryTimer().time();
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getName(), ServerProtoUtils.toLogEntryString(entry));
@@ -397,6 +419,8 @@ public class SegmentedRaftLog extends RaftLog {
} catch (Throwable throwable) {
LOG.error("{}: Failed to append {}", getName(), ServerProtoUtils.toLogEntryString(entry), throwable);
throw throwable;
+ } finally {
+ context.stop();
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 58f5093..dced8ed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
@@ -290,15 +291,22 @@ class SegmentedRaftLogCache {
private volatile LogSegment openSegment;
private final LogSegmentList closedSegments;
private final RaftStorage storage;
+ private final RaftLogMetrics raftLogMetrics;
private final int maxCachedSegments;
private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties) {
+ this(name, storage, properties, null);
+ }
+
+ SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties,
+ RaftLogMetrics raftLogMetrics) {
this.name = name + "-" + getClass().getSimpleName();
this.closedSegments = new LogSegmentList(name);
this.storage = storage;
maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
+ this.raftLogMetrics = raftLogMetrics;
}
int getMaxCachedSegments() {
@@ -308,7 +316,7 @@ class SegmentedRaftLogCache {
void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache,
Consumer<LogEntryProto> logConsumer) throws IOException {
LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(),
- pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, logConsumer);
+ pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, logConsumer, raftLogMetrics);
if (logSegment != null) {
addSegment(logSegment);
}
@@ -350,7 +358,7 @@ class SegmentedRaftLogCache {
}
void addOpenSegment(long startIndex) {
- setOpenSegment(LogSegment.newOpenSegment(storage, startIndex));
+ setOpenSegment(LogSegment.newOpenSegment(storage, startIndex,raftLogMetrics));
}
private void setOpenSegment(LogSegment openSegment) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index c0f9e17..1db790f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -26,6 +26,7 @@ import java.io.IOException;
import java.util.Optional;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
@@ -65,8 +66,14 @@ public class SegmentedRaftLogInputStream implements Closeable {
private final boolean isOpen;
private final OpenCloseState state;
private SegmentedRaftLogReader reader;
+ private RaftLogMetrics raftLogMetrics;
public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen) {
+ this(log, startIndex, endIndex, isOpen, null);
+ }
+
+ SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen,
+ RaftLogMetrics raftLogMetrics) {
if (isOpen) {
Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
} else {
@@ -78,12 +85,13 @@ public class SegmentedRaftLogInputStream implements Closeable {
this.endIndex = endIndex;
this.isOpen = isOpen;
this.state = new OpenCloseState(getName());
+ this.raftLogMetrics = raftLogMetrics;
}
private void init() throws IOException {
state.open();
try {
- final SegmentedRaftLogReader r = new SegmentedRaftLogReader(logFile);
+ final SegmentedRaftLogReader r = new SegmentedRaftLogReader(logFile, raftLogMetrics);
if (r.verifyHeader()) {
reader = r;
}
@@ -184,7 +192,7 @@ public class SegmentedRaftLogInputStream implements Closeable {
throws IOException {
SegmentedRaftLogInputStream in;
try {
- in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false);
+ in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, null);
// read the header, initialize the inputstream
in.init();
} catch (EOFException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index f90d4c6..5caec22 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.io.CorruptedFileException;
import org.apache.ratis.protocol.ChecksumException;
import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -33,6 +34,8 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.zip.Checksum;
+import com.codahale.metrics.Timer;
+
class SegmentedRaftLogReader implements Closeable {
static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogReader.class);
/**
@@ -130,13 +133,15 @@ class SegmentedRaftLogReader implements Closeable {
private final DataInputStream in;
private byte[] temp = new byte[4096];
private final Checksum checksum;
+ private final RaftLogMetrics raftLogMetrics;
- SegmentedRaftLogReader(File file) throws FileNotFoundException {
+ SegmentedRaftLogReader(File file, RaftLogMetrics raftLogMetrics) throws FileNotFoundException {
this.file = file;
this.limiter = new LimitedInputStream(
new BufferedInputStream(new FileInputStream(file)));
in = new DataInputStream(limiter);
checksum = new PureJavaCrc32C();
+ this.raftLogMetrics = raftLogMetrics;
}
/**
@@ -181,7 +186,11 @@ class SegmentedRaftLogReader implements Closeable {
* exception when skipBrokenEdits is false.
*/
LogEntryProto readEntry() throws IOException {
+ Timer.Context readEntryContext = null;
try {
+ if (raftLogMetrics != null) {
+ readEntryContext = raftLogMetrics.getRaftLogReadEntryTimer().time();
+ }
return decodeEntry();
} catch (EOFException eof) {
in.reset();
@@ -202,6 +211,10 @@ class SegmentedRaftLogReader implements Closeable {
// broken, throw the exception instead of skipping broken entries
in.reset();
throw new IOException("got unexpected exception " + e.getMessage(), e);
+ } finally {
+ if (readEntryContext != null) {
+ readEntryContext.stop();
+ }
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index ec7ab73..dcd4adb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -18,17 +18,14 @@
package org.apache.ratis.server.raftlog.segmented;
import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.raftlog.RaftLogIndex;
@@ -61,7 +58,6 @@ class SegmentedRaftLogWorker implements Runnable {
static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
- private final RatisMetricRegistry metricRegistry;
static class StateMachineDataPolicy {
private final boolean sync;
@@ -146,6 +142,10 @@ class SegmentedRaftLogWorker implements Runnable {
private final Runnable submitUpdateCommitEvent;
private final StateMachine stateMachine;
private final Timer logFlushTimer;
+ private final Timer raftLogSyncTimer;
+ private final Timer raftLogQueueingTimer;
+ private final Timer raftLogEnqueueingDelayTimer;
+ private final RaftLogMetrics raftLogMetrics;
/**
* The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -167,13 +167,14 @@ class SegmentedRaftLogWorker implements Runnable {
private final StateMachineDataPolicy stateMachineDataPolicy;
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
- RaftServerImpl server, RaftStorage storage, RaftProperties properties) {
+ RaftServerImpl server, RaftStorage storage, RaftProperties properties,
+ RaftLogMetrics metricRegistry) {
this.name = memberId + "-" + getClass().getSimpleName();
LOG.info("new {} for {}", name, storage);
this.submitUpdateCommitEvent = submitUpdateCommitEvent;
this.stateMachine = stateMachine;
- this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
+ this.raftLogMetrics = metricRegistry;
this.storage = storage;
this.server = server;
final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
@@ -181,17 +182,6 @@ class SegmentedRaftLogWorker implements Runnable {
this.queue =
new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit, Task::getSerializedSize);
- metricRegistry.gauge("dataQueueSize", new MetricRegistry.MetricSupplier() {
- @Override public Metric newMetric() {
- return new Gauge<Integer>() {
- @Override public Integer getValue() {
- //q.size() is O(1) operation
- return queue.size();
- }
- };
- }
- });
-
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
@@ -202,7 +192,13 @@ class SegmentedRaftLogWorker implements Runnable {
this.workerThread = new Thread(this, name);
// Server Id can be null in unit tests
- this.logFlushTimer = metricRegistry.timer("flush-time");
+ metricRegistry.addDataQueueSizeGauge(queue);
+ metricRegistry.addLogWorkerQueueSizeGauge(writeTasks.q);
+ metricRegistry.addFlushBatchSizeGauge(() -> (Gauge<Integer>) () -> pendingFlushNum);
+ this.logFlushTimer = metricRegistry.getFlushTimer();
+ this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
+ this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
+ this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
}
void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -250,10 +246,13 @@ class SegmentedRaftLogWorker implements Runnable {
private Task addIOTask(Task task) {
LOG.debug("{} adds IO task {}", name, task);
try {
+ final Timer.Context enqueueTimerContext = raftLogEnqueueingDelayTimer.time();
for(; !queue.offer(task, ONE_SECOND); ) {
Preconditions.assertTrue(isAlive(),
"the worker thread is not alive");
}
+ enqueueTimerContext.stop();
+ task.startTimerOnEnqueue(raftLogQueueingTimer);
} catch (Throwable t) {
if (t instanceof InterruptedException && !running) {
LOG.info("Got InterruptedException when adding task " + task
@@ -282,11 +281,15 @@ class SegmentedRaftLogWorker implements Runnable {
try {
Task task = queue.poll(ONE_SECOND);
if (task != null) {
+ task.stopTimerOnDequeue();
try {
if (logIOException != null) {
throw logIOException;
} else {
+ Timer.Context executionTimeContext =
+ raftLogMetrics.getRaftLogTaskExecutionTimer(task.getClass().getSimpleName().toLowerCase()).time();
task.execute();
+ executionTimeContext.stop();
}
} catch (IOException e) {
if (task.getEndIndex() < lastWrittenIndex) {
@@ -346,7 +349,9 @@ class SegmentedRaftLogWorker implements Runnable {
if (stateMachineDataPolicy.isSync()) {
stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
}
+ final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
out.flush();
+ logSyncTimerContext.stop();
if (!stateMachineDataPolicy.isSync()) {
IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 2bc73f2..af08a0c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -146,7 +146,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
// Get all last_heartbeat_elapsed_time metric gauges. Should be equal to number of followers.
SortedMap<String, Gauge> heartbeatElapsedTimeGauges = ratisMetricRegistry.getGauges((s, metric) ->
- s.contains("last_heartbeat_elapsed_time"));
+ s.contains("lastHeartbeatElapsedTime"));
assertTrue(heartbeatElapsedTimeGauges.size() == 2);
for (RaftServerImpl followerServer : cluster.getFollowers()) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index d5c5850..7ad23b7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.server.storage;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_FLUSH_TIME;
+
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RatisMetrics;
@@ -30,8 +32,12 @@ import java.util.function.Consumer;
public interface RaftStorageTestUtils {
static String getLogFlushTimeMetric(RaftPeerId serverId) {
+ return getRaftLogFullMetric(serverId, RAFT_LOG_FLUSH_TIME);
+ }
+
+ static String getRaftLogFullMetric(RaftPeerId serverId, String metricName) {
return RatisMetrics.RATIS_APPLICATION_NAME_METRICS + "." + RatisMetrics.RATIS_LOG_WORKER_METRICS
- + "." + serverId + ".flush-time";
+ + "." + serverId + "." + metricName;
}
static void printLog(RaftLog log, Consumer<String> println) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 63d9da6..e4918a4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -92,7 +92,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
RatisMetricRegistry ratisMetricRegistry = RatisMetrics.getMetricRegistryForHeartbeat(
leaderServer.getMemberId().toString());
SortedMap<String, Gauge> heartbeatElapsedTimeGauges = ratisMetricRegistry.getGauges((s, metric) ->
- s.contains("last_heartbeat_elapsed_time"));
+ s.contains("lastHeartbeatElapsedTime"));
String followerId = failedFollower.getId().toString();
Gauge metric = heartbeatElapsedTimeGauges.entrySet().parallelStream().filter(e -> e.getKey().contains(
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 971a279..e3f16bc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -17,12 +17,16 @@
*/
package org.apache.ratis.server.raftlog;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_FLUSH_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_SYNC_TIME;
+
import com.codahale.metrics.Timer;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
@@ -72,14 +76,14 @@ public class TestRaftLogMetrics extends BaseTest
}
@Test
- public void testFlushMetric() throws Exception {
+ public void testRaftLogMetrics() throws Exception {
try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
cluster.start();
- runTestFlushMetric(cluster);
+ runTestRaftLogMetrics(cluster);
}
}
- static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception {
+ static void runTestRaftLogMetrics(MiniRaftCluster cluster) throws Exception {
int numMsg = 2;
final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg);
@@ -91,17 +95,20 @@ public class TestRaftLogMetrics extends BaseTest
// For leader, flush must happen before client can get replies.
assertFlushCount(cluster.getLeader());
+ assertRaftLogWritePathMetrics(cluster.getLeader());
// For followers, flush can be lagged behind. Attempt multiple times.
for(RaftServerImpl f : cluster.getFollowers()) {
JavaUtils.attempt(() -> assertFlushCount(f), 10, HUNDRED_MILLIS, f.getId() + "-assertFlushCount", null);
+ // We have already waited enough for follower metrics to populate.
+ assertRaftLogWritePathMetrics(f);
}
}
static void assertFlushCount(RaftServerImpl server) throws Exception {
final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(server.getId());
Timer tm = (Timer) RatisMetrics.getMetricRegistryForLogWorker(server.getId().toString())
- .get("flush-time");
+ .get(RAFT_LOG_FLUSH_TIME);
Assert.assertNotNull(tm);
final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
@@ -116,4 +123,45 @@ public class TestRaftLogMetrics extends BaseTest
((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
.intValue());
}
-}
+
+ /** Checks the log-worker metrics of {@code server}: sync timer (registry and JMX), cache counters, timers, gauges. */
+ static void assertRaftLogWritePathMetrics(RaftServerImpl server) throws Exception {
+ final String syncTimeMetric = RaftStorageTestUtils.getRaftLogFullMetric(server.getId(), RAFT_LOG_SYNC_TIME);
+ RatisMetricRegistry ratisMetricRegistry = RatisMetrics.getMetricRegistryForLogWorker(server.getId().toString());
+
+ // Test sync count: it is compared against the flush count reported by the state machine.
+ Timer tm = (Timer) ratisMetricRegistry.get(RAFT_LOG_SYNC_TIME);
+ Assert.assertNotNull(tm);
+ final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
+ final int expectedFlush = stateMachine.getFlushCount();
+ Assert.assertEquals(expectedFlush, tm.getCount()); // Ideally, flushCount should be same as syncCount.
+ Assert.assertTrue(tm.getMeanRate() > 0);
+
+ // Test jmx. Just testing one metric's JMX is good enough.
+ ObjectName oname = new ObjectName("ratis_core", "name", syncTimeMetric);
+ Assert.assertEquals(expectedFlush,
+ ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count")).intValue());
+
+ // No cache miss is expected here, while at least one cache hit must have been counted.
+ long cacheMissCount = ratisMetricRegistry.counter("cacheMissCount").getCount();
+ Assert.assertEquals(0L, cacheMissCount); // assertEquals reports the actual count on failure
+ long cacheHitsCount = ratisMetricRegistry.counter("cacheHitCount").getCount();
+ Assert.assertTrue(cacheHitsCount > 0);
+
+ // Each write-path timer must have recorded at least one event, i.e. have a mean rate above zero.
+ // NOTE(review): "writelogExecutionTime" presumably derives from the lower-cased task class name -- confirm.
+ Timer appendLatencyTimer = ratisMetricRegistry.timer("appendEntryLatency");
+ Assert.assertTrue(appendLatencyTimer.getMeanRate() > 0);
+ Timer enqueuedTimer = ratisMetricRegistry.timer("enqueuedTime");
+ Assert.assertTrue(enqueuedTimer.getMeanRate() > 0);
+ Timer queueingDelayTimer = ratisMetricRegistry.timer("queueingDelay");
+ Assert.assertTrue(queueingDelayTimer.getMeanRate() > 0);
+ Timer executionTimer = ratisMetricRegistry.timer("writelogExecutionTime");
+ Assert.assertTrue(executionTimer.getMeanRate() > 0);
+
+ // The queue-size and batch-size gauges only need to be registered; their values are not checked.
+ Assert.assertNotNull(ratisMetricRegistry.get("dataQueueSize"));
+ Assert.assertNotNull(ratisMetricRegistry.get("workerQueueSize"));
+ Assert.assertNotNull(ratisMetricRegistry.get("syncBatchSize"));
+ }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index a249fe0..f498062 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -55,7 +55,7 @@ public class TestCacheEviction extends BaseTest {
Assert.assertEquals(numSegments, cached.length);
final LogSegmentList segments = new LogSegmentList(TestCacheEviction.class.getSimpleName());
for (int i = 0; i < numSegments; i++) {
- LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
+ LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, null);
if (cached[i]) {
s = Mockito.spy(s);
Mockito.when(s.hasCache()).thenReturn(true);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 3086e78..0e9eb9f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -23,6 +23,8 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
@@ -47,9 +49,11 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
import static org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize;
+import com.codahale.metrics.Timer;
+
/**
* Test basic functionality of {@link LogSegment}
*/
@@ -167,7 +171,7 @@ public class TestLogSegment extends BaseTest {
final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
- INVALID_LOG_INDEX, true, loadInitial, null);
+ INVALID_LOG_INDEX, true, loadInitial, null, null);
final int delta = isLastEntryPartiallyWritten? 1: 0;
checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
storage.close();
@@ -177,7 +181,7 @@ public class TestLogSegment extends BaseTest {
// load a closed segment (1000-1099)
final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
- 1000, 1099, false, loadInitial, null);
+ 1000, 1099, false, loadInitial, null, null);
checkLogSegment(closedSegment, 1000, 1099, false,
closedSegment.getTotalSize(), 1);
Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
@@ -186,7 +190,7 @@ public class TestLogSegment extends BaseTest {
@Test
public void testAppendEntries() throws Exception {
final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(null, start);
+ LogSegment segment = LogSegment.newOpenSegment(null, start, null);
long size = SegmentedRaftLogFormat.getHeaderLength();
final long max = 8 * 1024 * 1024;
checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -206,8 +210,26 @@ public class TestLogSegment extends BaseTest {
}
@Test
+ public void testAppendEntryMetric() throws Exception {
+ RaftLogMetrics raftLogMetrics = RatisMetrics.createMetricRegistryForLogWorker("test");
+ // Purpose: load an open segment file with the metrics object attached, then inspect the read-entry timer.
+ final File openSegmentFile = prepareLog(true, 0, 100, 0, true);
+ RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+ LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
+ INVALID_LOG_INDEX, true, true, null, raftLogMetrics);
+ checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0);
+ storage.close();
+ // NOTE(review): count is asserted as 100 although the segment ends at index 98 -- presumably failed reads are timed too; confirm.
+ Timer readEntryTimer = raftLogMetrics.getRaftLogReadEntryTimer();
+ Assert.assertNotNull(readEntryTimer);
+ Assert.assertEquals(100, readEntryTimer.getCount());
+ Assert.assertTrue(readEntryTimer.getMeanRate() > 0);
+ }
+
+
+ @Test
public void testAppendWithGap() throws Exception {
- LogSegment segment = LogSegment.newOpenSegment(null, 1000);
+ LogSegment segment = LogSegment.newOpenSegment(null, 1000, null);
SimpleOperation op = new SimpleOperation("m");
final StateMachineLogEntryProto m = op.getLogEntryContent();
try {
@@ -234,7 +256,7 @@ public class TestLogSegment extends BaseTest {
public void testTruncate() throws Exception {
final long term = 1;
final long start = 1000;
- LogSegment segment = LogSegment.newOpenSegment(null, start);
+ LogSegment segment = LogSegment.newOpenSegment(null, start, null);
for (int i = 0; i < 100; i++) {
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index fa89192..53cb364 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
@@ -31,6 +32,7 @@ import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.impl.RetryCache;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -67,6 +69,8 @@ import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.codahale.metrics.Timer;
+
public class TestSegmentedRaftLog extends BaseTest {
static {
LogUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.DEBUG);
@@ -193,6 +197,15 @@ public class TestSegmentedRaftLog extends BaseTest {
.toArray(LogEntryProto[]::new);
Assert.assertArrayEquals(entries, entriesFromLog);
Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
+
+ RatisMetricRegistry metricRegistryForLogWorker =
+ RatisMetrics.getMetricRegistryForLogWorker(memberId.getPeerId().toString());
+
+ Timer raftLogSegmentLoadLatencyTimer = metricRegistryForLogWorker.timer("segmentLoadLatency");
+ Assert.assertTrue(raftLogSegmentLoadLatencyTimer.getMeanRate() > 0);
+
+ Timer raftLogReadLatencyTimer = metricRegistryForLogWorker.timer("readEntryLatency");
+ Assert.assertTrue(raftLogReadLatencyTimer.getMeanRate() > 0);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 660c2f8..71c3dc6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -43,7 +43,7 @@ public class TestSegmentedRaftLogCache {
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
- LogSegment s = LogSegment.newOpenSegment(null, start);
+ LogSegment s = LogSegment.newOpenSegment(null, start, null);
for (long i = start; i <= end; i++) {
SimpleOperation m = new SimpleOperation("m" + i);
LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);