You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/08 05:27:54 UTC
[incubator-ratis] branch master updated: RATIS-1210. Expose RetryCache. (#334)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6c7f1d1 RATIS-1210. Expose RetryCache. (#334)
6c7f1d1 is described below
commit 6c7f1d1c31c3c01e7d09eddb64bd9736af9ba18b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 8 13:27:42 2020 +0800
RATIS-1210. Expose RetryCache. (#334)
* RATIS-1210. Expose RetryCache.
* Fix test failure.
---
.../java/org/apache/ratis/server/RaftServer.java | 3 +
.../apache/ratis/server/RaftServerConfigKeys.java | 10 ++
.../java/org/apache/ratis/server/RetryCache.java | 70 ++++++++++++
.../apache/ratis/server/impl/RaftServerImpl.java | 48 ++++-----
.../impl/{RetryCache.java => RetryCacheImpl.java} | 118 +++++++++++++++------
.../ratis/server/metrics/RaftServerMetrics.java | 24 +++--
.../java/org/apache/ratis/RetryCacheTests.java | 6 +-
.../ratis/server/impl/RaftServerTestUtil.java | 30 ------
.../impl/RaftStateMachineExceptionTests.java | 13 ++-
.../ratis/server/impl/RetryCacheTestUtil.java | 45 +++++++-
.../ratis/server/impl/TestRetryCacheMetrics.java | 23 ++--
.../ratis/datastream/DataStreamBaseTest.java | 6 ++
.../apache/ratis/grpc/TestRetryCacheWithGrpc.java | 4 +-
.../raftlog/segmented/TestSegmentedRaftLog.java | 11 +-
14 files changed, 276 insertions(+), 135 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 4ea4776..5c309a1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -94,6 +94,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the storage of this division. */
RaftStorage getRaftStorage();
+ /** @return the retry cache of this division. */
+ RetryCache getRetryCache();
+
/** @return the data stream map of this division. */
DataStreamMap getDataStreamMap();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 0b5a84f..c2b38e7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -501,6 +501,16 @@ public interface RaftServerConfigKeys {
static void setExpiryTime(RaftProperties properties, TimeDuration expiryTime) {
setTimeDuration(properties::setTimeDuration, EXPIRY_TIME_KEY, expiryTime);
}
+
+ String STATISTICS_EXPIRY_TIME_KEY = PREFIX + ".statistics.expirytime";
+ TimeDuration STATISTICS_EXPIRY_TIME_DEFAULT = TimeDuration.valueOf(100, TimeUnit.MICROSECONDS);
+ static TimeDuration statisticsExpiryTime(RaftProperties properties) {
+ return getTimeDuration(properties.getTimeDuration(STATISTICS_EXPIRY_TIME_DEFAULT.getUnit()),
+ STATISTICS_EXPIRY_TIME_KEY, STATISTICS_EXPIRY_TIME_DEFAULT, getDefaultLog());
+ }
+ static void setStatisticsExpiryTime(RaftProperties properties, TimeDuration expiryTime) {
+ setTimeDuration(properties::setTimeDuration, STATISTICS_EXPIRY_TIME_KEY, expiryTime);
+ }
}
interface Notification {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/RetryCache.java
new file mode 100644
index 0000000..d06cd40
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RetryCache.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * For a server to store {@link RaftClientReply} futures in order to handle client retries.
+ */
+public interface RetryCache extends Closeable {
+ Logger LOG = LoggerFactory.getLogger(RetryCache.class);
+
+ /**
+ * Entry of a {@link RetryCache},
+ * where the key is a {@link ClientInvocationId}
+ * and the value is a {@link CompletableFuture} of a {@link RaftClientReply}.
+ */
+ interface Entry {
+ /** @return the cached key. */
+ ClientInvocationId getKey();
+
+ /** @return the cached value. */
+ CompletableFuture<RaftClientReply> getReplyFuture();
+ }
+
+ /** The statistics of a {@link RetryCache}. */
+ interface Statistics {
+ /** @return the approximate number of entries in the cache. */
+ long size();
+
+ /** @return the number of cache hits, where a cache hit is a cache lookup that returned a cached value. */
+ long hitCount();
+
+ /** @return the ratio of hit count to request count. */
+ double hitRate();
+
+ /** @return the number of cache misses, where a cache miss is a cache lookup that failed to return a cached value. */
+ long missCount();
+
+ /** @return the ratio of miss count to request count. */
+ double missRate();
+ }
+
+ /** @return the cached entry for the given key if it exists; otherwise, return null. */
+ Entry getIfPresent(ClientInvocationId key);
+
+ /** @return the statistics of this {@link RetryCache}. */
+ Statistics getStatistics();
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 254087f..98e94b5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -40,6 +40,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
@@ -52,7 +53,6 @@ import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.ratis.util.*;
@@ -150,7 +150,7 @@ class RaftServerImpl implements RaftServer.Division,
private final MemoizedSupplier<RaftClient> raftClient;
- private final RetryCache retryCache;
+ private final RetryCacheImpl retryCache;
private final CommitInfoCache commitInfoCache = new CommitInfoCache();
private final RaftServerJmxAdapter jmxAdapter;
@@ -182,7 +182,7 @@ class RaftServerImpl implements RaftServer.Division,
this.proxy = proxy;
this.state = new ServerState(id, group, properties, this, stateMachine);
- this.retryCache = initRetryCache(properties);
+ this.retryCache = new RetryCacheImpl(properties);
this.inProgressInstallSnapshotRequest = new AtomicReference<>(null);
this.dataStreamMap = new DataStreamMapImpl(id);
@@ -190,7 +190,7 @@ class RaftServerImpl implements RaftServer.Division,
this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
getMemberId(), state::getLastLeaderElapsedTimeMs);
this.raftServerMetrics = RaftServerMetrics.computeIfAbsentRaftServerMetrics(
- getMemberId(), () -> commitInfoCache::get, () -> retryCache);
+ getMemberId(), () -> commitInfoCache::get, retryCache::getStatistics);
this.startComplete = new AtomicBoolean(false);
@@ -208,11 +208,6 @@ class RaftServerImpl implements RaftServer.Division,
return divisionProperties;
}
- private RetryCache initRetryCache(RaftProperties prop) {
- final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop);
- return new RetryCache(expireTime);
- }
-
LogAppender newLogAppender(LeaderState leaderState, FollowerInfo f) {
return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
}
@@ -260,8 +255,8 @@ class RaftServerImpl implements RaftServer.Division,
return raftClient.get();
}
- @VisibleForTesting
- public RetryCache getRetryCache() {
+ @Override
+ public RetryCacheImpl getRetryCache() {
return retryCache;
}
@@ -591,27 +586,26 @@ class RaftServerImpl implements RaftServer.Division,
/**
* @return null if the server is in leader state.
*/
- private CompletableFuture<RaftClientReply> checkLeaderState(
- RaftClientRequest request, RetryCache.CacheEntry entry) {
+ private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
try {
assertGroup(request.getRequestorId(), request.getRaftGroupId());
} catch (GroupMismatchException e) {
- return RetryCache.failWithException(e, entry);
+ return RetryCacheImpl.failWithException(e, entry);
}
if (!getInfo().isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, exception);
- return RetryCache.failWithReply(reply, entry);
+ return RetryCacheImpl.failWithReply(reply, entry);
}
if (!getInfo().isLeaderReady()) {
- final RetryCache.CacheEntry cacheEntry = retryCache.get(ClientInvocationId.valueOf(request));
+ final CacheEntry cacheEntry = retryCache.getIfPresent(ClientInvocationId.valueOf(request));
if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
return cacheEntry.getReplyFuture();
}
final LeaderNotReadyException lnre = new LeaderNotReadyException(getMemberId());
final RaftClientReply reply = newExceptionReply(request, lnre);
- return RetryCache.failWithReply(reply, entry);
+ return RetryCacheImpl.failWithReply(reply, entry);
}
return null;
}
@@ -650,8 +644,7 @@ class RaftServerImpl implements RaftServer.Division,
* Handle a normal update request from client.
*/
private CompletableFuture<RaftClientReply> appendTransaction(
- RaftClientRequest request, TransactionContext context,
- RetryCache.CacheEntry cacheEntry) throws IOException {
+ RaftClientRequest request, TransactionContext context, CacheEntry cacheEntry) throws IOException {
assertLifeCycleState(LifeCycle.States.RUNNING);
CompletableFuture<RaftClientReply> reply;
@@ -716,7 +709,7 @@ class RaftServerImpl implements RaftServer.Division,
LOG.debug("{}: receive client request({})", getMemberId(), request);
final Optional<Timer> timer = Optional.ofNullable(raftServerMetrics.getClientRequestTimer(request.getType()));
- CompletableFuture<RaftClientReply> replyFuture;
+ final CompletableFuture<RaftClientReply> replyFuture;
if (request.is(TypeCase.STALEREAD)) {
replyFuture = staleReadAsync(request);
@@ -750,14 +743,13 @@ class RaftServerImpl implements RaftServer.Division,
replyFuture = streamAsync(request);
} else {
// query the retry cache
- final RetryCache.CacheQueryResult previousResult = retryCache.queryCache(ClientInvocationId.valueOf(request));
- if (previousResult.isRetry()) {
+ final RetryCacheImpl.CacheQueryResult queryResult = retryCache.queryCache(ClientInvocationId.valueOf(request));
+ final CacheEntry cacheEntry = queryResult.getEntry();
+ if (queryResult.isRetry()) {
// if the previous attempt is still pending or it succeeded, return its
// future
- replyFuture = previousResult.getEntry().getReplyFuture();
+ replyFuture = cacheEntry.getReplyFuture();
} else {
- final RetryCache.CacheEntry cacheEntry = previousResult.getEntry();
-
// TODO: this client request will not be added to pending requests until
// later which means that any failure in between will leave partial state in
// the state machine. We should call cancelTransaction() for failed requests
@@ -1506,13 +1498,13 @@ class RaftServerImpl implements RaftServer.Division,
Preconditions.assertTrue(logEntry.hasStateMachineLogEntry());
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
// update the retry cache
- final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
+ final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
if (getInfo().isLeader()) {
Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(),
"retry cache entry should be pending: %s", cacheEntry);
}
if (cacheEntry.isFailed()) {
- retryCache.refreshEntry(new RetryCache.CacheEntry(cacheEntry.getKey()));
+ retryCache.refreshEntry(new CacheEntry(cacheEntry.getKey()));
}
final long logIndex = logEntry.getIndex();
@@ -1582,7 +1574,7 @@ class RaftServerImpl implements RaftServer.Division,
void notifyTruncatedLogEntry(LogEntryProto logEntry) {
if (logEntry.hasStateMachineLogEntry()) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
- final RetryCache.CacheEntry cacheEntry = getRetryCache().get(invocationId);
+ final CacheEntry cacheEntry = getRetryCache().getIfPresent(invocationId);
if (cacheEntry != null) {
cacheEntry.failWithReply(newReplyBuilder(invocationId, logEntry.getIndex())
.setException(generateNotLeaderException())
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
similarity index 66%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
index 4cbcaf9..5f7c4d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java
@@ -17,32 +17,27 @@
*/
package org.apache.ratis.server.impl;
-import java.io.Closeable;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.RetryCache;
import org.apache.ratis.thirdparty.com.google.common.cache.Cache;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheBuilder;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheStats;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.ratis.util.Timestamp;
-public class RetryCache implements Closeable {
- static final Logger LOG = LoggerFactory.getLogger(RetryCache.class);
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
- /**
- * CacheEntry is tracked using unique client ID and callId of the RPC request
- */
- @VisibleForTesting
- public static class CacheEntry {
+class RetryCacheImpl implements RetryCache {
+ static class CacheEntry implements Entry {
private final ClientInvocationId key;
- private final CompletableFuture<RaftClientReply> replyFuture =
- new CompletableFuture<>();
+ private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
/**
* "failed" means we failed to commit the request into the raft group, or
@@ -89,11 +84,13 @@ public class RetryCache implements Closeable {
replyFuture.completeExceptionally(t);
}
- CompletableFuture<RaftClientReply> getReplyFuture() {
+ @Override
+ public CompletableFuture<RaftClientReply> getReplyFuture() {
return replyFuture;
}
- ClientInvocationId getKey() {
+ @Override
+ public ClientInvocationId getKey() {
return key;
}
}
@@ -116,16 +113,72 @@ public class RetryCache implements Closeable {
}
}
+ class StatisticsImpl implements Statistics {
+ private final long size;
+ private final CacheStats cacheStats;
+ private final Timestamp creation = Timestamp.currentTime();
+
+ StatisticsImpl(Cache<?, ?> cache) {
+ this.size = cache.size();
+ this.cacheStats = cache.stats();
+ LOG.debug("new StatisticsImpl {}", this); // debug only: a new snapshot is built whenever the previous one expires
+ }
+
+ boolean isExpired() {
+ return Optional.ofNullable(statisticsExpiryTime).map(t -> creation.elapsedTime().compareTo(t) > 0).orElse(true);
+ }
+
+ @Override
+ public long size() {
+ return size;
+ }
+
+ @Override
+ public long hitCount() {
+ return cacheStats.hitCount();
+ }
+
+ @Override
+ public double hitRate() {
+ return cacheStats.hitRate();
+ }
+
+ @Override
+ public long missCount() {
+ return cacheStats.missCount();
+ }
+
+ @Override
+ public double missRate() {
+ return cacheStats.missRate();
+ }
+
+ @Override
+ public String toString() {
+ return creation + ":size=" + size + "," + cacheStats;
+ }
+ }
+
private final Cache<ClientInvocationId, CacheEntry> cache;
+ /** Cache statistics to reduce the number of expensive statistics computations. */
+ private final AtomicReference<StatisticsImpl> statistics = new AtomicReference<>();
+ private final TimeDuration statisticsExpiryTime;
+
+ RetryCacheImpl(RaftProperties properties) {
+ this(RaftServerConfigKeys.RetryCache.expiryTime(properties),
+ RaftServerConfigKeys.RetryCache.statisticsExpiryTime(properties));
+ }
/**
- * @param expirationTime time for an entry to expire in milliseconds
+ * @param cacheExpiryTime time for a cache entry to expire.
+ * @param statisticsExpiryTime time for a {@link RetryCache.Statistics} object to expire.
*/
- RetryCache(TimeDuration expirationTime) {
- cache = CacheBuilder.newBuilder()
+ RetryCacheImpl(TimeDuration cacheExpiryTime, TimeDuration statisticsExpiryTime) {
+ this.cache = CacheBuilder.newBuilder()
.recordStats()
- .expireAfterWrite(expirationTime.getDuration(), expirationTime.getUnit())
+ .expireAfterWrite(cacheExpiryTime.getDuration(), cacheExpiryTime.getUnit())
.build();
+ this.statisticsExpiryTime = statisticsExpiryTime;
}
CacheEntry getOrCreateEntry(ClientInvocationId key) {
@@ -139,13 +192,13 @@ public class RetryCache implements Closeable {
}
CacheEntry refreshEntry(CacheEntry newEntry) {
- cache.put(newEntry.key, newEntry);
+ cache.put(newEntry.getKey(), newEntry);
return newEntry;
}
CacheQueryResult queryCache(ClientInvocationId key) {
final CacheEntry newEntry = new CacheEntry(key);
- CacheEntry cacheEntry;
+ final CacheEntry cacheEntry;
try {
cacheEntry = cache.get(key, () -> newEntry);
} catch (ExecutionException e) {
@@ -165,7 +218,7 @@ public class RetryCache implements Closeable {
// need to recheck, since there may be other retry attempts being
// processed at the same time. The recheck+replacement should be protected
// by lock.
- CacheEntry currentEntry = cache.getIfPresent(key);
+ final CacheEntry currentEntry = cache.getIfPresent(key);
if (currentEntry == cacheEntry || currentEntry == null) {
// if the failed entry has not got replaced by another retry, or the
// failed entry got invalidated, we add a new cache entry
@@ -176,17 +229,13 @@ public class RetryCache implements Closeable {
}
}
- @VisibleForTesting
- public long size() {
- return cache.size();
- }
-
- public CacheStats stats() {
- return cache.stats();
+ @Override
+ public Statistics getStatistics() {
+ return statistics.updateAndGet(old -> old == null || old.isExpired()? new StatisticsImpl(cache): old);
}
- @VisibleForTesting
- CacheEntry get(ClientInvocationId key) {
+ @Override
+ public CacheEntry getIfPresent(ClientInvocationId key) {
return cache.getIfPresent(key);
}
@@ -194,6 +243,7 @@ public class RetryCache implements Closeable {
public synchronized void close() {
if (cache != null) {
cache.invalidateAll();
+ statistics.set(null);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
index 4be970a..87ae2df 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftServerMetrics.java
@@ -40,7 +40,7 @@ import org.apache.ratis.protocol.RaftClientRequest.Type;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.metrics.RatisMetrics;
-import org.apache.ratis.server.impl.RetryCache;
+import org.apache.ratis.server.RetryCache;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Preconditions;
@@ -95,9 +95,10 @@ public final class RaftServerMetrics extends RatisMetrics {
}
public static RaftServerMetrics computeIfAbsentRaftServerMetrics(RaftGroupMemberId serverId,
- Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache, Supplier<RetryCache> retryCache) {
+ Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+ Supplier<RetryCache.Statistics> retryCacheStatistics) {
return METRICS.computeIfAbsent(serverId,
- key -> new RaftServerMetrics(serverId, commitInfoCache, retryCache));
+ key -> new RaftServerMetrics(serverId, commitInfoCache, retryCacheStatistics));
}
public static void removeRaftServerMetrics(RaftGroupMemberId serverId) {
@@ -105,11 +106,12 @@ public final class RaftServerMetrics extends RatisMetrics {
}
public RaftServerMetrics(RaftGroupMemberId serverId,
- Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache, Supplier<RetryCache> retryCache) {
+ Supplier<Function<RaftPeerId, CommitInfoProto>> commitInfoCache,
+ Supplier<RetryCache.Statistics> retryCacheStatistics) {
this.registry = getMetricRegistryForRaftServer(serverId.toString());
this.commitInfoCache = commitInfoCache;
addPeerCommitIndexGauge(serverId.getPeerId());
- addRetryCacheMetric(retryCache);
+ addRetryCacheMetric(retryCacheStatistics);
}
private RatisMetricRegistry getMetricRegistryForRaftServer(String serverId) {
@@ -118,12 +120,12 @@ public final class RaftServerMetrics extends RatisMetrics {
RATIS_SERVER_METRICS_DESC));
}
- private void addRetryCacheMetric(Supplier<RetryCache> retryCache) {
- registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> retryCache.get().size());
- registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC , () -> () -> retryCache.get().stats().hitCount());
- registry.gauge(RETRY_CACHE_HIT_RATE_METRIC , () -> () -> retryCache.get().stats().hitRate());
- registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC , () -> () -> retryCache.get().stats().missCount());
- registry.gauge(RETRY_CACHE_MISS_RATE_METRIC , () -> () -> retryCache.get().stats().missRate());
+ private void addRetryCacheMetric(Supplier<RetryCache.Statistics> retryCacheStatistics) {
+ registry.gauge(RETRY_CACHE_ENTRY_COUNT_METRIC, () -> () -> retryCacheStatistics.get().size());
+ registry.gauge(RETRY_CACHE_HIT_COUNT_METRIC , () -> () -> retryCacheStatistics.get().hitCount());
+ registry.gauge(RETRY_CACHE_HIT_RATE_METRIC , () -> () -> retryCacheStatistics.get().hitRate());
+ registry.gauge(RETRY_CACHE_MISS_COUNT_METRIC , () -> () -> retryCacheStatistics.get().missCount());
+ registry.gauge(RETRY_CACHE_MISS_RATE_METRIC , () -> () -> retryCacheStatistics.get().missRate());
}
/**
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 0fde62c..2799d3d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -29,7 +29,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.util.JavaUtils;
@@ -97,8 +97,8 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
if (server.getInfo().getLastAppliedIndex() < leaderApplied) {
Thread.sleep(1000);
}
- Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
- Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId));
+ Assert.assertEquals(2, server.getRetryCache().getStatistics().size());
+ Assert.assertNotNull(RetryCacheTestUtil.get(server, clientId, callId));
// make sure there is only one log entry committed
Assert.assertEquals(1, count(server.getRaftLog(), oldLastApplied + 1));
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 8e09b8d..df23a56 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -19,9 +19,6 @@ package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
@@ -48,11 +45,6 @@ import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class RaftServerTestUtil {
static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
@@ -113,18 +105,6 @@ public class RaftServerTestUtil {
return ((RaftServerImpl)server).getState().getLatestInstalledSnapshotIndex();
}
- public static long getRetryCacheSize(RaftServer.Division server) {
- return ((RaftServerImpl)server).getRetryCache().size();
- }
-
- public static RetryCache.CacheEntry getRetryEntry(RaftServer.Division server, ClientId clientId, long callId) {
- return ((RaftServerImpl)server).getRetryCache().get(ClientInvocationId.valueOf(clientId, callId));
- }
-
- public static boolean isRetryCacheEntryFailed(RetryCache.CacheEntry entry) {
- return entry.isFailed();
- }
-
static ServerState getState(RaftServer.Division server) {
return ((RaftServerImpl)server).getState();
}
@@ -187,14 +167,4 @@ public class RaftServerTestUtil {
server::submitUpdateCommitEvent,
storage, -1, properties);
}
-
- public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, RetryCache retryCache,
- RaftStorage storage, RaftProperties properties) {
- final RaftServerImpl server = mock(RaftServerImpl.class);
- when(server.getRetryCache()).thenReturn(retryCache);
- when(server.getMemberId()).thenReturn(memberId);
- doCallRealMethod().when(server).notifyTruncatedLogEntry(any(RaftProtos.LogEntryProto.class));
- return new SegmentedRaftLog(memberId, server, null,
- server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent, storage, -1, properties);
- }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index ec79ff4..91ce6c1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -27,6 +27,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -129,8 +130,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
LOG.info("check server " + server.getId());
JavaUtils.attemptRepeatedly(() -> {
- Assert.assertNotNull(
- RaftServerTestUtil.getRetryEntry(server, client.getId(), callId));
+ Assert.assertNotNull(RetryCacheTestUtil.get(server, client.getId(), callId));
return null;
}, 5, BaseTest.ONE_SECOND, "GetRetryEntry", LOG);
@@ -160,9 +160,9 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
RaftClientReply reply = rpc.sendRequest(r);
Objects.requireNonNull(reply.getStateMachineException());
- final RetryCache.CacheEntry oldEntry = RaftServerTestUtil.getRetryEntry(oldLeader, client.getId(), callId);
+ final RetryCache.Entry oldEntry = RetryCacheTestUtil.get(oldLeader, client.getId(), callId);
Assert.assertNotNull(oldEntry);
- Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
+ Assert.assertTrue(RetryCacheTestUtil.isFailed(oldEntry));
// At this point of time the old leader would have stepped down. wait for leader election to complete
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
@@ -171,10 +171,9 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
reply = rpc.sendRequest(r);
Objects.requireNonNull(reply.getStateMachineException());
- RetryCache.CacheEntry currentEntry = RaftServerTestUtil.getRetryEntry(
- leader, client.getId(), callId);
+ final RetryCache.Entry currentEntry = RetryCacheTestUtil.get(leader, client.getId(), callId);
Assert.assertNotNull(currentEntry);
- Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(currentEntry));
+ Assert.assertTrue(RetryCacheTestUtil.isFailed(currentEntry));
Assert.assertNotEquals(oldEntry, currentEntry);
failPreAppend = false;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index 70308ef..5b1c04b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -17,34 +17,71 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RetryCache;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
+import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import java.util.concurrent.TimeUnit;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class RetryCacheTestUtil {
public static RetryCache createRetryCache(){
- return new RetryCache(TimeDuration.valueOf(60, TimeUnit.SECONDS));
+ return new RetryCacheImpl(TimeDuration.valueOf(60, TimeUnit.SECONDS), null);
}
public static void createEntry(RetryCache cache, LogEntryProto logEntry){
if(logEntry.hasStateMachineLogEntry()) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
- cache.getOrCreateEntry(invocationId);
+ getOrCreateEntry(cache, invocationId);
}
}
+ public static boolean isFailed(RetryCache.Entry entry) {
+ return ((RetryCacheImpl.CacheEntry)entry).isFailed();
+ }
+
public static void assertFailure(RetryCache cache, LogEntryProto logEntry, boolean isFailed) {
if(logEntry.hasStateMachineLogEntry()) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
- Assert.assertEquals(isFailed, cache.get(invocationId).isFailed());
+ Assert.assertEquals(isFailed, get(cache, invocationId).isFailed());
}
}
public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
- ((RaftServerImpl)server).getRetryCache().getOrCreateEntry(invocationId);
+ getOrCreateEntry(server.getRetryCache(), invocationId);
+ }
+
+ private static RetryCache.Entry getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) {
+ return ((RetryCacheImpl)cache).getOrCreateEntry(invocationId);
+ }
+
+ public static RetryCache.Entry get(RaftServer.Division server, ClientId clientId, long callId) {
+ return get(server.getRetryCache(), ClientInvocationId.valueOf(clientId, callId));
+ }
+
+ private static RetryCacheImpl.CacheEntry get(RetryCache cache, ClientInvocationId invocationId) {
+ return ((RetryCacheImpl)cache).getIfPresent(invocationId);
+ }
+
+ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, RetryCache retryCache,
+ RaftStorage storage, RaftProperties properties) {
+ final RaftServerImpl server = mock(RaftServerImpl.class);
+ when(server.getRetryCache()).thenReturn((RetryCacheImpl) retryCache);
+ when(server.getMemberId()).thenReturn(memberId);
+ doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
+ return new SegmentedRaftLog(memberId, server, null,
+ server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent, storage, -1, properties);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
index c3540e7..a6d06f6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRetryCacheMetrics.java
@@ -21,26 +21,27 @@ package org.apache.ratis.server.impl;
import static org.apache.ratis.server.metrics.RaftServerMetrics.*;
import static org.junit.Assert.assertEquals;
+import com.codahale.metrics.Gauge;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.RaftServerMetrics;
-import org.apache.ratis.util.TimeDuration;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.util.concurrent.TimeUnit;
+import java.util.Map;
/**
* Test for metrics of retry cache.
*/
public class TestRetryCacheMetrics {
private static RatisMetricRegistry ratisMetricRegistry;
- private static RetryCache retryCache;
+ private static RetryCacheImpl retryCache;
@BeforeClass
public static void setUp() {
@@ -48,10 +49,10 @@ public class TestRetryCacheMetrics {
RaftPeerId raftPeerId = RaftPeerId.valueOf("TestId");
RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId
.valueOf(raftPeerId, raftGroupId);
- retryCache = new RetryCache(TimeDuration.valueOf(60, TimeUnit.SECONDS));
+ retryCache = new RetryCacheImpl(RaftServerConfigKeys.RetryCache.EXPIRY_TIME_DEFAULT, null);
final RaftServerMetrics raftServerMetrics = RaftServerMetrics.computeIfAbsentRaftServerMetrics(
- raftGroupMemberId, () -> null, () -> retryCache);
+ raftGroupMemberId, () -> null, retryCache::getStatistics);
ratisMetricRegistry = raftServerMetrics.getRegistry();
}
@@ -67,7 +68,7 @@ public class TestRetryCacheMetrics {
ClientId clientId = ClientId.randomId();
final ClientInvocationId key = ClientInvocationId.valueOf(clientId, 1);
- RetryCache.CacheEntry entry = new RetryCache.CacheEntry(key);
+ final RetryCacheImpl.CacheEntry entry = new RetryCacheImpl.CacheEntry(key);
retryCache.refreshEntry(entry);
checkEntryCount(1);
@@ -110,9 +111,11 @@ public class TestRetryCacheMetrics {
assertEquals(missRate.doubleValue(), rate, 0.0);
}
- private static void checkEntryCount(long count) {
- Long entryCount = (Long) ratisMetricRegistry.getGauges((s, metric) ->
- s.contains(RETRY_CACHE_ENTRY_COUNT_METRIC)).values().iterator().next().getValue();
- assertEquals(entryCount.longValue(), count);
+ private static void checkEntryCount(long expected) {
+ final Map<String, Gauge> map = ratisMetricRegistry.getGauges(
+ (s, metric) -> s.contains(RETRY_CACHE_ENTRY_COUNT_METRIC));
+ assertEquals(1, map.size());
+ final Map.Entry<String, Gauge> entry = map.entrySet().iterator().next();
+ assertEquals(expected, entry.getValue().getValue());
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index da44b68..32eb2de 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -21,6 +21,7 @@ import org.apache.ratis.BaseTest;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
@@ -135,6 +136,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public RetryCache getRetryCache() {
+ return null;
+ }
+
+ @Override
public DataStreamMap getDataStreamMap() {
return streamMap;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
index 113386a..db92182 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRetryCacheWithGrpc.java
@@ -27,7 +27,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.junit.Test;
@@ -67,7 +67,7 @@ public class TestRetryCacheWithGrpc
CompletableFuture<RaftClientReply> f = leaderProxy.submitClientRequestAsync(r);
f.exceptionally(e -> {
if (e.getCause() instanceof ResourceUnavailableException) {
- RaftServerTestUtil.isRetryCacheEntryFailed(RaftServerTestUtil.getRetryEntry(leader, clientId, cid));
+ RetryCacheTestUtil.isFailed(RetryCacheTestUtil.get(leader, clientId, cid));
failure.set(true);
}
return null;
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index 4ed18be..dd525fc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -28,9 +28,8 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
-import org.apache.ratis.server.impl.RetryCache;
+import org.apache.ratis.server.RetryCache;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
@@ -447,8 +446,8 @@ public class TestSegmentedRaftLog extends BaseTest {
List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
- RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
- try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
+ final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
+ try (SegmentedRaftLog raftLog = RetryCacheTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
// append entries to the raftlog
@@ -463,7 +462,7 @@ public class TestSegmentedRaftLog extends BaseTest {
List<LogEntryProto> newEntries = prepareLogEntries(
Arrays.asList(r1, r2, r3), null);
- try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
+ try (SegmentedRaftLog raftLog = RetryCacheTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
LOG.info("newEntries[0] = {}", newEntries.get(0));
final int last = newEntries.size() - 1;
@@ -480,7 +479,7 @@ public class TestSegmentedRaftLog extends BaseTest {
}
// load the raftlog again and check
- try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
+ try (SegmentedRaftLog raftLog = RetryCacheTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);