You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2019/09/27 15:37:01 UTC

[incubator-ratis] branch master updated: RATIS-647. Create metrics associated with RaftLog for RaftServer. Contributed by Aravindan Vijayan.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new cff085c  RATIS-647. Create metrics associated with RaftLog for RaftServer. Contributed by Aravindan Vijayan.
cff085c is described below

commit cff085c4ff7ad49dd65f8e77549494584b3ebc51
Author: Lokesh Jain <lj...@apache.org>
AuthorDate: Fri Sep 27 21:05:01 2019 +0530

    RATIS-647. Create metrics associated with RaftLog for RaftServer. Contributed by Aravindan Vijayan.
---
 .../ratis/server/metrics/RaftLogMetrics.java       | 115 +++++++++++++++++++++
 .../ratis/server/metrics/RatisMetricNames.java     |  59 ++++++++++-
 .../apache/ratis/server/metrics/RatisMetrics.java  |  29 +++---
 .../ratis/server/raftlog/segmented/LogSegment.java |  42 +++++---
 .../server/raftlog/segmented/SegmentedRaftLog.java |  28 ++++-
 .../raftlog/segmented/SegmentedRaftLogCache.java   |  12 ++-
 .../segmented/SegmentedRaftLogInputStream.java     |  12 ++-
 .../raftlog/segmented/SegmentedRaftLogReader.java  |  15 ++-
 .../raftlog/segmented/SegmentedRaftLogWorker.java  |  43 ++++----
 .../java/org/apache/ratis/LogAppenderTests.java    |   2 +-
 .../ratis/server/storage/RaftStorageTestUtils.java |   8 +-
 .../ratis/TestRaftServerSlownessDetection.java     |   2 +-
 .../ratis/server/raftlog/TestRaftLogMetrics.java   |  58 ++++++++++-
 .../raftlog/segmented/TestCacheEviction.java       |   2 +-
 .../server/raftlog/segmented/TestLogSegment.java   |  34 ++++--
 .../raftlog/segmented/TestSegmentedRaftLog.java    |  13 +++
 .../segmented/TestSegmentedRaftLogCache.java       |   2 +-
 17 files changed, 402 insertions(+), 74 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
new file mode 100644
index 0000000..dde0d2b
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.metrics;
+
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_APPEND_ENTRY_LATENCY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_CACHE_HIT_COUNT;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_CACHE_MISS_COUNT;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_DATA_QUEUE_SIZE;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_FLUSH_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_LOAD_SEGMENT_LATENCY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_READ_ENTRY_LATENCY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_SYNC_BATCH_SIZE;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_SYNC_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_TASK_ENQUEUE_DELAY;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_TASK_EXECUTION_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_TASK_QUEUE_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_WORKER_QUEUE_SIZE;
+
+import java.util.Queue;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.util.DataQueue;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+public class RaftLogMetrics {
+
+  private RatisMetricRegistry registry = null;
+
+  RaftLogMetrics(RatisMetricRegistry ratisMetricRegistry) {
+    this.registry = ratisMetricRegistry;
+  }
+
+  public RatisMetricRegistry getRegistry() {
+    return registry;
+  }
+
+  public void addDataQueueSizeGauge(DataQueue queue) {
+    registry.gauge(RAFT_LOG_DATA_QUEUE_SIZE, () -> () -> {
+      // queue.size() is an O(1) operation
+      return queue.size();
+    });
+  }
+
+  public void addLogWorkerQueueSizeGauge(Queue queue) {
+    registry.gauge(RAFT_LOG_WORKER_QUEUE_SIZE, () -> () -> queue.size());
+  }
+
+  public void addFlushBatchSizeGauge(MetricRegistry.MetricSupplier<Gauge> supplier) {
+    registry.gauge(RAFT_LOG_SYNC_BATCH_SIZE, supplier);
+  }
+
+  private Timer getTimer(String timerName) {
+    return registry.timer(timerName);
+  }
+
+  public Timer getFlushTimer() {
+    return getTimer(RAFT_LOG_FLUSH_TIME);
+  }
+
+  public Timer getRaftLogSyncTimer() {
+    return getTimer(RAFT_LOG_SYNC_TIME);
+  }
+
+  public void onRaftLogCacheHit() {
+    registry.counter(RAFT_LOG_CACHE_HIT_COUNT).inc();
+  }
+
+  public void onRaftLogCacheMiss() {
+    registry.counter(RAFT_LOG_CACHE_MISS_COUNT).inc();
+  }
+
+  public Timer getRaftLogAppendEntryTimer() {
+    return getTimer(RAFT_LOG_APPEND_ENTRY_LATENCY);
+  }
+
+  public Timer getRaftLogQueueTimer() {
+    return getTimer(RAFT_LOG_TASK_QUEUE_TIME);
+  }
+
+  public Timer getRaftLogEnqueueDelayTimer() {
+    return getTimer(RAFT_LOG_TASK_ENQUEUE_DELAY);
+  }
+
+  public Timer getRaftLogTaskExecutionTimer(String taskName) {
+    return getTimer(taskName + RAFT_LOG_TASK_EXECUTION_TIME);
+  }
+
+  public Timer getRaftLogReadEntryTimer() {
+    return getTimer(RAFT_LOG_READ_ENTRY_LATENCY);
+  }
+
+  public Timer getRaftLogLoadSegmentTimer() {
+    return getTimer(RAFT_LOG_LOAD_SEGMENT_LATENCY);
+  }
+
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index c8da330..65b8298 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -23,15 +23,64 @@ public final class RatisMetricNames {
   private RatisMetricNames() {
   }
 
-  public static final String LEADER_ELECTION_COUNT_METRIC = "leader_election_count";
-  public static final String LEADER_ELECTION_TIMEOUT_COUNT_METRIC = "leader_election_timeout_count";
-  public static final String LEADER_ELECTION_LATENCY = "leader_election_latency";
-  public static final String LAST_LEADER_ELAPSED_TIME = "last_leader_elapsed_time";
+  public static final String LEADER_ELECTION_COUNT_METRIC = "leaderElectionCount";
+  public static final String LEADER_ELECTION_TIMEOUT_COUNT_METRIC = "leaderElectionTimeoutCount";
+  public static final String LEADER_ELECTION_LATENCY = "leaderElectionLatency";
+  public static final String LAST_LEADER_ELAPSED_TIME = "lastLeaderElapsedTime";
 
-  public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC = "follower_%s_last_heartbeat_elapsed_time";
+  public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC = "follower-%s-lastHeartbeatElapsedTime";
 
   public static final String STATEMACHINE_APPLIED_INDEX_GAUGE =
       "statemachine_applied_index";
   public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE =
       "statemachine_apply_completed_index";
+
+  //////////////////////////////
+  // Raft Log Write Path Metrics
+  /////////////////////////////
+
+  // Time taken to flush log.
+  public static final String RAFT_LOG_FLUSH_TIME = "flushTime";
+
+  // Time taken to sync raft log.
+  public static final String RAFT_LOG_SYNC_TIME = "syncTime";
+
+  // Raft log data queue size which at any time gives the number of raft log related operations in the queue.
+  public static final String RAFT_LOG_DATA_QUEUE_SIZE = "dataQueueSize";
+
+  // Raft log worker queue size which at any time gives the number of committed entries that are to be synced.
+  public static final String RAFT_LOG_WORKER_QUEUE_SIZE = "workerQueueSize";
+
+  // No. of raft log entries synced with each flush call
+  public static final String RAFT_LOG_SYNC_BATCH_SIZE = "syncBatchSize";
+
+  // Count of RaftLogCache Misses
+  public static final String RAFT_LOG_CACHE_MISS_COUNT = "cacheMissCount";
+
+  // Count of RaftLogCache Hits
+  public static final String RAFT_LOG_CACHE_HIT_COUNT = "cacheHitCount";
+
+  // Total time taken to append a raft log entry
+  public static final String RAFT_LOG_APPEND_ENTRY_LATENCY = "appendEntryLatency";
+
+  // Time spent by a Raft log operation in the queue.
+  public static final String RAFT_LOG_TASK_QUEUE_TIME = "enqueuedTime";
+
+  // Time taken for a Raft log operation to get into the queue after being requested. This will be the time it has to
+  // wait for the queue to be non-full.
+  public static final String RAFT_LOG_TASK_ENQUEUE_DELAY = "queueingDelay";
+
+  // Time taken for a Raft log operation to complete execution.
+  public static final String RAFT_LOG_TASK_EXECUTION_TIME = "ExecutionTime";
+
+  //////////////////////////////
+  // Raft Log Read Path Metrics
+  /////////////////////////////
+
+  // Time required to read a raft log entry from the actual raft log file and create a raft log entry
+  public static final String RAFT_LOG_READ_ENTRY_LATENCY = "readEntryLatency";
+
+  // Time required to load and process raft log segments during restart
+  public static final String RAFT_LOG_LOAD_SEGMENT_LATENCY = "segmentLoadLatency";
+
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 674a732..950f43c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -40,18 +40,6 @@ public class RatisMetrics {
 
   static MetricsReporting metricsReporting = new MetricsReporting(500, TimeUnit.MILLISECONDS);
 
-  public static RatisMetricRegistry createMetricRegistryForLogWorker(String name) {
-    return create(
-        new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS,
-            RATIS_LOG_WORKER_METRICS_DESC));
-  }
-
-  public static RatisMetricRegistry getMetricRegistryForLogWorker(String name) {
-    return MetricRegistries.global().get(
-        new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS,
-            RATIS_LOG_WORKER_METRICS_DESC)).get();
-  }
-
   private static RatisMetricRegistry create(MetricRegistryInfo info) {
     Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);
     if (metricRegistry.isPresent()) {
@@ -82,4 +70,21 @@ public class RatisMetrics {
     return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS,
         RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC));
   }
+
+  public static RaftLogMetrics createMetricRegistryForLogWorker(String name) {
+    RatisMetricRegistry ratisMetricRegistry = getMetricRegistryForLogWorker(name);
+    if (ratisMetricRegistry == null) {
+      ratisMetricRegistry = create(new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS,
+          RATIS_LOG_WORKER_METRICS, RATIS_LOG_WORKER_METRICS_DESC));
+    }
+    return new RaftLogMetrics(ratisMetricRegistry);
+  }
+
+  public static RatisMetricRegistry getMetricRegistryForLogWorker(String name) {
+    Optional<RatisMetricRegistry> ratisMetricRegistry = MetricRegistries.global().get(
+        new MetricRegistryInfo(name, RATIS_APPLICATION_NAME_METRICS, RATIS_LOG_WORKER_METRICS,
+            RATIS_LOG_WORKER_METRICS_DESC));
+    return ratisMetricRegistry.orElse(null);
+  }
+
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index 12bd195..26b070a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.raftlog.segmented;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -76,22 +77,24 @@ class LogSegment implements Comparable<Long> {
     }
   }
 
-  static LogSegment newOpenSegment(RaftStorage storage, long start) {
+  static LogSegment newOpenSegment(RaftStorage storage, long start, RaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0);
-    return new LogSegment(storage, true, start, start - 1);
+    return new LogSegment(storage, true, start, start - 1, raftLogMetrics);
   }
 
   @VisibleForTesting
-  static LogSegment newCloseSegment(RaftStorage storage, long start, long end) {
+  static LogSegment newCloseSegment(RaftStorage storage,
+      long start, long end, RaftLogMetrics raftLogMetrics) {
     Preconditions.assertTrue(start >= 0 && end >= start);
-    return new LogSegment(storage, false, start, end);
+    return new LogSegment(storage, false, start, end, raftLogMetrics);
   }
 
-  private static int readSegmentFile(File file, long start, long end, boolean isOpen,
-      CorruptionPolicy corruptionPolicy, Consumer<LogEntryProto> entryConsumer)
-      throws IOException {
+  private static int readSegmentFile(File file, long start, long end,
+      boolean isOpen, CorruptionPolicy corruptionPolicy,
+      RaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer) throws
+      IOException {
     int count = 0;
-    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen)) {
+    try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(file, start, end, isOpen, raftLogMetrics)) {
       for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
         if (prev != null) {
           Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
@@ -119,15 +122,16 @@ class LogSegment implements Comparable<Long> {
     return count;
   }
 
+  @SuppressWarnings("parameternumber")
   static LogSegment loadSegment(RaftStorage storage, File file, long start, long end, boolean isOpen,
-      boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer)
+      boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, RaftLogMetrics raftLogMetrics)
       throws IOException {
     final LogSegment segment = isOpen ?
-        LogSegment.newOpenSegment(storage, start) :
-        LogSegment.newCloseSegment(storage, start, end);
+        LogSegment.newOpenSegment(storage, start, raftLogMetrics) :
+        LogSegment.newCloseSegment(storage, start, end, raftLogMetrics);
 
     final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
-    final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, entry -> {
+    final int entryCount = readSegmentFile(file, start, end, isOpen, corruptionPolicy, raftLogMetrics, entry -> {
       segment.append(keepEntryInCache || isOpen, entry);
       if (logConsumer != null) {
         logConsumer.accept(entry);
@@ -162,12 +166,18 @@ class LogSegment implements Comparable<Long> {
    * In the future we can make the cache loader configurable if necessary.
    */
   class LogEntryLoader extends CacheLoader<LogRecord, LogEntryProto> {
+    private RaftLogMetrics raftLogMetrics;
+
+    LogEntryLoader(RaftLogMetrics raftLogMetrics) {
+      this.raftLogMetrics = raftLogMetrics;
+    }
+
     @Override
     public LogEntryProto load(LogRecord key) throws IOException {
       final File file = getSegmentFile();
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
-      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(),
+      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics,
           entry -> entryCache.put(ServerProtoUtils.toTermIndex(entry), entry));
       loadingTimes.incrementAndGet();
       return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
@@ -185,7 +195,8 @@ class LogSegment implements Comparable<Long> {
   private final long startIndex;
   private volatile long endIndex;
   private final RaftStorage storage;
-  private final LogEntryLoader cacheLoader = new LogEntryLoader();
+  private RaftLogMetrics raftLogMetrics;
+  private final LogEntryLoader cacheLoader = new LogEntryLoader(raftLogMetrics);
   /** later replace it with a metric */
   private final AtomicInteger loadingTimes = new AtomicInteger();
 
@@ -199,11 +210,12 @@ class LogSegment implements Comparable<Long> {
   private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();
   private final Set<TermIndex> configEntries = new HashSet<>();
 
-  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end) {
+  private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, RaftLogMetrics raftLogMetrics) {
     this.storage = storage;
     this.isOpen = isOpen;
     this.startIndex = start;
     this.endIndex = end;
+    this.raftLogMetrics = raftLogMetrics;
   }
 
   long getStartIndex() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index fbe8529..a8c12c8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -23,6 +23,8 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -47,6 +49,8 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
 
+import com.codahale.metrics.Timer;
+
 /**
  * The RaftLog implementation that writes log entries into segmented files in
  * local disk.
@@ -79,6 +83,7 @@ public class SegmentedRaftLog extends RaftLog {
    */
   abstract static class Task {
     private final CompletableFuture<Long> future = new CompletableFuture<>();
+    private Timer.Context queueTimerContext;
 
     CompletableFuture<Long> getFuture() {
       return future;
@@ -102,6 +107,16 @@ public class SegmentedRaftLog extends RaftLog {
 
     abstract long getEndIndex();
 
+    void startTimerOnEnqueue(Timer queueTimer) {
+      queueTimerContext = queueTimer.time();
+    }
+
+    void stopTimerOnDequeue() {
+      if (queueTimerContext != null) {
+        queueTimerContext.stop();
+      }
+    }
+
     int getSerializedSize() {
       return 0;
     }
@@ -177,6 +192,7 @@ public class SegmentedRaftLog extends RaftLog {
   private final SegmentedRaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
+  private final RaftLogMetrics metricRegistry;
 
   public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
@@ -193,9 +209,10 @@ public class SegmentedRaftLog extends RaftLog {
     this.storage = storage;
     this.stateMachine = stateMachine;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    this.cache = new SegmentedRaftLogCache(memberId, storage, properties);
+    this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
+    this.cache = new SegmentedRaftLogCache(memberId, storage, properties, metricRegistry);
     this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
-        submitUpdateCommitEvent, server, storage, properties);
+        submitUpdateCommitEvent, server, storage, properties, metricRegistry);
     stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
   }
 
@@ -231,7 +248,9 @@ public class SegmentedRaftLog extends RaftLog {
         // so that during the initial loading we can apply part of the log
         // entries to the state machine
         boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
+        final Timer.Context loadSegmentContext = metricRegistry.getRaftLogLoadSegmentTimer().time();
         cache.loadSegment(pi, keepEntryInCache, logConsumer);
+        loadSegmentContext.stop();
       }
 
       // if the largest index is smaller than the last index in snapshot, we do
@@ -263,11 +282,13 @@ public class SegmentedRaftLog extends RaftLog {
       }
       final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
       if (entry != null) {
+        metricRegistry.onRaftLogCacheHit();
         return entry;
       }
     }
 
     // the entry is not in the segment's cache. Load the cache without holding the lock.
+    metricRegistry.onRaftLogCacheMiss();
     checkAndEvictCache();
     return segment.loadCache(record);
   }
@@ -356,6 +377,7 @@ public class SegmentedRaftLog extends RaftLog {
 
   @Override
   protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
+    final Timer.Context context = metricRegistry.getRaftLogAppendEntryTimer().time();
     checkLogState();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", getName(), ServerProtoUtils.toLogEntryString(entry));
@@ -397,6 +419,8 @@ public class SegmentedRaftLog extends RaftLog {
     } catch (Throwable throwable) {
       LOG.error("{}: Failed to append {}", getName(), ServerProtoUtils.toLogEntryString(entry), throwable);
       throw throwable;
+    } finally {
+      context.stop();
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index 58f5093..dced8ed 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -22,6 +22,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.storage.RaftStorage;
@@ -290,15 +291,22 @@ class SegmentedRaftLogCache {
   private volatile LogSegment openSegment;
   private final LogSegmentList closedSegments;
   private final RaftStorage storage;
+  private final RaftLogMetrics raftLogMetrics;
 
   private final int maxCachedSegments;
   private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
 
   SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties) {
+    this(name, storage, properties, null);
+  }
+
+  SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties,
+                                RaftLogMetrics raftLogMetrics) {
     this.name = name + "-" + getClass().getSimpleName();
     this.closedSegments = new LogSegmentList(name);
     this.storage = storage;
     maxCachedSegments = RaftServerConfigKeys.Log.maxCachedSegmentNum(properties);
+    this.raftLogMetrics = raftLogMetrics;
   }
 
   int getMaxCachedSegments() {
@@ -308,7 +316,7 @@ class SegmentedRaftLogCache {
   void loadSegment(LogPathAndIndex pi, boolean keepEntryInCache,
       Consumer<LogEntryProto> logConsumer) throws IOException {
     LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(),
-        pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, logConsumer);
+        pi.startIndex, pi.endIndex, pi.isOpen(), keepEntryInCache, logConsumer, raftLogMetrics);
     if (logSegment != null) {
       addSegment(logSegment);
     }
@@ -350,7 +358,7 @@ class SegmentedRaftLogCache {
   }
 
   void addOpenSegment(long startIndex) {
-    setOpenSegment(LogSegment.newOpenSegment(storage, startIndex));
+    setOpenSegment(LogSegment.newOpenSegment(storage, startIndex,raftLogMetrics));
   }
 
   private void setOpenSegment(LogSegment openSegment) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
index c0f9e17..1db790f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogInputStream.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.util.Optional;
 
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.OpenCloseState;
 import org.apache.ratis.util.Preconditions;
@@ -65,8 +66,14 @@ public class SegmentedRaftLogInputStream implements Closeable {
   private final boolean isOpen;
   private final OpenCloseState state;
   private SegmentedRaftLogReader reader;
+  private RaftLogMetrics raftLogMetrics;
 
   public SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen) {
+    this(log, startIndex, endIndex, isOpen, null);
+  }
+
+  SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen,
+                                      RaftLogMetrics raftLogMetrics) {
     if (isOpen) {
       Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
     } else {
@@ -78,12 +85,13 @@ public class SegmentedRaftLogInputStream implements Closeable {
     this.endIndex = endIndex;
     this.isOpen = isOpen;
     this.state = new OpenCloseState(getName());
+    this.raftLogMetrics = raftLogMetrics;
   }
 
   private void init() throws IOException {
     state.open();
     try {
-      final SegmentedRaftLogReader r = new SegmentedRaftLogReader(logFile);
+      final SegmentedRaftLogReader r = new SegmentedRaftLogReader(logFile, raftLogMetrics);
       if (r.verifyHeader()) {
         reader = r;
       }
@@ -184,7 +192,7 @@ public class SegmentedRaftLogInputStream implements Closeable {
       throws IOException {
     SegmentedRaftLogInputStream in;
     try {
-      in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false);
+      in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, null);
       // read the header, initialize the inputstream
       in.init();
     } catch (EOFException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
index f90d4c6..5caec22 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogReader.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.raftlog.segmented;
 import org.apache.ratis.io.CorruptedFileException;
 import org.apache.ratis.protocol.ChecksumException;
 import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedInputStream;
 import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -33,6 +34,8 @@ import org.slf4j.LoggerFactory;
 import java.io.*;
 import java.util.zip.Checksum;
 
+import com.codahale.metrics.Timer;
+
 class SegmentedRaftLogReader implements Closeable {
   static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogReader.class);
   /**
@@ -130,13 +133,15 @@ class SegmentedRaftLogReader implements Closeable {
   private final DataInputStream in;
   private byte[] temp = new byte[4096];
   private final Checksum checksum;
+  private final RaftLogMetrics raftLogMetrics;
 
-  SegmentedRaftLogReader(File file) throws FileNotFoundException {
+  SegmentedRaftLogReader(File file, RaftLogMetrics raftLogMetrics) throws FileNotFoundException {
     this.file = file;
     this.limiter = new LimitedInputStream(
         new BufferedInputStream(new FileInputStream(file)));
     in = new DataInputStream(limiter);
     checksum = new PureJavaCrc32C();
+    this.raftLogMetrics = raftLogMetrics;
   }
 
   /**
@@ -181,7 +186,11 @@ class SegmentedRaftLogReader implements Closeable {
    *         exception when skipBrokenEdits is false.
    */
   LogEntryProto readEntry() throws IOException {
+    Timer.Context readEntryContext = null;
     try {
+      if (raftLogMetrics != null) {
+        readEntryContext = raftLogMetrics.getRaftLogReadEntryTimer().time();
+      }
       return decodeEntry();
     } catch (EOFException eof) {
       in.reset();
@@ -202,6 +211,10 @@ class SegmentedRaftLogReader implements Closeable {
       // broken, throw the exception instead of skipping broken entries
       in.reset();
       throw new IOException("got unexpected exception " + e.getMessage(), e);
+    } finally {
+      if (readEntryContext != null) {
+        readEntryContext.stop();
+      }
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index ec7ab73..dcd4adb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -18,17 +18,14 @@
 package org.apache.ratis.server.raftlog.segmented;
 
 import com.codahale.metrics.Gauge;
-import com.codahale.metrics.Metric;
-import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.Timer;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
@@ -61,7 +58,6 @@ class SegmentedRaftLogWorker implements Runnable {
   static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
 
   static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
-  private final RatisMetricRegistry metricRegistry;
 
   static class StateMachineDataPolicy {
     private final boolean sync;
@@ -146,6 +142,10 @@ class SegmentedRaftLogWorker implements Runnable {
   private final Runnable submitUpdateCommitEvent;
   private final StateMachine stateMachine;
   private final Timer logFlushTimer;
+  private final Timer raftLogSyncTimer;
+  private final Timer raftLogQueueingTimer;
+  private final Timer raftLogEnqueueingDelayTimer;
+  private final RaftLogMetrics raftLogMetrics;
 
   /**
    * The number of entries that have been written into the SegmentedRaftLogOutputStream but
@@ -167,13 +167,14 @@ class SegmentedRaftLogWorker implements Runnable {
   private final StateMachineDataPolicy stateMachineDataPolicy;
 
   SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
-                         RaftServerImpl server, RaftStorage storage, RaftProperties properties) {
+                         RaftServerImpl server, RaftStorage storage, RaftProperties properties,
+                         RaftLogMetrics metricRegistry) {
     this.name = memberId + "-" + getClass().getSimpleName();
     LOG.info("new {} for {}", name, storage);
 
     this.submitUpdateCommitEvent = submitUpdateCommitEvent;
     this.stateMachine = stateMachine;
-    this.metricRegistry = RatisMetrics.createMetricRegistryForLogWorker(memberId.getPeerId().toString());
+    this.raftLogMetrics = metricRegistry;
     this.storage = storage;
     this.server = server;
     final SizeInBytes queueByteLimit = RaftServerConfigKeys.Log.queueByteLimit(properties);
@@ -181,17 +182,6 @@ class SegmentedRaftLogWorker implements Runnable {
     this.queue =
         new DataBlockingQueue<>(name, queueByteLimit, queueElementLimit, Task::getSerializedSize);
 
-    metricRegistry.gauge("dataQueueSize", new MetricRegistry.MetricSupplier() {
-      @Override public Metric newMetric() {
-        return new Gauge<Integer>() {
-          @Override public Integer getValue() {
-            //q.size() is O(1) operation
-            return queue.size();
-          }
-        };
-      }
-    });
-
     this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.preallocatedSize = RaftServerConfigKeys.Log.preallocatedSize(properties).getSize();
     this.bufferSize = RaftServerConfigKeys.Log.writeBufferSize(properties).getSizeInt();
@@ -202,7 +192,13 @@ class SegmentedRaftLogWorker implements Runnable {
     this.workerThread = new Thread(this, name);
 
     // Server Id can be null in unit tests
-    this.logFlushTimer = metricRegistry.timer("flush-time");
+    metricRegistry.addDataQueueSizeGauge(queue);
+    metricRegistry.addLogWorkerQueueSizeGauge(writeTasks.q);
+    metricRegistry.addFlushBatchSizeGauge(() -> (Gauge<Integer>) () -> pendingFlushNum);
+    this.logFlushTimer = metricRegistry.getFlushTimer();
+    this.raftLogSyncTimer = metricRegistry.getRaftLogSyncTimer();
+    this.raftLogQueueingTimer = metricRegistry.getRaftLogQueueTimer();
+    this.raftLogEnqueueingDelayTimer = metricRegistry.getRaftLogEnqueueDelayTimer();
   }
 
   void start(long latestIndex, File openSegmentFile) throws IOException {
@@ -250,10 +246,13 @@ class SegmentedRaftLogWorker implements Runnable {
   private Task addIOTask(Task task) {
     LOG.debug("{} adds IO task {}", name, task);
     try {
+      final Timer.Context enqueueTimerContext = raftLogEnqueueingDelayTimer.time();
       for(; !queue.offer(task, ONE_SECOND); ) {
         Preconditions.assertTrue(isAlive(),
             "the worker thread is not alive");
       }
+      enqueueTimerContext.stop();
+      task.startTimerOnEnqueue(raftLogQueueingTimer);
     } catch (Throwable t) {
       if (t instanceof InterruptedException && !running) {
         LOG.info("Got InterruptedException when adding task " + task
@@ -282,11 +281,15 @@ class SegmentedRaftLogWorker implements Runnable {
       try {
         Task task = queue.poll(ONE_SECOND);
         if (task != null) {
+          task.stopTimerOnDequeue();
           try {
             if (logIOException != null) {
               throw logIOException;
             } else {
+              Timer.Context executionTimeContext =
+                  raftLogMetrics.getRaftLogTaskExecutionTimer(task.getClass().getSimpleName().toLowerCase()).time();
               task.execute();
+              executionTimeContext.stop();
             }
           } catch (IOException e) {
             if (task.getEndIndex() < lastWrittenIndex) {
@@ -346,7 +349,9 @@ class SegmentedRaftLogWorker implements Runnable {
         if (stateMachineDataPolicy.isSync()) {
           stateMachineDataPolicy.getFromFuture(f, () -> this + "-flushStateMachineData");
         }
+        final Timer.Context logSyncTimerContext = raftLogSyncTimer.time();
         out.flush();
+        logSyncTimerContext.stop();
         if (!stateMachineDataPolicy.isSync()) {
           IOUtils.getFromFuture(f, () -> this + "-flushStateMachineData");
         }
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 2bc73f2..af08a0c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -146,7 +146,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
 
     // Get all last_heartbeat_elapsed_time metric gauges. Should be equal to number of followers.
     SortedMap<String, Gauge> heartbeatElapsedTimeGauges = ratisMetricRegistry.getGauges((s, metric) ->
-        s.contains("last_heartbeat_elapsed_time"));
+        s.contains("lastHeartbeatElapsedTime"));
     assertTrue(heartbeatElapsedTimeGauges.size() == 2);
 
     for (RaftServerImpl followerServer : cluster.getFollowers()) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
index d5c5850..7ad23b7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/storage/RaftStorageTestUtils.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.server.storage;
 
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_FLUSH_TIME;
+
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.impl.ServerProtoUtils;
 import org.apache.ratis.server.metrics.RatisMetrics;
@@ -30,8 +32,12 @@ import java.util.function.Consumer;
 public interface RaftStorageTestUtils {
 
   static String getLogFlushTimeMetric(RaftPeerId serverId) {
+    return getRaftLogFullMetric(serverId, RAFT_LOG_FLUSH_TIME); // same name layout as the other raft log metrics
+  }
+
+  static String getRaftLogFullMetric(RaftPeerId serverId, String metricName) {
     return RatisMetrics.RATIS_APPLICATION_NAME_METRICS + "." + RatisMetrics.RATIS_LOG_WORKER_METRICS
-        + "." + serverId + ".flush-time";
+        + "." + serverId + "." + metricName; // <application>.<log worker>.<serverId>.<metricName>
   }
 
   static void printLog(RaftLog log, Consumer<String> println) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 63d9da6..e4918a4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -92,7 +92,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
     RatisMetricRegistry ratisMetricRegistry = RatisMetrics.getMetricRegistryForHeartbeat(
         leaderServer.getMemberId().toString());
     SortedMap<String, Gauge> heartbeatElapsedTimeGauges = ratisMetricRegistry.getGauges((s, metric) ->
-        s.contains("last_heartbeat_elapsed_time"));
+        s.contains("lastHeartbeatElapsedTime"));
 
     String followerId = failedFollower.getId().toString();
     Gauge metric = heartbeatElapsedTimeGauges.entrySet().parallelStream().filter(e -> e.getKey().contains(
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 971a279..e3f16bc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -17,12 +17,16 @@
  */
 package org.apache.ratis.server.raftlog;
 
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_FLUSH_TIME;
+import static org.apache.ratis.server.metrics.RatisMetricNames.RAFT_LOG_SYNC_TIME;
+
 import com.codahale.metrics.Timer;
 import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
@@ -72,14 +76,14 @@ public class TestRaftLogMetrics extends BaseTest
   }
 
   @Test
-  public void testFlushMetric() throws Exception {
+  public void testRaftLogMetrics() throws Exception {
     try(final MiniRaftCluster cluster = newCluster(NUM_SERVERS)) {
       cluster.start();
-      runTestFlushMetric(cluster);
+      runTestRaftLogMetrics(cluster); // runs inside try-with-resources, so the cluster is closed afterwards
     }
   }
 
-  static void runTestFlushMetric(MiniRaftCluster cluster) throws Exception {
+  static void runTestRaftLogMetrics(MiniRaftCluster cluster) throws Exception {
     int numMsg = 2;
     final RaftTestUtil.SimpleMessage[] messages = RaftTestUtil.SimpleMessage.create(numMsg);
 
@@ -91,17 +95,20 @@ public class TestRaftLogMetrics extends BaseTest
 
     // For leader, flush must happen before client can get replies.
     assertFlushCount(cluster.getLeader());
+    assertRaftLogWritePathMetrics(cluster.getLeader());
 
     // For followers, flush can be lagged behind.  Attempt multiple times.
     for(RaftServerImpl f : cluster.getFollowers()) {
       JavaUtils.attempt(() -> assertFlushCount(f), 10, HUNDRED_MILLIS, f.getId() + "-assertFlushCount", null);
+      // We have already waited enough for follower metrics to populate.
+      assertRaftLogWritePathMetrics(f);
     }
   }
 
   static void assertFlushCount(RaftServerImpl server) throws Exception {
     final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(server.getId());
     Timer tm = (Timer) RatisMetrics.getMetricRegistryForLogWorker(server.getId().toString())
-        .get("flush-time");
+        .get(RAFT_LOG_FLUSH_TIME);
     Assert.assertNotNull(tm);
 
     final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
@@ -116,4 +123,45 @@ public class TestRaftLogMetrics extends BaseTest
         ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
             .intValue());
   }
-}
+
+  static void assertRaftLogWritePathMetrics(RaftServerImpl server) throws Exception {
+    final String syncTimeMetric = RaftStorageTestUtils.getRaftLogFullMetric(server.getId(), RAFT_LOG_SYNC_TIME);
+    RatisMetricRegistry ratisMetricRegistry = RatisMetrics.getMetricRegistryForLogWorker(server.getId().toString());
+
+    // Test sync count: the timer must exist and its count must equal MetricsStateMachine's flush count.
+    Timer tm = (Timer) ratisMetricRegistry.get(RAFT_LOG_SYNC_TIME);
+    Assert.assertNotNull(tm);
+    final MetricsStateMachine stateMachine = MetricsStateMachine.get(server);
+    final int expectedFlush = stateMachine.getFlushCount();
+    Assert.assertEquals(expectedFlush, tm.getCount()); // Ideally, flushCount should be the same as syncCount.
+    Assert.assertTrue(tm.getMeanRate() > 0);
+
+    // Test JMX via the platform MBean server's "Count" attribute. Testing one metric's JMX is good enough.
+    ObjectName oname = new ObjectName("ratis_core", "name", syncTimeMetric);
+    Assert.assertEquals(expectedFlush,
+        ((Long) ManagementFactory.getPlatformMBeanServer().getAttribute(oname, "Count"))
+            .intValue());
+
+    long cacheMissCount = ratisMetricRegistry.counter("cacheMissCount").getCount();
+    Assert.assertTrue(cacheMissCount == 0); // NOTE(review): assertEquals(0, cacheMissCount) reports the actual value
+
+    long cacheHitsCount = ratisMetricRegistry.counter("cacheHitCount").getCount();
+    Assert.assertTrue(cacheHitsCount > 0);
+
+    Timer appendLatencyTimer = ratisMetricRegistry.timer("appendEntryLatency");
+    Assert.assertTrue(appendLatencyTimer.getMeanRate() > 0);
+
+    Timer enqueuedTimer = ratisMetricRegistry.timer("enqueuedTime");
+    Assert.assertTrue(enqueuedTimer.getMeanRate() > 0);
+
+    Timer queueingDelayTimer = ratisMetricRegistry.timer("queueingDelay");
+    Assert.assertTrue(queueingDelayTimer.getMeanRate() > 0);
+
+    Timer executionTimer = ratisMetricRegistry.timer("writelogExecutionTime");
+    Assert.assertTrue(executionTimer.getMeanRate() > 0);
+
+    Assert.assertNotNull(ratisMetricRegistry.get("dataQueueSize")); // these three only need to be registered
+    Assert.assertNotNull(ratisMetricRegistry.get("workerQueueSize"));
+    Assert.assertNotNull(ratisMetricRegistry.get("syncBatchSize"));
+  }
+}
\ No newline at end of file
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index a249fe0..f498062 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -55,7 +55,7 @@ public class TestCacheEviction extends BaseTest {
     Assert.assertEquals(numSegments, cached.length);
     final LogSegmentList segments = new LogSegmentList(TestCacheEviction.class.getSimpleName());
     for (int i = 0; i < numSegments; i++) {
-      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1);
+      LogSegment s = LogSegment.newCloseSegment(null, start, start + size - 1, null);
       if (cached[i]) {
         s = Mockito.spy(s);
         Mockito.when(s.hasCache()).thenReturn(true);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
index 3086e78..0e9eb9f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestLogSegment.java
@@ -23,6 +23,8 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants.StartupOption;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.server.storage.RaftStorageDirectory;
@@ -47,9 +49,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.INVALID_LOG_INDEX;
+import static org.apache.ratis.server.raftlog.RaftLog.INVALID_LOG_INDEX;
 import static org.apache.ratis.server.raftlog.segmented.LogSegment.getEntrySize;
 
+import com.codahale.metrics.Timer;
+
 /**
  * Test basic functionality of {@link LogSegment}
  */
@@ -167,7 +171,7 @@ public class TestLogSegment extends BaseTest {
     final File openSegmentFile = prepareLog(true, 0, 100, 0, isLastEntryPartiallyWritten);
     RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
     LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
-        INVALID_LOG_INDEX, true, loadInitial, null);
+        INVALID_LOG_INDEX, true, loadInitial, null, null);
     final int delta = isLastEntryPartiallyWritten? 1: 0;
     checkLogSegment(openSegment, 0, 99 - delta, true, openSegmentFile.length(), 0);
     storage.close();
@@ -177,7 +181,7 @@ public class TestLogSegment extends BaseTest {
     // load a closed segment (1000-1099)
     final File closedSegmentFile = prepareLog(false, 1000, 100, 1, false);
     LogSegment closedSegment = LogSegment.loadSegment(storage, closedSegmentFile,
-        1000, 1099, false, loadInitial, null);
+        1000, 1099, false, loadInitial, null, null);
     checkLogSegment(closedSegment, 1000, 1099, false,
         closedSegment.getTotalSize(), 1);
     Assert.assertEquals(loadInitial ? 0 : 1, closedSegment.getLoadingTimes());
@@ -186,7 +190,7 @@ public class TestLogSegment extends BaseTest {
   @Test
   public void testAppendEntries() throws Exception {
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(null, start);
+    LogSegment segment = LogSegment.newOpenSegment(null, start, null);
     long size = SegmentedRaftLogFormat.getHeaderLength();
     final long max = 8 * 1024 * 1024;
     checkLogSegment(segment, start, start - 1, true, size, 0);
@@ -206,8 +210,26 @@ public class TestLogSegment extends BaseTest {
   }
 
   @Test
+  public void testAppendEntryMetric() throws Exception { // NOTE(review): checks the read-entry timer, not append
+    RaftLogMetrics raftLogMetrics = RatisMetrics.createMetricRegistryForLogWorker("test");
+
+    final File openSegmentFile = prepareLog(true, 0, 100, 0, true);
+    RaftStorage storage = new RaftStorage(storageDir, StartupOption.REGULAR);
+    LogSegment openSegment = LogSegment.loadSegment(storage, openSegmentFile, 0,
+        INVALID_LOG_INDEX, true, true, null, raftLogMetrics);
+    checkLogSegment(openSegment, 0, 98, true, openSegmentFile.length(), 0); // 98 = 99 - 1: last entry is partial
+    storage.close();
+
+    Timer readEntryTimer = raftLogMetrics.getRaftLogReadEntryTimer();
+    Assert.assertNotNull(readEntryTimer);
+    Assert.assertEquals(100, readEntryTimer.getCount()); // presumably 99 full reads + 1 partial read - TODO confirm
+    Assert.assertTrue(readEntryTimer.getMeanRate() > 0);
+  }
+
+
+  @Test
   public void testAppendWithGap() throws Exception {
-    LogSegment segment = LogSegment.newOpenSegment(null, 1000);
+    LogSegment segment = LogSegment.newOpenSegment(null, 1000, null);
     SimpleOperation op = new SimpleOperation("m");
     final StateMachineLogEntryProto m = op.getLogEntryContent();
     try {
@@ -234,7 +256,7 @@ public class TestLogSegment extends BaseTest {
   public void testTruncate() throws Exception {
     final long term = 1;
     final long start = 1000;
-    LogSegment segment = LogSegment.newOpenSegment(null, start);
+    LogSegment segment = LogSegment.newOpenSegment(null, start, null);
     for (int i = 0; i < 100; i++) {
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(
           new SimpleOperation("m" + i).getLogEntryContent(), term, i + start);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index fa89192..53cb364 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -21,6 +21,7 @@ import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeerId;
@@ -31,6 +32,7 @@ import org.apache.ratis.server.impl.RetryCacheTestUtil;
 import org.apache.ratis.server.impl.RetryCache;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -67,6 +69,8 @@ import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import com.codahale.metrics.Timer;
+
 public class TestSegmentedRaftLog extends BaseTest {
   static {
     LogUtils.setLogLevel(SegmentedRaftLogWorker.LOG, Level.DEBUG);
@@ -193,6 +197,15 @@ public class TestSegmentedRaftLog extends BaseTest {
           .toArray(LogEntryProto[]::new);
       Assert.assertArrayEquals(entries, entriesFromLog);
       Assert.assertEquals(entries[entries.length - 1], getLastEntry(raftLog));
+
+      RatisMetricRegistry metricRegistryForLogWorker =
+          RatisMetrics.getMetricRegistryForLogWorker(memberId.getPeerId().toString());
+
+      Timer raftLogSegmentLoadLatencyTimer = metricRegistryForLogWorker.timer("segmentLoadLatency");
+      Assert.assertTrue(raftLogSegmentLoadLatencyTimer.getMeanRate() > 0);
+
+      Timer raftLogReadLatencyTimer = metricRegistryForLogWorker.timer("readEntryLatency");
+      Assert.assertTrue(raftLogReadLatencyTimer.getMeanRate() > 0);
     }
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index 660c2f8..71c3dc6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -43,7 +43,7 @@ public class TestSegmentedRaftLogCache {
   }
 
   private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
-    LogSegment s = LogSegment.newOpenSegment(null, start);
+    LogSegment s = LogSegment.newOpenSegment(null, start, null);
     for (long i = start; i <= end; i++) {
       SimpleOperation m = new SimpleOperation("m" + i);
       LogEntryProto entry = ServerProtoUtils.toLogEntryProto(m.getLogEntryContent(), 0, i);