You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/25 16:37:32 UTC
[incubator-ratis] branch master updated: RATIS-833. Add metrics for
raft log cache count and size in bytes (#78)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0cc6aca RATIS-833. Add metrics for raft log cache count and size in bytes (#78)
0cc6aca is described below
commit 0cc6aca13497f638a3ad1e2224fe614b750d2a5b
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Jun 26 00:37:23 2020 +0800
RATIS-833. Add metrics for raft log cache count and size in bytes (#78)
---
.../ratis/server/metrics/RaftLogMetrics.java | 25 ++++++++++++++++
.../raftlog/segmented/SegmentedRaftLogCache.java | 32 ++++++++++++++++++--
.../segmented/TestSegmentedRaftLogCache.java | 35 +++++++++++++++++++++-
3 files changed, 88 insertions(+), 4 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
index e8725ba..6a5de00 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -23,6 +23,7 @@ import java.util.Queue;
import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache;
import org.apache.ratis.util.DataQueue;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
@@ -55,6 +56,12 @@ public class RaftLogMetrics extends RatisMetrics {
public static final String RAFT_LOG_CACHE_MISS_COUNT = "cacheMissCount";
// Count of RaftLogCache Hits
public static final String RAFT_LOG_CACHE_HIT_COUNT = "cacheHitCount";
+ // Number of SegmentedRaftLogCache::closedSegments
+ public static final String RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM = "closedSegmentsNum";
+ // Size of SegmentedRaftLogCache::closedSegments in bytes
+ public static final String RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES = "closedSegmentsSizeInBytes";
+ // Size of SegmentedRaftLogCache::openSegment in bytes
+ public static final String RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES = "openSegmentSizeInBytes";
// Total time taken to append a raft log entry
public static final String RAFT_LOG_APPEND_ENTRY_LATENCY = "appendEntryLatency";
// Time spent by a Raft log operation in the queue.
@@ -103,6 +110,24 @@ public class RaftLogMetrics extends RatisMetrics {
});
}
+ public void addClosedSegmentsNum(SegmentedRaftLogCache cache) {
+ registry.gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM, () -> () -> {
+ return cache.getCachedSegmentNum();
+ });
+ }
+
+ public void addClosedSegmentsSizeInBytes(SegmentedRaftLogCache cache) {
+ registry.gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES, () -> () -> {
+ return cache.getClosedSegmentsSizeInBytes();
+ });
+ }
+
+ public void addOpenSegmentSizeInBytes(SegmentedRaftLogCache cache) {
+ registry.gauge(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES, () -> () -> {
+ return cache.getOpenSegmentSizeInBytes();
+ });
+ }
+
public void addLogWorkerQueueSizeGauge(Queue queue) {
registry.gauge(RAFT_LOG_WORKER_QUEUE_SIZE, () -> () -> queue.size());
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index bd2dff8..98d3099 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -45,7 +45,7 @@ import java.util.function.Consumer;
* caches all the segments in the memory. The cache is not thread-safe and
* requires external lock protection.
*/
-class SegmentedRaftLogCache {
+public class SegmentedRaftLogCache {
public static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogCache.class);
static final class SegmentFileInfo {
@@ -138,10 +138,12 @@ class SegmentedRaftLogCache {
private final Object name;
private final List<LogSegment> segments = new ArrayList<>();
private final AutoCloseableReadWriteLock lock;
+ private long sizeInBytes;
LogSegmentList(Object name) {
this.name = name;
this.lock = new AutoCloseableReadWriteLock(name);
+ this.sizeInBytes = 0;
}
AutoCloseableLock readLock() {
@@ -166,6 +168,10 @@ class SegmentedRaftLogCache {
}
}
+ long sizeInBytes() {
+ return sizeInBytes;
+ }
+
long countCached() {
try(AutoCloseableLock readLock = readLock()) {
return segments.stream().filter(LogSegment::hasCache).count();
@@ -226,6 +232,7 @@ class SegmentedRaftLogCache {
boolean add(LogSegment logSegment) {
try(AutoCloseableLock writeLock = writeLock()) {
+ sizeInBytes += logSegment.getTotalSize();
return segments.add(logSegment);
}
}
@@ -234,6 +241,7 @@ class SegmentedRaftLogCache {
try(AutoCloseableLock writeLock = writeLock()) {
segments.forEach(LogSegment::clear);
segments.clear();
+ sizeInBytes = 0;
}
}
@@ -253,6 +261,7 @@ class SegmentedRaftLogCache {
final SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
oldEnd, true, openSegment.getTotalSize(), openSegment.getEndIndex());
segments.add(openSegment);
+ sizeInBytes += openSegment.getTotalSize();
clearOpenSegment.run();
return new TruncationSegments(info, Collections.emptyList());
}
@@ -261,12 +270,15 @@ class SegmentedRaftLogCache {
final LogSegment ts = segments.get(segmentIndex);
final long oldEnd = ts.getEndIndex();
final List<SegmentFileInfo> list = new ArrayList<>();
+ sizeInBytes -= ts.getTotalSize();
ts.truncate(index);
+ sizeInBytes += ts.getTotalSize();
final int size = segments.size();
for(int i = size - 1;
i >= (ts.numOfEntries() == 0? segmentIndex: segmentIndex + 1);
i--) {
LogSegment s = segments.remove(i);
+ sizeInBytes -= s.getTotalSize();
final long endOfS = i == segmentIndex? oldEnd: s.getEndIndex();
s.clear();
list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, s.getEndIndex()));
@@ -292,10 +304,13 @@ class SegmentedRaftLogCache {
list.add(SegmentFileInfo.newClosedSegmentFileInfo(ls));
}
segments.clear();
+ sizeInBytes = 0;
} else if (segmentIndex >= 0) {
// we start to purge the closedSegments which do not overlap with index.
for (int i = segmentIndex - 1; i >= 0; i--) {
- list.add(SegmentFileInfo.newClosedSegmentFileInfo(segments.remove(i)));
+ LogSegment segment = segments.remove(i);
+ sizeInBytes -= segment.getTotalSize();
+ list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
}
} else {
throw new IllegalStateException("Unexpected gap in segments: binarySearch(" + index + ") returns "
@@ -334,6 +349,9 @@ class SegmentedRaftLogCache {
this.closedSegments = new LogSegmentList(name);
this.storage = storage;
this.raftLogMetrics = raftLogMetrics;
+ this.raftLogMetrics.addClosedSegmentsNum(this);
+ this.raftLogMetrics.addClosedSegmentsSizeInBytes(this);
+ this.raftLogMetrics.addOpenSegmentSizeInBytes(this);
this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
}
@@ -350,10 +368,18 @@ class SegmentedRaftLogCache {
}
}
- long getCachedSegmentNum() {
+ public long getCachedSegmentNum() {
return closedSegments.countCached();
}
+ public long getClosedSegmentsSizeInBytes() {
+ return closedSegments.sizeInBytes();
+ }
+
+ public long getOpenSegmentSizeInBytes() {
+ return openSegment.getTotalSize();
+ }
+
boolean shouldEvict() {
return closedSegments.countCached() > maxCachedSegments;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index a42b43d..1d4f8ca 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -17,17 +17,22 @@
*/
package org.apache.ratis.server.raftlog.segmented;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.*;
+
import java.io.IOException;
import java.util.Iterator;
import java.util.stream.IntStream;
import org.apache.ratis.RaftTestUtil.SimpleOperation;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -36,10 +41,19 @@ public class TestSegmentedRaftLogCache {
private static final RaftProperties prop = new RaftProperties();
private SegmentedRaftLogCache cache;
+ private RaftLogMetrics raftLogMetrics;
+ private RatisMetricRegistry ratisMetricRegistry;
@Before
public void setup() {
- cache = new SegmentedRaftLogCache(null, null, prop);
+ raftLogMetrics = new RaftLogMetrics("test");
+ ratisMetricRegistry = raftLogMetrics.getRegistry();
+ cache = new SegmentedRaftLogCache(null, null, prop, raftLogMetrics);
+ }
+
+ @After
+ public void clear() {
+ raftLogMetrics.unregister();
}
private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
@@ -301,4 +315,23 @@ public class TestSegmentedRaftLogCache {
Iterator<TermIndex> iterator = cache.iterator(300);
Assert.assertFalse(iterator.hasNext());
}
+
+ @Test
+ public void testCacheMetric() {
+ cache.addSegment(prepareLogSegment(0, 99, false));
+ cache.addSegment(prepareLogSegment(100, 200, false));
+ cache.addSegment(prepareLogSegment(201, 300, true));
+
+ Long closedSegmentsNum = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ s.contains(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM)).values().iterator().next().getValue();
+ Assert.assertEquals(2L, closedSegmentsNum.longValue());
+
+ Long closedSegmentsSizeInBytes = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ s.contains(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES)).values().iterator().next().getValue();
+ Assert.assertEquals(closedSegmentsSizeInBytes.longValue(), cache.getClosedSegmentsSizeInBytes());
+
+ Long openSegmentSizeInBytes = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+ s.contains(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES)).values().iterator().next().getValue();
+ Assert.assertEquals(openSegmentSizeInBytes.longValue(), cache.getOpenSegmentSizeInBytes());
+ }
}