You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/07 12:01:02 UTC

[incubator-ratis] branch master updated: RATIS-1211. Fix TestLogAppenderWithSimulatedRpc.testUnlimitedElementBuffer throw NPE (#332)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a92491  RATIS-1211. Fix TestLogAppenderWithSimulatedRpc.testUnlimitedElementBuffer throw NPE (#332)
8a92491 is described below

commit 8a92491be43c9b7c0b4871e466efb3ef66dfb139
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Dec 7 20:00:51 2020 +0800

    RATIS-1211. Fix TestLogAppenderWithSimulatedRpc.testUnlimitedElementBuffer throw NPE (#332)
---
 .../apache/ratis/server/raftlog/segmented/LogSegment.java   | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
index fc0509e..df3b6a3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java
@@ -43,6 +43,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 
@@ -229,10 +230,16 @@ public class LogSegment implements Comparable<Long> {
       final File file = getSegmentFile();
       // note the loading should not exceed the endIndex: it is possible that
       // the on-disk log file should be truncated but has not been done yet.
-      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics,
-          entry -> putEntryCache(ServerProtoUtils.toTermIndex(entry), entry, Op.LOAD_SEGMENT_FILE));
+      final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
+      readSegmentFile(file, startIndex, endIndex, isOpen, getLogCorruptionPolicy(), raftLogMetrics, entry -> {
+        final TermIndex ti = ServerProtoUtils.toTermIndex(entry);
+        putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE);
+        if (ti.equals(key.getTermIndex())) {
+          toReturn.set(entry);
+        }
+      });
       loadingTimes.incrementAndGet();
-      return Objects.requireNonNull(entryCache.get(key.getTermIndex()));
+      return Objects.requireNonNull(toReturn.get());
     }
   }