You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2019/09/16 10:01:46 UTC

[incubator-ratis] branch master updated: RATIS-670. Add a metric to track StateMachine Log apply index. Contributed by Supratim Deka.

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c8f2c7a  RATIS-670. Add a metric to track StateMachine Log apply index. Contributed by Supratim Deka.
c8f2c7a is described below

commit c8f2c7a2f1c9de71b38d837d5b010793c53ce51b
Author: Shashikant Banerjee <sh...@apache.org>
AuthorDate: Mon Sep 16 15:31:30 2019 +0530

    RATIS-670. Add a metric to track StateMachine Log apply index. Contributed by Supratim Deka.
---
 .../ratis/server/impl/StateMachineMetrics.java     | 57 ++++++++++++++++++++++
 .../ratis/server/impl/StateMachineUpdater.java     | 11 +++++
 .../ratis/server/metrics/RatisMetricNames.java     |  4 ++
 .../apache/ratis/server/metrics/RatisMetrics.java  |  7 +++
 .../ratis/statemachine/impl/BaseStateMachine.java  |  9 ++++
 .../test/java/org/apache/ratis/RaftAsyncTests.java |  6 +++
 .../test/java/org/apache/ratis/RaftBasicTests.java | 55 +++++++++++++++++++++
 .../org/apache/ratis/grpc/TestRaftWithGrpc.java    |  6 +++
 8 files changed, 155 insertions(+)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java
new file mode 100644
index 0000000..1bde465
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineMetrics.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.server.metrics.RatisMetricNames;
+import org.apache.ratis.server.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.RaftLogIndex;
+import org.apache.ratis.statemachine.StateMachine;
+
+import java.util.function.LongSupplier;
+
+/**
+ * Metrics registry for the State Machine Updater (one instance per group); registers the applied index and the apply-completed index as gauges.
+ */
+public final class StateMachineMetrics {
+  private RatisMetricRegistry registry = null; // assigned once, in the private constructor below
+
+  public static StateMachineMetrics getStateMachineMetrics( // factory: wraps both index sources as LongSuppliers
+      RaftServerImpl server, RaftLogIndex appliedIndex,
+      StateMachine stateMachine) {
+
+    String serverId = server.getMemberId().toString(); // member id string selects the registry
+    LongSupplier getApplied = appliedIndex::get; // reads the caller-provided RaftLogIndex
+    LongSupplier getApplyCompleted =
+        () -> (stateMachine.getLastAppliedTermIndex() == null) ? -1 // -1 when the state machine reports no last-applied TermIndex
+            : stateMachine.getLastAppliedTermIndex().getIndex();
+
+    return new StateMachineMetrics(serverId, getApplied, getApplyCompleted);
+  }
+
+  private StateMachineMetrics(String serverId, LongSupplier getApplied, // private: instances come from getStateMachineMetrics
+      LongSupplier getApplyCompleted) {
+
+    registry = RatisMetrics.getMetricRegistryForStateMachine(serverId);
+    registry.gauge(RatisMetricNames.STATEMACHINE_APPLIED_INDEX_GAUGE,
+        () -> () -> getApplied.getAsLong()); // outer lambda supplies the gauge, inner lambda reads the current value
+    registry.gauge(RatisMetricNames.STATEMACHINE_APPLY_COMPLETED_GAUGE,
+        () -> () -> getApplyCompleted.getAsLong()); // same two-level shape for the apply-completed index
+  }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 8553a01..39eb472 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -76,6 +76,7 @@ class StateMachineUpdater implements Runnable {
   private final AtomicReference<Long> stopIndex = new AtomicReference<>();
   private volatile State state = State.RUNNING;
   private SnapshotRetentionPolicy snapshotRetentionPolicy;
+  private StateMachineMetrics stateMachineMetrics = null;
 
   StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
       ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
@@ -103,9 +104,19 @@ class StateMachineUpdater implements Runnable {
   }
 
   void start() {
+    // Metrics are initialized here rather than in the constructor: wait for the RaftServerImpl and ServerState constructors to complete.
+    initializeMetrics();
     updater.start();
   }
 
+  private void initializeMetrics() { // called from start(), before the updater is started
+    if (stateMachineMetrics == null) { // create the metrics only if they have not been created yet
+      stateMachineMetrics =
+          StateMachineMetrics.getStateMachineMetrics(
+              server, appliedIndex, stateMachine);
+    }
+  }
+
   private void stop() {
     state = State.STOP;
     try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
index 08933b3..c8da330 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetricNames.java
@@ -30,4 +30,8 @@ public final class RatisMetricNames {
 
   public static final String FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC = "follower_%s_last_heartbeat_elapsed_time";
 
+  public static final String STATEMACHINE_APPLIED_INDEX_GAUGE =
+      "statemachine_applied_index"; // gauge name used by StateMachineMetrics for the applied index
+  public static final String STATEMACHINE_APPLY_COMPLETED_GAUGE =
+      "statemachine_apply_completed_index"; // gauge name used by StateMachineMetrics for the state machine's last-applied index
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
index 4230dab..674a732 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RatisMetrics.java
@@ -35,6 +35,8 @@ public class RatisMetrics {
   public static final String RATIS_LEADER_ELECTION_METRICS_DESC = "Metrics for Ratis Leader Election.";
   public static final String RATIS_HEARTBEAT_METRICS = "heartbeat";
   public static final String RATIS_HEARTBEAT_METRICS_DESC = "Metrics for Ratis Heartbeat.";
+  public static final String RATIS_STATEMACHINE_METRICS = "ratis_state_machine"; // name passed to MetricRegistryInfo by getMetricRegistryForStateMachine
+  public static final String RATIS_STATEMACHINE_METRICS_DESC = "Metrics for State Machine Updater"; // description passed alongside the name
 
   static MetricsReporting metricsReporting = new MetricsReporting(500, TimeUnit.MILLISECONDS);
 
@@ -75,4 +77,9 @@ public class RatisMetrics {
     return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS, RATIS_HEARTBEAT_METRICS,
         RATIS_HEARTBEAT_METRICS_DESC));
   }
+
+  public static RatisMetricRegistry getMetricRegistryForStateMachine(String serverId) { // registry for the state machine metrics of the given server id
+    return create(new MetricRegistryInfo(serverId, RATIS_APPLICATION_NAME_METRICS, // NOTE(review): the test in this commit relies on create() returning the existing registry for the same info — confirm
+        RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC));
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index fad9612..57d88e1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -19,6 +19,7 @@
 package org.apache.ratis.statemachine.impl;
 
 import com.codahale.metrics.Timer;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -108,6 +109,14 @@ public class BaseStateMachine implements StateMachine {
     lastAppliedTermIndex.set(newTI);
   }
 
+  /**
+   * Sets the last applied TermIndex to (term 0, index 0). To be used for testing only.
+   */
+  @VisibleForTesting
+  public void initLastAppliedTermIndex() {
+    setLastAppliedTermIndex(TermIndex.newTermIndex(0, 0));
+  }
+
   protected boolean updateLastAppliedTermIndex(long term, long index) {
     final TermIndex newTI = TermIndex.newTermIndex(term, index);
     final TermIndex oldTI = lastAppliedTermIndex.getAndSet(newTI);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fded453..4ab5282 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -353,6 +353,12 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   }
 
   @Test
+  public void testStateMachineMetrics() throws Exception { // delegates to the shared static test in RaftBasicTests
+    runWithNewCluster(NUM_SERVERS, cluster ->
+        RaftBasicTests.testStateMachineMetrics(true, cluster, LOG)); // true selects the client.sendAsync path
+  }
+
+  @Test
   public void testAppendEntriesTimeout() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestAppendEntriesTimeout);
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index c01fdc2..ca61677 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -17,10 +17,12 @@
  */
 package org.apache.ratis;
 
+import com.codahale.metrics.Gauge;
 import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
+import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
@@ -30,8 +32,11 @@ import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.RetryCacheTestUtil;
+import org.apache.ratis.server.metrics.RatisMetricNames;
+import org.apache.ratis.server.metrics.RatisMetrics;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
 import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
@@ -42,6 +47,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.SortedMap;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.CompletableFuture;
@@ -431,4 +437,53 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
       Assert.assertTrue(duration.compareTo(retryCacheExpiryDuration) >= 0);
     }
   }
+
+  public static void testStateMachineMetrics(boolean async, // async: true uses client.sendAsync, false uses client.send
+      MiniRaftCluster cluster, Logger LOG) throws Exception {
+    RaftServerImpl leader = waitForLeader(cluster);
+    long time = System.currentTimeMillis(); // NOTE(review): 'time' is never read in this method
+    try (final RaftClient client = cluster.createClient()) {
+
+      // This is required because the lastAppliedTermIndex is not initialized; without it the apply-completed gauge has no TermIndex to read.
+      ((BaseStateMachine) leader.getStateMachine()).initLastAppliedTermIndex();
+      Assert.assertTrue(leader.isLeader());
+
+      Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
+          RatisMetricNames.STATEMACHINE_APPLIED_INDEX_GAUGE);
+      Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,
+          RatisMetricNames.STATEMACHINE_APPLY_COMPLETED_GAUGE);
+
+      long appliedIndexBefore = (Long) appliedIndexGauge.getValue(); // snapshot both gauges before sending the request
+      long smAppliedIndexBefore = (Long) smAppliedIndexGauge.getValue();
+
+      if (async) {
+        CompletableFuture<RaftClientReply> replyFuture = client.sendAsync(new SimpleMessage("abc"));
+        replyFuture.get(); // block until the async reply arrives
+      } else {
+        client.send(new SimpleMessage("abc"));
+      }
+
+      long appliedIndexAfter = (Long) appliedIndexGauge.getValue(); // re-read both gauges once the request has returned
+      long smAppliedIndexAfter = (Long) smAppliedIndexGauge.getValue();
+
+      Assert.assertTrue("StateMachine Applied Index not incremented",
+          appliedIndexAfter > appliedIndexBefore);
+      Assert.assertTrue("StateMachine Apply completed Index not incremented",
+          smAppliedIndexAfter > smAppliedIndexBefore);
+    }
+  }
+
+  private static Gauge getStatemachineGaugeWithName(RaftServerImpl server, // returns the first state machine gauge whose name contains gaugeName
+      String gaugeName) {
+
+    RatisMetricRegistry ratisStateMachineMetricRegistry =
+        RatisMetrics.getMetricRegistryForStateMachine(
+            server.getMemberId().toString()); // same member id string that StateMachineMetrics uses when registering
+
+    SortedMap<String, Gauge> gaugeMap =
+        ratisStateMachineMetricRegistry.getGauges((s, metric) ->
+            s.contains(gaugeName)); // substring match on the registered gauge name
+
+    return gaugeMap.get(gaugeMap.firstKey()); // NOTE(review): firstKey() throws NoSuchElementException when no gauge matches
+  }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 0568581..0f1d71d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -60,6 +60,12 @@ public class TestRaftWithGrpc
   }
 
   @Test
+  public void testStateMachineMetrics() throws Exception { // runs the inherited static test against a new cluster
+    runWithNewCluster(NUM_SERVERS, cluster ->
+        testStateMachineMetrics(false, cluster, LOG)); // false selects the client.send path
+  }
+
+  @Test
   public void testUpdateViaHeartbeat() throws Exception {
     runWithNewCluster(NUM_SERVERS, this::runTestUpdateViaHeartbeat);
   }