You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/27 15:57:09 UTC
[incubator-ratis] branch master updated: RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics. (#303)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c3a1d56 RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics. (#303)
c3a1d56 is described below
commit c3a1d5667c6943fef25ff3bbc937aee30f44a5af
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 27 23:56:51 2020 +0800
RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics. (#303)
* RATIS-1184. Move RaftServerMetrics to org.apache.ratis.server.metrics.
* Fix checkstyle
* Remove unused import
---
.../org/apache/ratis/server/impl/LeaderState.java | 10 +-
.../org/apache/ratis/server/impl/LogAppender.java | 4 +-
.../apache/ratis/server/impl/PendingRequests.java | 1 +
.../apache/ratis/server/impl/RaftServerImpl.java | 29 ++---
.../{impl => metrics}/RaftServerMetrics.java | 137 +++++++++++----------
.../java/org/apache/ratis/LogAppenderTests.java | 21 ++--
.../java/org/apache/ratis/MiniRaftCluster.java | 30 +++--
.../test/java/org/apache/ratis/RaftBasicTests.java | 85 ++++++-------
.../test/java/org/apache/ratis/RaftTestUtil.java | 5 +-
.../ratis/server/impl/RaftServerTestUtil.java | 15 +++
.../server/impl/TestRatisServerMetricsBase.java | 2 +-
.../ratis/server/impl/TestRetryCacheMetrics.java | 14 +--
.../ratis/statemachine/RaftSnapshotBaseTest.java | 29 +++--
.../ratis/TestRaftServerSlownessDetection.java | 12 +-
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 33 +++--
.../ratis/grpc/TestRaftSnapshotWithGrpc.java | 6 +-
16 files changed, 221 insertions(+), 212 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index e161d6f..30eec01 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.LogAppenderMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
@@ -454,7 +455,7 @@ public class LeaderState {
server.getRpcSlownessTimeoutMs());
LogAppender logAppender = server.newLogAppender(this, f);
peerIdFollowerInfoMap.put(peer.getId(), f);
- raftServerMetrics.addFollower(f.getPeer());
+ raftServerMetrics.addFollower(peer.getId());
logAppenderMetrics.addFollowerGauges(f);
return logAppender;
}).collect(Collectors.toList());
@@ -988,12 +989,9 @@ public class LeaderState {
/**
* Record Follower Heartbeat Elapsed Time.
- * @param follower RaftPeer.
- * @param elapsedTime Elapsed time in Nanos.
*/
- void recordFollowerHeartbeatElapsedTime(RaftPeer follower, long elapsedTime) {
- raftServerMetrics.recordFollowerHeartbeatElapsedTime(follower,
- elapsedTime);
+ void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, TimeDuration elapsedTime) {
+ raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS));
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 6c1f80e..aac6c05 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -565,8 +565,8 @@ public class LogAppender {
if (follower.isSlow()) {
server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getRoleInfoProto());
}
- leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer(),
- follower.getLastRpcResponseTime().elapsedTime().getDuration());
+ leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(),
+ follower.getLastRpcResponseTime().elapsedTime());
}
public synchronized void notifyAppend() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 5a3c493..9437a2f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -27,6 +27,7 @@ import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 691ed6e..a1dcc36 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -36,6 +36,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.protocol.TermIndex;
@@ -140,7 +141,8 @@ public class RaftServerImpl implements RaftServer.Division,
this.jmxAdapter = new RaftServerJmxAdapter();
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(this);
- this.raftServerMetrics = RaftServerMetrics.getRaftServerMetrics(this);
+ this.raftServerMetrics = RaftServerMetrics.computeIfAbsentRaftServerMetrics(
+ getMemberId(), () -> commitInfoCache::get, () -> retryCache);
this.startComplete = new AtomicBoolean(false);
}
@@ -356,7 +358,7 @@ public class RaftServerImpl implements RaftServer.Division,
try {
leaderElectionMetrics.unregister();
raftServerMetrics.unregister();
- RaftServerMetrics.removeRaftServerMetrics(this);
+ RaftServerMetrics.removeRaftServerMetrics(getMemberId());
} catch (Exception ignored) {
LOG.warn("{}: Failed to unregister metric", getMemberId(), ignored);
}
@@ -672,8 +674,7 @@ public class RaftServerImpl implements RaftServer.Division,
RaftClientRequest request) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
LOG.debug("{}: receive client request({})", getMemberId(), request);
- Timer timer = raftServerMetrics.getClientRequestTimer(request);
- final Timer.Context timerContext = (timer != null) ? timer.time() : null;
+ final Optional<Timer> timer = Optional.ofNullable(raftServerMetrics.getClientRequestTimer(request.getType()));
CompletableFuture<RaftClientReply> replyFuture;
@@ -735,30 +736,16 @@ public class RaftServerImpl implements RaftServer.Division,
final RaftClientRequest.Type type = request.getType();
replyFuture.whenComplete((clientReply, exception) -> {
- if (clientReply.isSuccess() && timerContext != null) {
- timerContext.stop();
+ if (clientReply.isSuccess()) {
+ timer.map(Timer::time).ifPresent(Timer.Context::stop);
}
if (exception != null || clientReply.getException() != null) {
- incFailedRequestCount(type);
+ raftServerMetrics.incFailedRequestCount(type);
}
});
return replyFuture;
}
- private void incFailedRequestCount(RaftClientRequest.Type type) {
- if (type.is(TypeCase.STALEREAD)) {
- raftServerMetrics.onFailedClientStaleRead();
- } else if (type.is(TypeCase.WATCH)) {
- raftServerMetrics.onFailedClientWatch();
- } else if (type.is(TypeCase.WRITE)) {
- raftServerMetrics.onFailedClientWrite();
- } else if (type.is(TypeCase.READ)) {
- raftServerMetrics.onFailedClientRead();
- } else if (type.is(TypeCase.MESSAGESTREAM)) {
- raftServerMetrics.onFailedClientStream();
- }
- }
-
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
return role.getLeaderState()
.map(ls -> ls.addWatchReqeust(request))
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
similarity index 63%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
rename to ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
index 3b24010..4be970a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
@@ -16,14 +16,17 @@
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.metrics;
import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_APPEND_ENTRIES_LATENCY;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
@@ -31,12 +34,13 @@ import com.codahale.metrics.Timer;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
-import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftClientRequest.Type;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.server.impl.RetryCache;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Preconditions;
@@ -76,30 +80,36 @@ public final class RaftServerMetrics extends RatisMetrics {
public static final String RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT =
"numFailedClientStreamOnServer";
- private Map<String, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
- private CommitInfoCache commitInfoCache;
+ /** Follower Id -> heartbeat elapsed */
+ private final Map<RaftPeerId, Long> followerLastHeartbeatElapsedTimeMap = new HashMap<>();
+ private final Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache;
- private static Map<String, RaftServerMetrics> metricsMap = new ConcurrentHashMap<>();
+ /** id -> metric */
+ private static final Map<RaftGroupMemberId, RaftServerMetrics> METRICS = new ConcurrentHashMap<>();
+ /** id -> key */
+ private static final Map<RaftPeerId, String> PEER_COMMIT_INDEX_GAUGE_KEYS = new ConcurrentHashMap<>();
- public static RaftServerMetrics getRaftServerMetrics(
- RaftServerImpl raftServer) {
- RaftServerMetrics serverMetrics = new RaftServerMetrics(raftServer);
- metricsMap.put(raftServer.getMemberId().toString(), serverMetrics);
- return serverMetrics;
+ private static String getPeerCommitIndexGaugeKey(RaftPeerId serverId) {
+ return PEER_COMMIT_INDEX_GAUGE_KEYS.computeIfAbsent(serverId,
+ key -> String.format(LEADER_METRIC_PEER_COMMIT_INDEX, key));
}
- public static void removeRaftServerMetrics(RaftServerImpl raftServer) {
- String memberId = raftServer.getMemberId().toString();
- if (metricsMap.containsKey(memberId)) {
- metricsMap.remove(memberId);
- }
+ public static RaftServerMetrics computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
+ Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache, Supplier<RetryCache> retryCache) {
+ return METRICS.computeIfAbsent(serverId,
+ key -> new RaftServerMetrics(serverId, commitInfoCache, retryCache));
+ }
+
+ public static void removeRaftServerMetrics(RaftGroupMemberId serverId) {
+ METRICS.remove(serverId);
}
- private RaftServerMetrics(RaftServerImpl server) {
- registry = getMetricRegistryForRaftServer(server.getMemberId().toString());
- commitInfoCache = server.getCommitInfoCache();
- addPeerCommitIndexGauge(server.getId());
- addRetryCacheMetric(server);
+ public RaftServerMetrics(RaftGroupMemberId serverId,
+ Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache, Supplier<RetryCache> retryCache) {
+ this.registry = getMetricRegistryForRaftServer(serverId.toString());
+ this.commitInfoCache = commitInfoCache;
+ addPeerCommitIndexGauge(serverId.getPeerId());
+ addRetryCacheMetric(retryCache);
}
private RatisMetricRegistry getMetricRegistryForRaftServer(String serverId) {
@@ -108,20 +118,18 @@ public final class RaftServerMetrics extends RatisMetrics {
RATIS_SERVER_METRICS_DESC));
}
- private void addRetryCacheMetric(RaftServerImpl raftServer) {
- registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> raftServer.getRetryCache().size());
- registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC, () -> () -> raftServer.getRetryCache().stats().hitCount());
- registry.gauge(RETRY_CACHE_HIT_RATE_METRIC, () -> () -> raftServer.getRetryCache().stats().hitRate());
- registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC, () -> () -> raftServer.getRetryCache().stats().missCount());
- registry.gauge(RETRY_CACHE_MISS_RATE_METRIC, () -> () -> raftServer.getRetryCache().stats().missRate());
+ private void addRetryCacheMetric(Supplier<RetryCache> retryCache) {
+ registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> retryCache.get().size());
+ registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC , () -> () -> retryCache.get().stats().hitCount());
+ registry.gauge(RETRY_CACHE_HIT_RATE_METRIC , () -> () -> retryCache.get().stats().hitRate());
+ registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC , () -> () -> retryCache.get().stats().missCount());
+ registry.gauge(RETRY_CACHE_MISS_RATE_METRIC , () -> () -> retryCache.get().stats().missRate());
}
/**
* Register a follower with this Leader Metrics registry instance.
- * @param peer {@Link RaftPeer} representing the follower
*/
- public void addFollower(RaftPeer peer) {
- String followerName = peer.getId().toString();
+ public void addFollower(RaftPeerId followerName) {
String followerHbMetricKey = String.format(
FOLLOWER_LAST_HEARTBEAT_ELAPSED_TIME_METRIC,
followerName);
@@ -130,43 +138,32 @@ public final class RaftServerMetrics extends RatisMetrics {
registry.gauge(followerHbMetricKey,
() -> () -> followerLastHeartbeatElapsedTimeMap.get(followerName));
- addPeerCommitIndexGauge(peer.getId());
+ addPeerCommitIndexGauge(followerName);
}
/**
* Register a commit index tracker for the peer in cluster.
*/
public void addPeerCommitIndexGauge(RaftPeerId peerId) {
- String followerCommitIndexKey = String.format(
- LEADER_METRIC_PEER_COMMIT_INDEX, peerId);
- registry.gauge(followerCommitIndexKey, () -> () -> {
- RaftProtos.CommitInfoProto commitInfoProto = commitInfoCache.get(peerId);
- if (commitInfoProto != null) {
- return commitInfoProto.getCommitIndex();
- }
- return 0L;
- });
+ registry.gauge(getPeerCommitIndexGaugeKey(peerId), () -> () -> Optional.ofNullable(commitInfoCache.get())
+ .map(cache -> cache.apply(peerId))
+ .map(CommitInfoProto::getCommitIndex)
+ .orElse(0L));
}
/**
* Get the commit index gauge for the given peer of the server
- * @param server
- * @param peerServer
* @return Metric Gauge holding the value of commit index of the peer
*/
@VisibleForTesting
- public static Gauge getPeerCommitIndexGauge(RaftServerImpl server,
- RaftServerImpl peerServer) {
+ public static Gauge getPeerCommitIndexGauge(RaftGroupMemberId serverId, RaftPeerId peerId) {
- RaftServerMetrics serverMetrics =
- metricsMap.get(server.getMemberId().toString());
+ final RaftServerMetrics serverMetrics = METRICS.get(serverId);
if (serverMetrics == null) {
return null;
}
- String followerCommitIndexKey = String.format(
- LEADER_METRIC_PEER_COMMIT_INDEX,
- peerServer.getPeer().getId().toString());
+ final String followerCommitIndexKey = getPeerCommitIndexGaugeKey(peerId);
SortedMap<String, Gauge> map =
serverMetrics.registry.getGauges((s, metric) ->
@@ -178,12 +175,11 @@ public final class RaftServerMetrics extends RatisMetrics {
/**
* Record heartbeat elapsed time for a follower within a Raft group.
- * @param peer {@Link RaftPeer} representing the follower.
- * @param elapsedTime Elapsed time in Nanos.
+ * @param followerId the follower id.
+ * @param elapsedTimeNs Elapsed time in Nanos.
*/
- public void recordFollowerHeartbeatElapsedTime(RaftPeer peer, long elapsedTime) {
- followerLastHeartbeatElapsedTimeMap.put(peer.getId().toString(),
- elapsedTime);
+ public void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, long elapsedTimeNs) {
+ followerLastHeartbeatElapsedTimeMap.put(followerId, elapsedTimeNs);
}
public Timer getFollowerAppendEntryTimer(boolean isHeartbeat) {
@@ -198,13 +194,13 @@ public final class RaftServerMetrics extends RatisMetrics {
return registry.counter(counterName);
}
- public Timer getClientRequestTimer(RaftClientRequest request) {
+ public Timer getClientRequestTimer(Type request) {
if (request.is(TypeCase.READ)) {
return getTimer(RAFT_CLIENT_READ_REQUEST);
} else if (request.is(TypeCase.STALEREAD)) {
return getTimer(RAFT_CLIENT_STALE_READ_REQUEST);
} else if (request.is(TypeCase.WATCH)) {
- String watchType = RaftClientRequest.Type.toString(request.getType().getWatch().getReplication());
+ String watchType = Type.toString(request.getWatch().getReplication());
return getTimer(String.format(RAFT_CLIENT_WATCH_REQUEST, watchType));
} else if (request.is(TypeCase.WRITE)) {
return getTimer(RAFT_CLIENT_WRITE_REQUEST);
@@ -216,27 +212,27 @@ public final class RaftServerMetrics extends RatisMetrics {
registry.counter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).inc();
}
- void addNumPendingRequestsGauge(Gauge queueSize) {
+ public void addNumPendingRequestsGauge(Gauge queueSize) {
registry.gauge(REQUEST_QUEUE_SIZE, () -> queueSize);
}
- boolean removeNumPendingRequestsGauge() {
+ public boolean removeNumPendingRequestsGauge() {
return registry.remove(REQUEST_QUEUE_SIZE);
}
- void addNumPendingRequestsByteSize(Gauge byteSize) {
+ public void addNumPendingRequestsByteSize(Gauge byteSize) {
registry.gauge(REQUEST_BYTE_SIZE, () -> byteSize);
}
- boolean removeNumPendingRequestsByteSize() {
+ public boolean removeNumPendingRequestsByteSize() {
return registry.remove(REQUEST_BYTE_SIZE);
}
- void onRequestByteSizeLimitHit() {
+ public void onRequestByteSizeLimitHit() {
registry.counter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).inc();
}
- void onResourceLimitHit() {
+ public void onResourceLimitHit() {
registry.counter(RESOURCE_LIMIT_HIT_COUNTER).inc();
}
@@ -259,6 +255,21 @@ public final class RaftServerMetrics extends RatisMetrics {
void onFailedClientStream() {
registry.counter(RATIS_SERVER_FAILED_CLIENT_STREAM_COUNT).inc();
}
+
+ public void incFailedRequestCount(Type type) {
+ if (type.is(TypeCase.STALEREAD)) {
+ onFailedClientStaleRead();
+ } else if (type.is(TypeCase.WATCH)) {
+ onFailedClientWatch();
+ } else if (type.is(TypeCase.WRITE)) {
+ onFailedClientWrite();
+ } else if (type.is(TypeCase.READ)) {
+ onFailedClientRead();
+ } else if (type.is(TypeCase.MESSAGESTREAM)) {
+ onFailedClientStream();
+ }
+ }
+
public RatisMetricRegistry getRegistry() {
return registry;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 1eafb7f..f710ed1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -28,12 +28,12 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -130,7 +130,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
// Start a 3 node Ratis ring.
final MiniRaftCluster cluster = newCluster(3);
cluster.start();
- RaftServerImpl leaderServer = waitForLeader(cluster);
+ final RaftServer.Division leaderServer = waitForLeader(cluster);
// Write 10 messages to leader.
try(RaftClient client = cluster.createClient(leaderServer.getId())) {
@@ -141,15 +141,14 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
throw e;
}
- RatisMetricRegistry ratisMetricRegistry =
- RaftServerMetrics.getRaftServerMetrics(leaderServer).getRegistry();
+ final RatisMetricRegistry ratisMetricRegistry = RaftServerTestUtil.getRaftServerMetrics(leaderServer).getRegistry();
// Get all last_heartbeat_elapsed_time metric gauges. Should be equal to number of followers.
SortedMap<String, Gauge> heartbeatElapsedTimeGauges = ratisMetricRegistry.getGauges((s, metric) ->
s.contains("lastHeartbeatElapsedTime"));
assertTrue(heartbeatElapsedTimeGauges.size() == 2);
- for (RaftServerImpl followerServer : cluster.getFollowers()) {
+ for (RaftServer.Division followerServer : cluster.getFollowers()) {
String followerId = followerServer.getId().toString();
Gauge metric = heartbeatElapsedTimeGauges.entrySet().parallelStream().filter(e -> e.getKey().contains(
followerId)).iterator().next().getValue();
@@ -158,7 +157,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
// Metric in nanos > 0.
assertTrue((long)metric.getValue() > 0);
// Try to get Heartbeat metrics for follower.
- RaftServerMetrics followerMetrics = RaftServerMetrics.getRaftServerMetrics(followerServer);
+ final RaftServerMetrics followerMetrics = RaftServerTestUtil.getRaftServerMetrics(followerServer);
// Metric should not exist. It only exists in leader.
assertTrue(followerMetrics.getRegistry().getGauges((s, m) -> s.contains("lastHeartbeatElapsedTime")).isEmpty());
for (boolean heartbeat : new boolean[] { true, false }) {
@@ -208,8 +207,8 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
}
}
- final ServerState leaderState = cluster.getLeader().getState();
- final RaftLog leaderLog = leaderState.getLog();
+ final RaftServer.Division leader = cluster.getLeader();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
LOG.info("counts = " + counts);
Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
@@ -217,6 +216,6 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
final LogEntryProto last = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, leaderLog);
LOG.info("last = " + ServerProtoUtils.toLogEntryString(last));
Assert.assertNotNull(last);
- Assert.assertTrue(last.getIndex() <= leaderState.getLastAppliedIndex());
+ Assert.assertTrue(last.getIndex() <= RaftServerTestUtil.getLastAppliedIndex(leader));
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index c136add..c40d1d2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -505,8 +505,8 @@ public abstract class MiniRaftCluster implements Closeable {
return b.toString();
}
- public RaftServerImpl getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
- final RaftServerImpl leader = getLeader();
+ public RaftServer.Division getLeaderAndSendFirstMessage(boolean ignoreException) throws IOException {
+ final RaftServer.Division leader = getLeader();
try(RaftClient client = createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("first msg to make leader ready"));
} catch (IOException e) {
@@ -590,7 +590,7 @@ public abstract class MiniRaftCluster implements Closeable {
}
boolean isLeader(String leaderId) {
- final RaftServerImpl leader = getLeader();
+ final RaftServer.Division leader = getLeader();
return leader != null && leader.getId().toString().equals(leaderId);
}
@@ -600,6 +600,12 @@ public abstract class MiniRaftCluster implements Closeable {
.collect(Collectors.toList());
}
+ public List<RaftServer.Division> getFollowerDivisions() {
+ return getServerAliveStream()
+ .filter(RaftServerImpl::isFollower)
+ .collect(Collectors.toList());
+ }
+
public Collection<RaftServerProxy> getServers() {
return servers.values();
}
@@ -613,6 +619,10 @@ public abstract class MiniRaftCluster implements Closeable {
return CollectionUtils.as(getServers(), this::getRaftServerImpl);
}
+ public Iterable<RaftServer.Division> iterateDivisions() {
+ return CollectionUtils.as(getServers(), this::getRaftServerImpl);
+ }
+
private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
return groupId != null?
@@ -636,22 +646,26 @@ public abstract class MiniRaftCluster implements Closeable {
return servers.get(id);
}
+ public RaftServer.Division getDivision(RaftPeerId id) {
+ return getRaftServerImpl(servers.get(id));
+ }
+
+ public RaftServer.Division getDivision(RaftPeerId id, RaftGroupId groupId) {
+ return RaftServerTestUtil.getRaftServerImpl(servers.get(id), groupId);
+ }
+
public RaftServerImpl getRaftServerImpl(RaftPeerId id) {
return getRaftServerImpl(servers.get(id));
}
public RaftServerImpl getRaftServerImpl(RaftPeerId id, RaftGroupId groupId) {
- return getRaftServerImpl(servers.get(id), groupId);
+ return RaftServerTestUtil.getRaftServerImpl(servers.get(id), groupId);
}
public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
}
- public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
- return RaftServerTestUtil.getRaftServerImpl(proxy, groupId);
- }
-
public List<RaftPeer> getPeers() {
return toRaftPeers(getServers());
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index 3486f90..e183dce 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -18,8 +18,6 @@
package org.apache.ratis;
import com.codahale.metrics.Gauge;
-
-import java.util.Set;
import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
@@ -27,14 +25,18 @@ import org.apache.ratis.client.impl.RaftClientTestUtil;
import org.apache.ratis.metrics.MetricRegistries;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
@@ -47,6 +49,7 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.SortedMap;
import java.util.Timer;
import java.util.TimerTask;
@@ -60,11 +63,11 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
+import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS;
import static org.apache.ratis.server.impl.StateMachineMetrics.RATIS_STATEMACHINE_METRICS_DESC;
import static org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_APPLIED_INDEX_GAUGE;
import static org.apache.ratis.server.impl.StateMachineMetrics.STATEMACHINE_APPLY_COMPLETED_GAUGE;
-import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -116,8 +119,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
for (RaftServer s : cluster.getServers()) {
cluster.restartServer(s.getId(), false);
}
- RaftServerImpl leader = waitForLeader(cluster);
- final long term = leader.getState().getCurrentTerm();
+ RaftServer.Division leader = waitForLeader(cluster);
+ final long term = RaftServerTestUtil.getCurrentTerm(leader);
final CompletableFuture<Void> killAndRestartFollower = killAndRestartServer(
cluster.getFollowers().get(0).getId(), 0, 1000, cluster, LOG);
@@ -162,12 +165,11 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
killAndRestartFollower.join();
killAndRestartLeader.join();
- for(RaftServerProxy server : cluster.getServers()) {
- final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(server, cluster.getGroupId());
- if (impl.isAlive()) {
+
+ final List<RaftServer.Division> divisions = cluster.getServerAliveStream().collect(Collectors.toList());
+ for(RaftServer.Division impl: divisions) {
JavaUtils.attempt(() -> RaftTestUtil.assertLogEntries(impl, term, messages),
5, TimeDuration.valueOf(1, TimeUnit.SECONDS), impl.getId() + " assertLogEntries", LOG);
- }
}
}
@@ -178,23 +180,22 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
}
void runTestOldLeaderCommit(CLUSTER cluster) throws Exception {
- final RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
- final long term = leader.getState().getCurrentTerm();
+ final long term = RaftServerTestUtil.getCurrentTerm(leader);
- List<RaftServerImpl> followers = cluster.getFollowers();
- final List<RaftServerImpl> followersToSendLog = followers.subList(0, followers.size() / 2);
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followersToSendLog = followers.subList(0, followers.size() / 2);
for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
- RaftServerImpl follower = followers.get(i);
- cluster.killServer(follower.getId());
+ cluster.killServer(followers.get(i).getId());
}
SimpleMessage[] messages = SimpleMessage.create(1);
RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
- for (RaftServerImpl followerToSendLog : followersToSendLog) {
- RaftLog followerLog = followerToSendLog.getState().getLog();
+ for (RaftServer.Division followerToSendLog : followersToSendLog) {
+ RaftLog followerLog = RaftServerTestUtil.getRaftLog(followerToSendLog);
Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
}
@@ -202,9 +203,9 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
cluster.killServer(leaderId);
for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
- RaftServerImpl follower = followers.get(i);
- LOG.info(String.format("restarting follower: %s", follower.getId().toString()));
- cluster.restartServer(follower.getId(), false);
+ final RaftPeerId followerId = followers.get(i).getId();
+ LOG.info(String.format("restarting follower: %s", followerId));
+ cluster.restartServer(followerId, false);
}
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
@@ -227,12 +228,11 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = waitForLeader(cluster).getId();
- List<RaftServerImpl> followers = cluster.getFollowers();
- final RaftServerImpl followerToCommit = followers.get(0);
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final RaftServer.Division followerToCommit = followers.get(0);
try {
for (int i = 1; i < NUM_SERVERS - 1; i++) {
- RaftServerImpl follower = followers.get(i);
- cluster.killServer(follower.getId());
+ cluster.killServer(followers.get(i).getId());
}
} catch (IndexOutOfBoundsException e) {
throw new org.junit.AssumptionViolatedException("The assumption is follower.size() = NUM_SERVERS - 1, "
@@ -243,14 +243,13 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
- RaftTestUtil.logEntriesContains(followerToCommit.getState().getLog(), messages);
+ RaftTestUtil.logEntriesContains(RaftServerTestUtil.getRaftLog(followerToCommit), messages);
cluster.killServer(leaderId);
cluster.killServer(followerToCommit.getId());
for (int i = 1; i < NUM_SERVERS - 1; i++) {
- RaftServerImpl follower = followers.get(i);
- cluster.restartServer(follower.getId(), false );
+ cluster.restartServer(followers.get(i).getId(), false );
}
waitForLeader(cluster);
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
@@ -372,7 +371,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
if (last != previousLastStep) {
previousLastStep = last;
} else {
- final RaftServerImpl leader = cluster.getLeader();
+ final RaftServer.Division leader = cluster.getLeader();
LOG.info("NO PROGRESS at " + last + ", try to restart leader=" + leader);
if (leader != null) {
try {
@@ -405,7 +404,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
count++;
try {
- RaftServerImpl leader = cluster.getLeader();
+ RaftServer.Division leader = cluster.getLeader();
if (leader != null) {
RaftTestUtil.changeLeader(cluster, leader.getId());
}
@@ -455,11 +454,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
public static void testStateMachineMetrics(boolean async,
MiniRaftCluster cluster, Logger LOG) throws Exception {
- RaftServerImpl leader = waitForLeader(cluster);
+ RaftServer.Division leader = waitForLeader(cluster);
try (final RaftClient client = cluster.createClient()) {
-
- Assert.assertTrue(leader.isLeader());
-
Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
STATEMACHINE_APPLIED_INDEX_GAUGE);
Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,
@@ -488,27 +484,24 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
}
private static void checkFollowerCommitLagsLeader(MiniRaftCluster cluster) {
- List<RaftServerImpl> followers = cluster.getFollowers();
- RaftServerImpl leader = cluster.getLeader();
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final RaftGroupMemberId leader = cluster.getLeader().getMemberId();
- Gauge leaderCommitGauge = RaftServerMetrics
- .getPeerCommitIndexGauge(leader, leader);
+ Gauge leaderCommitGauge = RaftServerMetrics.getPeerCommitIndexGauge(leader, leader.getPeerId());
- for (RaftServerImpl follower : followers) {
- Gauge followerCommitGauge = RaftServerMetrics
- .getPeerCommitIndexGauge(leader, follower);
+ for (RaftServer.Division f : followers) {
+ final RaftGroupMemberId follower = f.getMemberId();
+ Gauge followerCommitGauge = RaftServerMetrics.getPeerCommitIndexGauge(leader, follower.getPeerId());
Assert.assertTrue((Long)leaderCommitGauge.getValue() >=
(Long)followerCommitGauge.getValue());
- Gauge followerMetric = RaftServerMetrics
- .getPeerCommitIndexGauge(follower, follower);
+ Gauge followerMetric = RaftServerMetrics.getPeerCommitIndexGauge(follower, follower.getPeerId());
System.out.println(followerCommitGauge.getValue());
System.out.println(followerMetric.getValue());
Assert.assertTrue((Long)followerCommitGauge.getValue() <= (Long)followerMetric.getValue());
}
}
- private static Gauge getStatemachineGaugeWithName(RaftServerImpl server,
- String gaugeName) {
+ private static Gauge getStatemachineGaugeWithName(RaftServer.Division server, String gaugeName) {
MetricRegistryInfo info = new MetricRegistryInfo(server.getMemberId().toString(),
RATIS_APPLICATION_NAME_METRICS,
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 6dd124b..118293e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,6 +31,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -232,9 +233,9 @@ public interface RaftTestUtil {
}
}
- static void assertLogEntries(RaftServerImpl server, long expectedTerm, SimpleMessage... expectedMessages) {
+ static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage... expectedMessages) {
LOG.info("checking raft log for {}", server.getMemberId());
- final RaftLog log = server.getState().getLog();
+ final RaftLog log = RaftServerTestUtil.getRaftLog(server);
try {
RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
} catch (AssertionError e) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 56b7e60..f0e527f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -28,6 +28,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.JavaUtils;
@@ -107,6 +108,10 @@ public class RaftServerTestUtil {
return ((RaftServerImpl)server).getState().getLastAppliedIndex();
}
+ public static long getLatestInstalledSnapshotIndex(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState().getLatestInstalledSnapshotIndex();
+ }
+
public static long getRetryCacheSize(RaftServer.Division server) {
return ((RaftServerImpl)server).getRetryCache().size();
}
@@ -168,4 +173,14 @@ public class RaftServerTestUtil {
public static DataStreamMap newDataStreamMap(Object name) {
return new DataStreamMapImpl(name);
}
+
+ public static void shutdown(RaftServer.Division server) {
+ ((RaftServerImpl)server).shutdown();
+ }
+
+ public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
+ final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
+ Assert.assertNotNull(f);
+ Assert.assertTrue(f.lostMajorityHeartbeatsRecently());
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
index 9a3380b..7a1334d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.server.impl;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
index 208f34a..777918c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
@@ -18,10 +18,8 @@
package org.apache.ratis.server.impl;
-import static org.apache.ratis.server.impl.RaftServerMetrics.*;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.*;
import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.ClientInvocationId;
@@ -29,6 +27,7 @@ import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.util.TimeDuration;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -44,19 +43,14 @@ public class TestRetryCacheMetrics {
@BeforeClass
public static void setUp() {
- RaftServerImpl raftServer = mock(RaftServerImpl.class);
-
RaftGroupId raftGroupId = RaftGroupId.randomId();
RaftPeerId raftPeerId = RaftPeerId.valueOf("TestId");
RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId
.valueOf(raftPeerId, raftGroupId);
- when(raftServer.getMemberId()).thenReturn(raftGroupMemberId);
-
retryCache = new RetryCache(TimeDuration.valueOf(60, TimeUnit.SECONDS));
- when(raftServer.getRetryCache()).thenReturn(retryCache);
- RaftServerMetrics raftServerMetrics = RaftServerMetrics
- .getRaftServerMetrics(raftServer);
+ final RaftServerMetrics raftServerMetrics = RaftServerMetrics.computeIfAbsentRaftServerMetrics(
+ raftGroupMemberId, () -> null, () -> retryCache);
ratisMetricRegistry = raftServerMetrics.getRegistry();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 03a186b..89ab2da 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -37,7 +37,6 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageDirectory;
@@ -76,9 +75,9 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
private static final int SNAPSHOT_TRIGGER_THRESHOLD = 10;
public static List<File> getSnapshotFiles(MiniRaftCluster cluster, long startIndex, long endIndex) {
- final RaftServerImpl leader = cluster.getLeader();
+ final RaftServer.Division leader = cluster.getLeader();
final SimpleStateMachineStorage storage = SimpleStateMachine4Testing.get(leader).getStateMachineStorage();
- final long term = leader.getState().getCurrentTerm();
+ final long term = RaftServerTestUtil.getCurrentTerm(leader);
return LongStream.range(startIndex, endIndex)
.mapToObj(i -> storage.getSnapshotFile(term, i))
.collect(Collectors.toList());
@@ -86,8 +85,8 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
- final RaftLog leaderLog = leader.getState().getLog();
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
final LogEntryProto e = leaderLog.get(lastIndex);
Assert.assertTrue(e.hasMetadataEntry());
@@ -151,7 +150,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
}
- final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+ final long nextIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
// wait for the snapshot to be done
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
@@ -199,10 +198,9 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
// wait for the snapshot to be done
- RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
- .getStorage().getStorageDir();
+ RaftStorageDirectory storageDirectory = RaftServerTestUtil.getRaftStorage(cluster.getLeader()).getStorageDir();
- final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+ final long nextIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
JavaUtils.attemptRepeatedly(() -> {
@@ -239,9 +237,9 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
cluster.setConfiguration(change.allPeersInNewConf);
for (String newPeer : newPeers) {
- RaftServerImpl s = cluster.getRaftServerImpl(RaftPeerId.valueOf(newPeer));
+ final RaftServer.Division s = cluster.getDivision(RaftPeerId.valueOf(newPeer));
SimpleStateMachine4Testing simpleStateMachine = SimpleStateMachine4Testing.get(s);
- Assert.assertTrue(simpleStateMachine.getLifeCycleState() == LifeCycle.State.RUNNING);
+ Assert.assertSame(LifeCycle.State.RUNNING, simpleStateMachine.getLifeCycleState());
}
// Verify installSnapshot counter on leader before restart.
@@ -262,18 +260,19 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
}
- protected void verifyInstallSnapshotMetric(RaftServerImpl leader) {
- Counter installSnapshotCounter = leader.getRaftServerMetrics().getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC);
+ protected void verifyInstallSnapshotMetric(RaftServer.Division leader) {
+ final Counter installSnapshotCounter = RaftServerTestUtil.getRaftServerMetrics(leader)
+ .getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC);
Assert.assertNotNull(installSnapshotCounter);
Assert.assertTrue(installSnapshotCounter.getCount() >= 1);
}
- private static void verifyTakeSnapshotMetric(RaftServerImpl leader) {
+ private static void verifyTakeSnapshotMetric(RaftServer.Division leader) {
Timer timer = getTakeSnapshotTimer(leader);
Assert.assertTrue(timer.getCount() > 0);
}
- private static Timer getTakeSnapshotTimer(RaftServerImpl leader) {
+ private static Timer getTakeSnapshotTimer(RaftServer.Division leader) {
MetricRegistryInfo info = new MetricRegistryInfo(leader.getMemberId().toString(),
RATIS_APPLICATION_NAME_METRICS,
RATIS_STATEMACHINE_METRICS, RATIS_STATEMACHINE_METRICS_DESC);
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
index 5be8c78..2888fe3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerSlownessDetection.java
@@ -23,8 +23,7 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerMetrics;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -85,13 +84,12 @@ public class TestRaftServerSlownessDetection extends BaseTest {
@Test
public void testSlownessDetection() throws Exception {
- RaftServerImpl leaderServer = RaftTestUtil.waitForLeader(cluster);
+ RaftServer.Division leaderServer = RaftTestUtil.waitForLeader(cluster);
long slownessTimeout = RaftServerConfigKeys.Rpc
.slownessTimeout(cluster.getProperties()).toIntExact(TimeUnit.MILLISECONDS);
- RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+ RaftServer.Division failedFollower = cluster.getFollowers().get(0);
- RatisMetricRegistry ratisMetricRegistry =
- RaftServerMetrics.getRaftServerMetrics(leaderServer).getRegistry();
+ final RatisMetricRegistry ratisMetricRegistry = RaftServerTestUtil.getRaftServerMetrics(leaderServer).getRegistry();
SortedMap<String, Gauge> heartbeatElapsedTimeGauges =
ratisMetricRegistry.getGauges((s, metric) ->
s.contains("lastHeartbeatElapsedTime"));
@@ -110,7 +108,7 @@ public class TestRaftServerSlownessDetection extends BaseTest {
Assert.assertTrue(followerHeartBeatElapsedMetricNew > followerHeartBeatElapsedMetric);
// Followers should not get any failed not notification
- for (RaftServerImpl followerServer : cluster.getFollowers()) {
+ for (RaftServer.Division followerServer : cluster.getFollowers()) {
Assert.assertNull(SimpleStateMachine4Testing.get(followerServer).getSlownessInfo());
}
// the leader should get notification that the follower has failed now
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index fda250c..da05fe0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -17,14 +17,14 @@
*/
package org.apache.ratis.grpc;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_READ_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_STALE_READ_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_WATCH_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RAFT_CLIENT_WRITE_REQUEST;
-import static org.apache.ratis.server.impl.RaftServerMetrics.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
-import static org.apache.ratis.server.impl.RaftServerMetrics.REQUEST_BYTE_SIZE;
-import static org.apache.ratis.server.impl.RaftServerMetrics.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
-import static org.apache.ratis.server.impl.RaftServerMetrics.RESOURCE_LIMIT_HIT_COUNTER;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_READ_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_STALE_READ_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_WATCH_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RAFT_CLIENT_WRITE_REQUEST;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.REQUEST_QUEUE_LIMIT_HIT_COUNTER;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.REQUEST_BYTE_SIZE;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER;
+import static org.apache.ratis.server.metrics.RaftServerMetrics.RESOURCE_LIMIT_HIT_COUNTER;
import com.codahale.metrics.Gauge;
import org.apache.commons.lang3.RandomStringUtils;
@@ -51,7 +51,7 @@ import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerMetrics;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -107,7 +107,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId)));
ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null);
// Close the server rpc for leader so that new raft server can be bound to it.
- cluster.getLeader().getServerRpc().close();
+ RaftServerTestUtil.getServerRpc(cluster.getLeader()).close();
// Create a raft server proxy with server rpc bound to same address as
// the leader. This step would fail as the raft storage has been locked by
@@ -229,9 +229,8 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
client.async().send(new SimpleMessage(message));
- final SortedMap<String, Gauge> gaugeMap =
- cluster.getLeader().getRaftServerMetrics().getRegistry()
- .getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
+ final SortedMap<String, Gauge> gaugeMap = RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
+ .getRegistry().getGauges((s, metric) -> s.contains(REQUEST_BYTE_SIZE));
RaftTestUtil.waitFor(() -> (int) gaugeMap.get(gaugeMap.firstKey()).getValue() == message.length(),
300, 5000);
@@ -243,8 +242,8 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
}
// Because we have passed 11 requests, and the element queue size is 10.
- RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics().getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER)
- .getCount() == 1, 300, 5000);
+ RaftTestUtil.waitFor(() -> RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
+ .getCounter(REQUEST_QUEUE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
stateMachine.unblockFlushStateMachineData();
@@ -255,10 +254,10 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
client.async().send(new SimpleMessage(RandomStringUtils.random(120, true, false)));
clients.add(client);
- RaftTestUtil.waitFor(() -> cluster.getLeader().getRaftServerMetrics()
+ RaftTestUtil.waitFor(() -> RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
.getCounter(REQUEST_BYTE_SIZE_LIMIT_HIT_COUNTER).getCount() == 1, 300, 5000);
- Assert.assertEquals(2, cluster.getLeader().getRaftServerMetrics()
+ Assert.assertEquals(2, RaftServerTestUtil.getRaftServerMetrics(cluster.getLeader())
.getCounter(RESOURCE_LIMIT_HIT_COUNTER).getCount());
} finally {
for (RaftClient client : clients) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
index c5ae2ee..c128d28 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftSnapshotWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -23,7 +23,7 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.metrics.MetricRegistries;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.RaftSnapshotBaseTest;
import org.junit.Assert;
@@ -36,7 +36,7 @@ public class TestRaftSnapshotWithGrpc extends RaftSnapshotBaseTest {
}
@Override
- protected void verifyInstallSnapshotMetric(RaftServerImpl leader) {
+ protected void verifyInstallSnapshotMetric(RaftServer.Division leader) {
MetricRegistryInfo info = new MetricRegistryInfo(leader.getMemberId().toString(),
"ratis_grpc", "log_appender", "Metrics for Ratis Grpc Log Appender");
Optional<RatisMetricRegistry> metricRegistry = MetricRegistries.global().get(info);