You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/11 07:42:08 UTC

[incubator-ratis] branch master updated: RATIS-966. Track commit metrics for different type of entries in RaftLog. (#120)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 374be2a  RATIS-966. Track commit metrics for different type of entries in RaftLog. (#120)
374be2a is described below

commit 374be2ae5f9a7ef4c9c6fc3ba4696264362bad56
Author: anshkhannasbu <54...@users.noreply.github.com>
AuthorDate: Thu Jun 11 03:41:57 2020 -0400

    RATIS-966. Track commit metrics for different type of entries in RaftLog. (#120)
---
 .../java/org/apache/ratis/server/impl/LeaderState.java  | 11 +++++++++++
 .../org/apache/ratis/server/metrics/RaftLogMetrics.java | 17 +++++++++++++++++
 .../java/org/apache/ratis/server/raftlog/RaftLog.java   |  8 +++++++-
 .../server/raftlog/segmented/SegmentedRaftLog.java      | 17 +++++++----------
 .../apache/ratis/server/raftlog/TestRaftLogMetrics.java | 14 ++++++++++++++
 5 files changed, 56 insertions(+), 11 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index a27c77e..26766f4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.*;
 import org.slf4j.Logger;
@@ -652,11 +653,21 @@ public class LeaderState {
       // the log gets purged after the statemachine does a snapshot
       final TermIndex[] entriesToCommit = raftLog.getEntries(
           oldLastCommitted + 1, majority + 1);
+
       if (server.getState().updateStatemachine(majority, currentTerm)) {
         watchRequests.update(ReplicationLevel.MAJORITY, majority);
         logMetadata(majority);
         commitIndexChanged();
       }
+
+      try {
+        for (TermIndex entry : entriesToCommit) {
+          raftLog.getRaftLogMetrics().onLogEntryCommit(raftLog.get(entry.getIndex()));
+        }
+      } catch (RaftLogIOException e) {
+        LOG.error("Caught exception reading from RaftLog", e);
+      }
+
       checkAndUpdateConfiguration(entriesToCommit);
     }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
index 51bb67b..e8725ba 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -24,6 +24,8 @@ import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.metrics.RatisMetrics;
 import org.apache.ratis.util.DataQueue;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+
 
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.MetricRegistry;
@@ -67,6 +69,11 @@ public class RaftLogMetrics extends RatisMetrics {
   public static final String RAFT_LOG_PURGE_METRIC = "purgeLog";
   public static final String LOG_APPENDER_INSTALL_SNAPSHOT_METRIC = "numInstallSnapshot";
 
+  // Log Entry metrics
+  public static final String METADATA_LOG_ENTRY_COUNT = "metadataLogEntryCount";
+  public static final String CONFIG_LOG_ENTRY_COUNT = "configLogEntryCount";
+  public static final String STATE_MACHINE_LOG_ENTRY_COUNT = "stateMachineLogEntryCount";
+
   //////////////////////////////
   // Raft Log Read Path Metrics
   /////////////////////////////
@@ -120,6 +127,16 @@ public class RaftLogMetrics extends RatisMetrics {
     registry.counter(RAFT_LOG_CACHE_HIT_COUNT).inc();
   }
 
+  public void onLogEntryCommit(LogEntryProto proto) { // increments one counter, chosen by the first has*() check that matches
+    if (proto.hasConfigurationEntry()) {
+      registry.counter(CONFIG_LOG_ENTRY_COUNT).inc();
+    } else if (proto.hasMetadataEntry()) {
+      registry.counter(METADATA_LOG_ENTRY_COUNT).inc();
+    } else if (proto.hasStateMachineLogEntry()) {
+      registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc();
+    } // an entry matching none of the three checks leaves every counter untouched
+  }
+
   public void onRaftLogCacheMiss() {
     registry.counter(RAFT_LOG_CACHE_MISS_COUNT).inc();
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index b882a60..4122262 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -82,6 +83,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
   private final Runner runner = new Runner(this::getName);
   private final OpenCloseState state;
+  private final RaftLogMetrics raftLogMetrics;
 
   private volatile LogEntryProto lastMetadataEntry = null;
 
@@ -92,11 +94,15 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     this.snapshotIndex = new RaftLogIndex("snapshotIndex", commitIndex);
     this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
     this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
-
+    this.raftLogMetrics = new RaftLogMetrics(memberId.toString());
     this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
     this.state = new OpenCloseState(getName());
   }
 
+  public RaftLogMetrics getRaftLogMetrics() { // accessor for this log's metrics object
+    return raftLogMetrics; // final field, assigned once in the RaftLog constructor
+  }
+
   public long getLastCommittedIndex() {
     return commitIndex.get();
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 28f6f81..4670425 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -23,7 +23,6 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -191,7 +190,6 @@ public class SegmentedRaftLog extends RaftLog {
   private final SegmentedRaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
-  private final RaftLogMetrics raftLogMetrics;
 
   public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
@@ -208,10 +206,9 @@ public class SegmentedRaftLog extends RaftLog {
     this.storage = storage;
     this.stateMachine = stateMachine;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
-    this.raftLogMetrics = new RaftLogMetrics(memberId.toString());
-    this.cache = new SegmentedRaftLogCache(memberId, storage, properties, raftLogMetrics);
+    this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics());
     this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
-        submitUpdateCommitEvent, server, storage, properties, raftLogMetrics);
+        submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics());
     stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
   }
 
@@ -248,7 +245,7 @@ public class SegmentedRaftLog extends RaftLog {
         // so that during the initial loading we can apply part of the log
         // entries to the state machine
         boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
-        final Timer.Context loadSegmentContext = raftLogMetrics.getRaftLogLoadSegmentTimer().time();
+        final Timer.Context loadSegmentContext = getRaftLogMetrics().getRaftLogLoadSegmentTimer().time();
         cache.loadSegment(pi, keepEntryInCache, logConsumer, lastIndexInSnapshot);
         loadSegmentContext.stop();
       }
@@ -282,13 +279,13 @@ public class SegmentedRaftLog extends RaftLog {
       }
       final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
       if (entry != null) {
-        raftLogMetrics.onRaftLogCacheHit();
+        getRaftLogMetrics().onRaftLogCacheHit();
         return entry;
       }
     }
 
     // the entry is not in the segment's cache. Load the cache without holding the lock.
-    raftLogMetrics.onRaftLogCacheMiss();
+    getRaftLogMetrics().onRaftLogCacheMiss();
     checkAndEvictCache();
     return segment.loadCache(record);
   }
@@ -384,7 +381,7 @@ public class SegmentedRaftLog extends RaftLog {
 
   @Override
   protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
-    final Timer.Context context = raftLogMetrics.getRaftLogAppendEntryTimer().time();
+    final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
     checkLogState();
     if (LOG.isTraceEnabled()) {
       LOG.trace("{}: appendEntry {}", getName(), ServerProtoUtils.toLogEntryString(entry));
@@ -516,7 +513,7 @@ public class SegmentedRaftLog extends RaftLog {
     }
     fileLogWorker.close();
     storage.close();
-    raftLogMetrics.unregister();
+    getRaftLogMetrics().unregister();
   }
 
   SegmentedRaftLogCache getRaftLogCache() {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index dfaa822..ef65d79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -30,6 +30,9 @@ import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_ENQUE
 import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_EXECUTION_TIME;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_QUEUE_TIME;
 import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_WORKER_QUEUE_SIZE;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.METADATA_LOG_ENTRY_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.CONFIG_LOG_ENTRY_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.STATE_MACHINE_LOG_ENTRY_COUNT;
 import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
 
 import com.codahale.metrics.Timer;
@@ -115,6 +118,17 @@ public class TestRaftLogMetrics extends BaseTest
       // We have already waited enough for follower metrics to populate.
       assertRaftLogWritePathMetrics(f);
     }
+
+    // Wait for commits to happen on leader
+    JavaUtils.attempt(() -> assertCommitCount(cluster.getLeader(), numMsg), 10, HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertCommitCount", null);
+  }
+
+  static void assertCommitCount(RaftServerImpl server, int expectedMsgs) throws Exception { // total committed entries must equal expectedMsgs
+    RatisMetricRegistry rlm = server.getState().getLog().getRaftLogMetrics().getRegistry();
+    long metaCount = rlm.counter(METADATA_LOG_ENTRY_COUNT).getCount();
+    long configCount = rlm.counter(CONFIG_LOG_ENTRY_COUNT).getCount();
+    long stmCount = rlm.counter(STATE_MACHINE_LOG_ENTRY_COUNT).getCount();
+    Assert.assertEquals(expectedMsgs, stmCount + configCount + metaCount); // assertEquals reports expected vs. actual on failure
   }
 
   static void assertFlushCount(RaftServerImpl server) throws Exception {