You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/06/25 16:37:32 UTC

[incubator-ratis] branch master updated: RATIS-833. Add metrics for raft log cache count and size in bytes (#78)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cc6aca  RATIS-833. Add metrics for raft log cache count and size in bytes (#78)
0cc6aca is described below

commit 0cc6aca13497f638a3ad1e2224fe614b750d2a5b
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Jun 26 00:37:23 2020 +0800

    RATIS-833. Add metrics for raft log cache count and size in bytes (#78)
---
 .../ratis/server/metrics/RaftLogMetrics.java       | 25 ++++++++++++++++
 .../raftlog/segmented/SegmentedRaftLogCache.java   | 32 ++++++++++++++++++--
 .../segmented/TestSegmentedRaftLogCache.java       | 35 +++++++++++++++++++++-
 3 files changed, 88 insertions(+), 4 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
index e8725ba..6a5de00 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/RaftLogMetrics.java
@@ -23,6 +23,7 @@ import java.util.Queue;
 import org.apache.ratis.metrics.MetricRegistryInfo;
 import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.metrics.RatisMetrics;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache;
 import org.apache.ratis.util.DataQueue;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 
@@ -55,6 +56,12 @@ public class RaftLogMetrics extends RatisMetrics {
   public static final String RAFT_LOG_CACHE_MISS_COUNT = "cacheMissCount";
   // Count of RaftLogCache Hits
   public static final String RAFT_LOG_CACHE_HIT_COUNT = "cacheHitCount";
+  // Number of SegmentedRaftLogCache::closedSegments
+  public static final String RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM = "closedSegmentsNum";
+  // Size of SegmentedRaftLogCache::closedSegments in bytes
+  public static final String RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES = "closedSegmentsSizeInBytes";
+  // Size of SegmentedRaftLogCache::openSegment in bytes
+  public static final String RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES = "openSegmentSizeInBytes";
   // Total time taken to append a raft log entry
   public static final String RAFT_LOG_APPEND_ENTRY_LATENCY = "appendEntryLatency";
   // Time spent by a Raft log operation in the queue.
@@ -103,6 +110,24 @@ public class RaftLogMetrics extends RatisMetrics {
     });
   }
 
+  public void addClosedSegmentsNum(SegmentedRaftLogCache cache) {
+    registry.gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM, () -> () -> {
+      return cache.getCachedSegmentNum();
+    });
+  }
+
+  public void addClosedSegmentsSizeInBytes(SegmentedRaftLogCache cache) {
+    registry.gauge(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES, () -> () -> {
+      return cache.getClosedSegmentsSizeInBytes();
+    });
+  }
+
+  public void addOpenSegmentSizeInBytes(SegmentedRaftLogCache cache) {
+    registry.gauge(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES, () -> () -> {
+      return cache.getOpenSegmentSizeInBytes();
+    });
+  }
+
   public void addLogWorkerQueueSizeGauge(Queue queue) {
     registry.gauge(RAFT_LOG_WORKER_QUEUE_SIZE, () -> () -> queue.size());
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
index bd2dff8..98d3099 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogCache.java
@@ -45,7 +45,7 @@ import java.util.function.Consumer;
  * caches all the segments in the memory. The cache is not thread-safe and
  * requires external lock protection.
  */
-class SegmentedRaftLogCache {
+public class SegmentedRaftLogCache {
   public static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogCache.class);
 
   static final class SegmentFileInfo {
@@ -138,10 +138,12 @@ class SegmentedRaftLogCache {
     private final Object name;
     private final List<LogSegment> segments = new ArrayList<>();
     private final AutoCloseableReadWriteLock lock;
+    private long sizeInBytes;
 
     LogSegmentList(Object name) {
       this.name = name;
       this.lock = new AutoCloseableReadWriteLock(name);
+      this.sizeInBytes = 0;
     }
 
     AutoCloseableLock readLock() {
@@ -166,6 +168,10 @@ class SegmentedRaftLogCache {
       }
     }
 
+    long sizeInBytes() {
+      return sizeInBytes;
+    }
+
     long countCached() {
       try(AutoCloseableLock readLock = readLock()) {
         return segments.stream().filter(LogSegment::hasCache).count();
@@ -226,6 +232,7 @@ class SegmentedRaftLogCache {
 
     boolean add(LogSegment logSegment) {
       try(AutoCloseableLock writeLock = writeLock()) {
+        sizeInBytes += logSegment.getTotalSize();
         return segments.add(logSegment);
       }
     }
@@ -234,6 +241,7 @@ class SegmentedRaftLogCache {
       try(AutoCloseableLock writeLock = writeLock()) {
         segments.forEach(LogSegment::clear);
         segments.clear();
+        sizeInBytes = 0;
       }
     }
 
@@ -253,6 +261,7 @@ class SegmentedRaftLogCache {
               final SegmentFileInfo info = new SegmentFileInfo(openSegment.getStartIndex(),
                   oldEnd, true, openSegment.getTotalSize(), openSegment.getEndIndex());
               segments.add(openSegment);
+              sizeInBytes += openSegment.getTotalSize();
               clearOpenSegment.run();
               return new TruncationSegments(info, Collections.emptyList());
             }
@@ -261,12 +270,15 @@ class SegmentedRaftLogCache {
           final LogSegment ts = segments.get(segmentIndex);
           final long oldEnd = ts.getEndIndex();
           final List<SegmentFileInfo> list = new ArrayList<>();
+          sizeInBytes -= ts.getTotalSize();
           ts.truncate(index);
+          sizeInBytes += ts.getTotalSize();
           final int size = segments.size();
           for(int i = size - 1;
               i >= (ts.numOfEntries() == 0? segmentIndex: segmentIndex + 1);
               i--) {
             LogSegment s = segments.remove(i);
+            sizeInBytes -= s.getTotalSize();
             final long endOfS = i == segmentIndex? oldEnd: s.getEndIndex();
             s.clear();
             list.add(new SegmentFileInfo(s.getStartIndex(), endOfS, false, 0, s.getEndIndex()));
@@ -292,10 +304,13 @@ class SegmentedRaftLogCache {
             list.add(SegmentFileInfo.newClosedSegmentFileInfo(ls));
           }
           segments.clear();
+          sizeInBytes = 0;
         } else if (segmentIndex >= 0) {
           // we start to purge the closedSegments which do not overlap with index.
           for (int i = segmentIndex - 1; i >= 0; i--) {
-            list.add(SegmentFileInfo.newClosedSegmentFileInfo(segments.remove(i)));
+            LogSegment segment = segments.remove(i);
+            sizeInBytes -= segment.getTotalSize();
+            list.add(SegmentFileInfo.newClosedSegmentFileInfo(segment));
           }
         } else {
           throw new IllegalStateException("Unexpected gap in segments: binarySearch(" + index + ") returns "
@@ -334,6 +349,9 @@ class SegmentedRaftLogCache {
     this.closedSegments = new LogSegmentList(name);
     this.storage = storage;
     this.raftLogMetrics = raftLogMetrics;
+    this.raftLogMetrics.addClosedSegmentsNum(this);
+    this.raftLogMetrics.addClosedSegmentsSizeInBytes(this);
+    this.raftLogMetrics.addOpenSegmentSizeInBytes(this);
     this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
   }
 
@@ -350,10 +368,18 @@ class SegmentedRaftLogCache {
     }
   }
 
-  long getCachedSegmentNum() {
+  public long getCachedSegmentNum() {
     return closedSegments.countCached();
   }
 
+  public long getClosedSegmentsSizeInBytes() {
+    return closedSegments.sizeInBytes();
+  }
+
+  public long getOpenSegmentSizeInBytes() {
+    return openSegment.getTotalSize();
+  }
+
   boolean shouldEvict() {
     return closedSegments.countCached() > maxCachedSegments;
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
index a42b43d..1d4f8ca 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLogCache.java
@@ -17,17 +17,22 @@
  */
 package org.apache.ratis.server.raftlog.segmented;
 
+import static org.apache.ratis.server.metrics.RaftLogMetrics.*;
+
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.stream.IntStream;
 
 import org.apache.ratis.RaftTestUtil.SimpleOperation;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.metrics.RaftLogMetrics;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogCache.TruncationSegments;
 import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -36,10 +41,19 @@ public class TestSegmentedRaftLogCache {
   private static final RaftProperties prop = new RaftProperties();
 
   private SegmentedRaftLogCache cache;
+  private RaftLogMetrics raftLogMetrics;
+  private RatisMetricRegistry ratisMetricRegistry;
 
   @Before
   public void setup() {
-    cache = new SegmentedRaftLogCache(null, null, prop);
+    raftLogMetrics = new RaftLogMetrics("test");
+    ratisMetricRegistry = raftLogMetrics.getRegistry();
+    cache = new SegmentedRaftLogCache(null, null, prop, raftLogMetrics);
+  }
+
+  @After
+  public void clear() {
+    raftLogMetrics.unregister();
   }
 
   private LogSegment prepareLogSegment(long start, long end, boolean isOpen) {
@@ -301,4 +315,23 @@ public class TestSegmentedRaftLogCache {
     Iterator<TermIndex> iterator = cache.iterator(300);
     Assert.assertFalse(iterator.hasNext());
   }
+
+  @Test
+  public void testCacheMetric() {
+    cache.addSegment(prepareLogSegment(0, 99, false));
+    cache.addSegment(prepareLogSegment(100, 200, false));
+    cache.addSegment(prepareLogSegment(201, 300, true));
+
+    Long closedSegmentsNum = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+        s.contains(RAFT_LOG_CACHE_CLOSED_SEGMENTS_NUM)).values().iterator().next().getValue();
+    Assert.assertEquals(2L, closedSegmentsNum.longValue());
+
+    Long closedSegmentsSizeInBytes = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+        s.contains(RAFT_LOG_CACHE_CLOSED_SEGMENTS_SIZE_IN_BYTES)).values().iterator().next().getValue();
+    Assert.assertEquals(closedSegmentsSizeInBytes.longValue(), cache.getClosedSegmentsSizeInBytes());
+
+    Long openSegmentSizeInBytes = (Long) ratisMetricRegistry.getGauges((s, metric) ->
+        s.contains(RAFT_LOG_CACHE_OPEN_SEGMENT_SIZE_IN_BYTES)).values().iterator().next().getValue();
+    Assert.assertEquals(openSegmentSizeInBytes.longValue(), cache.getOpenSegmentSizeInBytes());
+  }
 }