You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/11 07:42:08 UTC
[incubator-ratis] branch master updated: RATIS-966. Track commit
metrics for different type of entries in RaftLog. (#120)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 374be2a RATIS-966. Track commit metrics for different type of entries in RaftLog. (#120)
374be2a is described below
commit 374be2ae5f9a7ef4c9c6fc3ba4696264362bad56
Author: anshkhannasbu <54...@users.noreply.github.com>
AuthorDate: Thu Jun 11 03:41:57 2020 -0400
RATIS-966. Track commit metrics for different type of entries in RaftLog. (#120)
---
.../java/org/apache/ratis/server/impl/LeaderState.java | 11 +++++++++++
.../org/apache/ratis/server/metrics/RaftLogMetrics.java | 17 +++++++++++++++++
.../java/org/apache/ratis/server/raftlog/RaftLog.java | 8 +++++++-
.../server/raftlog/segmented/SegmentedRaftLog.java | 17 +++++++----------
.../apache/ratis/server/raftlog/TestRaftLogMetrics.java | 14 ++++++++++++++
5 files changed, 56 insertions(+), 11 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index a27c77e..26766f4 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -27,6 +27,7 @@ import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.*;
import org.slf4j.Logger;
@@ -652,11 +653,21 @@ public class LeaderState {
// the log gets purged after the statemachine does a snapshot
final TermIndex[] entriesToCommit = raftLog.getEntries(
oldLastCommitted + 1, majority + 1);
+
if (server.getState().updateStatemachine(majority, currentTerm)) {
watchRequests.update(ReplicationLevel.MAJORITY, majority);
logMetadata(majority);
commitIndexChanged();
}
+
+ try {
+ for (TermIndex entry : entriesToCommit) {
+ raftLog.getRaftLogMetrics().onLogEntryCommit(raftLog.get(entry.getIndex()));
+ }
+ } catch (RaftLogIOException e) {
+ LOG.error("Caught exception reading from RaftLog", e);
+ }
+
checkAndUpdateConfiguration(entriesToCommit);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
index 51bb67b..e8725ba 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -24,6 +24,8 @@ import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.util.DataQueue;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+
import com.codahale.metrics.Gauge;
import com.codahale.metrics.MetricRegistry;
@@ -67,6 +69,11 @@ public class RaftLogMetrics extends RatisMetrics {
public static final String RAFT_LOG_PURGE_METRIC = "purgeLog";
public static final String LOG_APPENDER_INSTALL_SNAPSHOT_METRIC = "numInstallSnapshot";
+ // Log Entry metrics
+ public static final String METADATA_LOG_ENTRY_COUNT = "metadataLogEntryCount";
+ public static final String CONFIG_LOG_ENTRY_COUNT = "configLogEntryCount";
+ public static final String STATE_MACHINE_LOG_ENTRY_COUNT = "stateMachineLogEntryCount";
+
//////////////////////////////
// Raft Log Read Path Metrics
/////////////////////////////
@@ -120,6 +127,16 @@ public class RaftLogMetrics extends RatisMetrics {
registry.counter(RAFT_LOG_CACHE_HIT_COUNT).inc();
}
+ /**
+ * Increments the counter that matches the type of a committed log entry:
+ * configuration, metadata, or state machine entry.
+ *
+ * @param proto the committed entry. It may be null when the entry could not be
+ * read back from the log (for example, it was purged after the state
+ * machine took a snapshot); in that case nothing is counted.
+ */
+ public void onLogEntryCommit(LogEntryProto proto) {
+ if (proto == null) {
+ // Entry no longer available; skip rather than fail the commit path with an NPE.
+ return;
+ }
+ if (proto.hasConfigurationEntry()) {
+ registry.counter(CONFIG_LOG_ENTRY_COUNT).inc();
+ } else if (proto.hasMetadataEntry()) {
+ registry.counter(METADATA_LOG_ENTRY_COUNT).inc();
+ } else if (proto.hasStateMachineLogEntry()) {
+ registry.counter(STATE_MACHINE_LOG_ENTRY_COUNT).inc();
+ }
+ }
+
public void onRaftLogCacheMiss() {
registry.counter(RAFT_LOG_CACHE_MISS_COUNT).inc();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
index b882a60..4122262 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/RaftLog.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.StateMachineException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -82,6 +83,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private final Runner runner = new Runner(this::getName);
private final OpenCloseState state;
+ private final RaftLogMetrics raftLogMetrics;
private volatile LogEntryProto lastMetadataEntry = null;
@@ -92,11 +94,15 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
this.snapshotIndex = new RaftLogIndex("snapshotIndex", commitIndex);
this.purgeIndex = new RaftLogIndex("purgeIndex", LEAST_VALID_LOG_INDEX - 1);
this.purgeGap = RaftServerConfigKeys.Log.purgeGap(properties);
-
+ this.raftLogMetrics = new RaftLogMetrics(memberId.toString());
this.maxBufferSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties).getSizeInt();
this.state = new OpenCloseState(getName());
}
+ /**
+ * Returns the metrics object for this log. It is created once in the
+ * constructor and shared with subclasses through this accessor.
+ *
+ * @return the {@link RaftLogMetrics} instance owned by this log
+ */
+ public RaftLogMetrics getRaftLogMetrics() {
+ return this.raftLogMetrics;
+ }
+
public long getLastCommittedIndex() {
return commitIndex.get();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 28f6f81..4670425 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -23,7 +23,6 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -191,7 +190,6 @@ public class SegmentedRaftLog extends RaftLog {
private final SegmentedRaftLogWorker fileLogWorker;
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
- private final RaftLogMetrics raftLogMetrics;
public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
@@ -208,10 +206,9 @@ public class SegmentedRaftLog extends RaftLog {
this.storage = storage;
this.stateMachine = stateMachine;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
- this.raftLogMetrics = new RaftLogMetrics(memberId.toString());
- this.cache = new SegmentedRaftLogCache(memberId, storage, properties, raftLogMetrics);
+ this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics());
this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
- submitUpdateCommitEvent, server, storage, properties, raftLogMetrics);
+ submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics());
stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
}
@@ -248,7 +245,7 @@ public class SegmentedRaftLog extends RaftLog {
// so that during the initial loading we can apply part of the log
// entries to the state machine
boolean keepEntryInCache = (paths.size() - i++) <= cache.getMaxCachedSegments();
- final Timer.Context loadSegmentContext = raftLogMetrics.getRaftLogLoadSegmentTimer().time();
+ final Timer.Context loadSegmentContext = getRaftLogMetrics().getRaftLogLoadSegmentTimer().time();
cache.loadSegment(pi, keepEntryInCache, logConsumer, lastIndexInSnapshot);
loadSegmentContext.stop();
}
@@ -282,13 +279,13 @@ public class SegmentedRaftLog extends RaftLog {
}
final LogEntryProto entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
- raftLogMetrics.onRaftLogCacheHit();
+ getRaftLogMetrics().onRaftLogCacheHit();
return entry;
}
}
// the entry is not in the segment's cache. Load the cache without holding the lock.
- raftLogMetrics.onRaftLogCacheMiss();
+ getRaftLogMetrics().onRaftLogCacheMiss();
checkAndEvictCache();
return segment.loadCache(record);
}
@@ -384,7 +381,7 @@ public class SegmentedRaftLog extends RaftLog {
@Override
protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
- final Timer.Context context = raftLogMetrics.getRaftLogAppendEntryTimer().time();
+ final Timer.Context context = getRaftLogMetrics().getRaftLogAppendEntryTimer().time();
checkLogState();
if (LOG.isTraceEnabled()) {
LOG.trace("{}: appendEntry {}", getName(), ServerProtoUtils.toLogEntryString(entry));
@@ -516,7 +513,7 @@ public class SegmentedRaftLog extends RaftLog {
}
fileLogWorker.close();
storage.close();
- raftLogMetrics.unregister();
+ getRaftLogMetrics().unregister();
}
SegmentedRaftLogCache getRaftLogCache() {
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index dfaa822..ef65d79 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -30,6 +30,9 @@ import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_ENQUE
import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_EXECUTION_TIME;
import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_QUEUE_TIME;
import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_WORKER_QUEUE_SIZE;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.METADATA_LOG_ENTRY_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.CONFIG_LOG_ENTRY_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.STATE_MACHINE_LOG_ENTRY_COUNT;
import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
import com.codahale.metrics.Timer;
@@ -115,6 +118,17 @@ public class TestRaftLogMetrics extends BaseTest
// We have already waited enough for follower metrics to populate.
assertRaftLogWritePathMetrics(f);
}
+
+ // Wait for commits to happen on leader
+ JavaUtils.attempt(() -> assertCommitCount(cluster.getLeader(), numMsg), 10, HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertCommitCount", null);
+ }
+
+ /**
+ * Asserts that the given server has counted exactly {@code expectedMsgs}
+ * committed state machine entries in its RaftLog metrics.
+ *
+ * @param server the server whose log metrics are inspected
+ * @param expectedMsgs number of client messages expected to be committed
+ */
+ static void assertCommitCount(RaftServerImpl server, int expectedMsgs) throws Exception {
+ RatisMetricRegistry rlm = server.getState().getLog().getRaftLogMetrics().getRegistry();
+ long metaCount = rlm.counter(METADATA_LOG_ENTRY_COUNT).getCount();
+ long configCount = rlm.counter(CONFIG_LOG_ENTRY_COUNT).getCount();
+ long stmCount = rlm.counter(STATE_MACHINE_LOG_ENTRY_COUNT).getCount();
+ // Only state machine entries correspond to client messages. Configuration and
+ // metadata entries are not client messages, so they are excluded from the
+ // comparison and only reported to help diagnose a failure.
+ Assert.assertEquals("metadataCount=" + metaCount + ", configCount=" + configCount,
+ expectedMsgs, stmCount);
}
static void assertFlushCount(RaftServerImpl server) throws Exception {