You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2019/09/16 10:01:46 UTC
[incubator-ratis] branch master updated: RATIS-670. Add a metric to
track StateMachine Log apply index. Contributed by Supratim Deka.
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c8f2c7a RATIS-670. Add a metric to track StateMachine Log apply index. Contributed by Supratim Deka.
c8f2c7a is described below
commit c8f2c7a2f1c9de71b38d837d5b010793c53ce51b
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Sep 16 15:31:30 2019 +0530
RATIS-670. Add a metric to track StateMachine Log apply index. Contributed by Supratim Deka.
---
.../ratis/server/impl/StateMachineMetrics.java | 57 ++++++++++++++++++++++
.../ratis/server/impl/StateMachineUpdater.java | 11 +++++
.../ratis/server/metrics/RatisMetricNames.java | 4 ++
.../apache/ratis/server/metrics/RatisMetrics.java | 7 +++
.../ratis/statemachine/impl/BaseStateMachine.java | 9 ++++
.../test/java/org/apache/ratis/RaftAsyncTests.java | 6 +++
.../test/java/org/apache/ratis/RaftBasicTests.java | 55 +++++++++++++++++++++
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 6 +++
8 files changed, 155 insertions(+)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java
new file mode 100644
index 0000000..1bde465
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.server.metrics.RatisMetricNames;
+import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.util.function.LongSupplier;
+
+/**
+ * Metrics Registry for the State Machine Updater. One instance per group.
+ */
+public final class StateMachineMetrics {
+ private RatisMetricRegistry registry = null;
+
+ public static StateMachineMetrics getStateMachineMetrics(
+ RaftServerImpl server, RaftLogIndex appliedIndex,
+ StateMachine stateMachine) {
+
+ String serverId = server.getMemberId().toString();
+ LongSupplier getApplied = appliedIndex::get;
+ LongSupplier getApplyCompleted =
+ () -> (stateMachine.getLastAppliedTermIndex() == null) ? -1
+ : stateMachine.getLastAppliedTermIndex().getIndex();
+
+ return new StateMachineMetrics(serverId, getApplied, getApplyCompleted);
+ }
+
+ private StateMachineMetrics(String serverId, LongSupplier getApplied,
+ LongSupplier getApplyCompleted) {
+
+ registry = RatisMetrics.getMetricRegistryForStateMachine(serverId);
+ registry.gauge(RatisMetricNames.STATEMACHINE_APPLIED_INDEX_GAUGE,
+ () -> () -> getApplied.getAsLong());
+ registry.gauge(RatisMetricNames.STATEMACHINE_APPLY_COMPLETED_GAUGE,
+ () -> () -> getApplyCompleted.getAsLong());
+ }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 8553a01..39eb472 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -76,6 +76,7 @@ class StateMachineUpdater implements Runnable {
private final AtomicReference<Long> stopIndex = new AtomicReference<>();
private volatile State state = State.RUNNING;
private SnapshotRetentionPolicy snapshotRetentionPolicy;
+ private StateMachineMetrics stateMachineMetrics = null;
StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
@@ -103,9 +104,19 @@ class StateMachineUpdater implements Runnable {
}
void start() {
+ //wait for RaftServerImpl and ServerState constructors to complete
+ initializeMetrics();
updater.start();
}
+ private void initializeMetrics() {
+ if (stateMachineMetrics == null) {
+ stateMachineMetrics =
+ StateMachineMetrics.getStateMachineMetrics(
+ server, appliedIndex, stateMachine);
+ }
+ }
+
private void stop() {
state = State.STOP;
try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index 08933b3..c8da330 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -30,4 +30,8 @@ public final class RatisMetricNames {
public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC = "follower_%s_last_heartbeat_elapsed_time";
+ public static final String STATEMACHINE_APPLIED_INDEX_GAUGE =
+ "statemachine_applied_index";
+ public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE =
+ "statemachine_apply_completed_index";
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 4230dab..674a732 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -35,6 +35,8 @@ public class RatisMetrics {
public static final String RATIS_LEADER_ELECTION_METRICS_DESC = "Metrics for Ratis Leader Election.";
public static final String RATIS_HEARTBEAT_METRICS = "heartbeat";
public static final String RATIS_HEARTBEAT_METRICS_DESC = "Metrics for Ratis Heartbeat.";
+ public static final String RATIS_STATEMACHINE_METRICS = "ratis_state_machine";
+ public static final String RATIS_STATEMACHINE_METRICS_DESC = "Metrics for State Machine Updater";
static MetricsReporting metricsReporting = new MetricsReporting(500, TimeUnit.MILLISECONDS);
@@ -75,4 +77,9 @@ public class RatisMetrics {
return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS, RATIS_HEARTBEAT_METRICS,
RATIS_HEARTBEAT_METRICS_DESC));
}
+
+ public static RatisMetricRegistry getMetricRegistryForStateMachine(String serverId) {
+ return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS,
+ RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC));
+ }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index fad9612..57d88e1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -19,6 +19,7 @@
package org.apache.ratis.statemachine.impl;
import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
@@ -108,6 +109,14 @@ public class BaseStateMachine implements StateMachine {
lastAppliedTermIndex.set(newTI);
}
+ /**
+ * to be used for testing only.
+ */
+ @VisibleForTesting
+ public void initLastAppliedTermIndex() {
+ setLastAppliedTermIndex(TermIndex.newTermIndex(0, 0));
+ }
+
protected boolean updateLastAppliedTermIndex(long term, long index) {
final TermIndex newTI = TermIndex.newTermIndex(term, index);
final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fded453..4ab5282 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -353,6 +353,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
@Test
+ public void testStateMachineMetrics() throws Exception {
+ runWithNewCluster(NUM_SERVERS, cluster ->
+ RaftBasicTests.testStateMachineMetrics(true, cluster, LOG));
+ }
+
+ @Test
public void testAppendEntriesTimeout() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestAppendEntriesTimeout);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index c01fdc2..ca61677 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -17,10 +17,12 @@
*/
package org.apache.ratis;
+import com.codahale.metrics.Gauge;
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.RaftClientTestUtil;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
@@ -30,8 +32,11 @@ import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
+import org.apache.ratis.server.metrics.RatisMetricNames;
+import org.apache.ratis.server.metrics.RatisMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LogUtils;
@@ -42,6 +47,7 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
+import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
@@ -431,4 +437,53 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
}
}
+
+ public static void testStateMachineMetrics(boolean async,
+ MiniRaftCluster cluster, Logger LOG) throws Exception {
+ RaftServerImpl leader = waitForLeader(cluster);
+ long time = System.currentTimeMillis();
+ try (final RaftClient client = cluster.createClient()) {
+
+ // this is required because the lastAppliedTermIndex is not initialized
+ ((BaseStateMachine) leader.getStateMachine()).initLastAppliedTermIndex();
+ Assert.assertTrue(leader.isLeader());
+
+ Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
+ RatisMetricNames.STATEMACHINE_APPLIED_INDEX_GAUGE);
+ Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,
+ RatisMetricNames.STATEMACHINE_APPLY_COMPLETED_GAUGE);
+
+ long appliedIndexBefore = (Long) appliedIndexGauge.getValue();
+ long smAppliedIndexBefore = (Long) smAppliedIndexGauge.getValue();
+
+ if (async) {
+ CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new SimpleMessage("abc"));
+ replyFuture.get();
+ } else {
+ client.send(new SimpleMessage("abc"));
+ }
+
+ long appliedIndexAfter = (Long) appliedIndexGauge.getValue();
+ long smAppliedIndexAfter = (Long) smAppliedIndexGauge.getValue();
+
+ Assert.assertTrue("StateMachine Applied Index not incremented",
+ appliedIndexAfter > appliedIndexBefore);
+ Assert.assertTrue("StateMachine Apply completed Index not incremented",
+ smAppliedIndexAfter > smAppliedIndexBefore);
+ }
+ }
+
+ private static Gauge getStatemachineGaugeWithName(RaftServerImpl server,
+ String gaugeName) {
+
+ RatisMetricRegistry ratisStateMachineMetricRegistry =
+ RatisMetrics.getMetricRegistryForStateMachine(
+ server.getMemberId().toString());
+
+ SortedMap<String, Gauge> gaugeMap =
+ ratisStateMachineMetricRegistry.getGauges((s, metric) ->
+ s.contains(gaugeName));
+
+ return gaugeMap.get(gaugeMap.firstKey());
+ }
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 0568581..0f1d71d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -60,6 +60,12 @@ public class TestRaftWithGrpc
}
@Test
+ public void testStateMachineMetrics() throws Exception {
+ runWithNewCluster(NUM_SERVERS, cluster ->
+ testStateMachineMetrics(false, cluster, LOG));
+ }
+
+ @Test
public void testUpdateViaHeartbeat() throws Exception {
runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
}