You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/08/03 20:45:40 UTC

incubator-ratis git commit: RATIS-289. Optimize readStateMachine by allowing multiple read requests to be enqueued in parallel. Contributed by Mukul Kumar Singh

Repository: incubator-ratis
Updated Branches:
  refs/heads/master c2423179b -> 0236eea30


RATIS-289. Optimize readStateMachine by allowing multiple read requests to be enqueued in parallel. Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0236eea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0236eea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0236eea3

Branch: refs/heads/master
Commit: 0236eea307fef8f8829b19dfef8919fb665f42fc
Parents: c242317
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Fri Aug 3 13:44:50 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Fri Aug 3 13:44:50 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/ratis/util/ProtoUtils.java  | 15 ++++++-
 ratis-proto-shaded/src/main/proto/Raft.proto    |  1 +
 .../apache/ratis/server/impl/LogAppender.java   | 14 ++++--
 .../ratis/server/storage/MemoryRaftLog.java     |  4 +-
 .../apache/ratis/server/storage/RaftLog.java    | 46 +++++++++++++++++++-
 .../ratis/server/storage/SegmentedRaftLog.java  | 16 ++-----
 6 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2ac2ca5..06b1346 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -190,10 +190,23 @@ public interface ProtoUtils {
         .setSmLogEntry
             (SMLogEntryProto.newBuilder()
             .setData(smLog.getData())
-            .setStateMachineDataAttached(true))
+            .setStateMachineDataAttached(true)
+            .setSerializedProtobufSize(entry.getSerializedSize()))
         .build();
   }
 
+  static long getSerializedSize(LogEntryProto entry) {
+    if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) {
+      return entry.getSerializedSize();
+    }
+    final SMLogEntryProto smLog = entry.getSmLogEntry();
+    if (!smLog.getStateMachineDataAttached()) {
+      // if state machine data was never set, return the proto serialized size
+      return entry.getSerializedSize();
+    }
+    return smLog.getSerializedProtobufSize();
+  }
+
   static IOException toIOException(ServiceException se) {
     final Throwable t = se.getCause();
     if (t == null) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 2965f97..62202be 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -47,6 +47,7 @@ message SMLogEntryProto {
 
   bytes stateMachineData = 2; // State machine specific data which is not written to log.
   bool stateMachineDataAttached = 3; // set this flag when state machine data is attached.
+  uint64 serializedProtobufSize = 4; // size of the serialized LogEntryProto along with stateMachineData
 }
 
 message LeaderNoOp {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 0398052..b863bad 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
 import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog.EntryWithData;
 import org.apache.ratis.server.storage.FileInfo;
 import org.apache.ratis.server.storage.RaftLog;
 import org.apache.ratis.server.storage.RaftLogIOException;
@@ -134,7 +135,7 @@ public class LogAppender {
    * A buffer for log entries with size limitation.
    */
   private class LogEntryBuffer {
-    private final List<LogEntryProto> buf = new ArrayList<>();
+    private final List<EntryWithData> buf = new ArrayList<>();
     private int totalSize = 0;
 
     /**
@@ -143,7 +144,7 @@ public class LogAppender {
      * @return true if the entry is added successfully;
      *         otherwise, the entry is not added, return false.
      */
-    boolean addEntry(LogEntryProto entry) {
+    boolean addEntry(EntryWithData entry) {
       final long entrySize = entry.getSerializedSize();
       if (totalSize + entrySize <= maxBufferSize) {
         buf.add(entry);
@@ -157,9 +158,14 @@ public class LogAppender {
       return buf.isEmpty();
     }
 
-    AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) {
+    AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) throws RaftLogIOException {
+      final List<LogEntryProto> protos = new ArrayList<>();
+      // Wait for all the log entry futures to complete and then create a list of LogEntryProtos.
+      for (EntryWithData bufEntry : buf) {
+        protos.add(bufEntry.getEntry());
+      }
       final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto(
-          getFollowerId(), previous, buf, !follower.isAttendingVote(), callId);
+          getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
       buf.clear();
       totalSize = 0;
       return request;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index eba6f63..7b3b1b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -52,8 +52,8 @@ public class MemoryRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto getEntryWithData(long index) {
-    return get(index);
+  public EntryWithData getEntryWithData(long index) {
+    return new EntryWithData(get(index), null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 6dc8835..0d2ec4c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.impl.LogAppender;
 import org.apache.ratis.server.impl.RaftConfiguration;
 import org.apache.ratis.server.impl.RaftServerConstants;
 import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -27,6 +28,7 @@ import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
 import org.slf4j.Logger;
@@ -200,7 +202,7 @@ public abstract class RaftLog implements Closeable {
    * @return The log entry associated with the given index.
    *         Null if there is no log entry with the index.
    */
-  public abstract LogEntryProto getEntryWithData(long index) throws RaftLogIOException;
+  public abstract EntryWithData getEntryWithData(long index) throws RaftLogIOException;
 
   /**
    * Get the TermIndex information of the given index.
@@ -326,4 +328,46 @@ public abstract class RaftLog implements Closeable {
   public RaftPeerId getSelfId() {
     return selfId;
   }
+
+  /**
+   * Holds proto entry along with future which contains read state machine data
+   */
+  public class EntryWithData {
+    private LogEntryProto logEntry;
+    private CompletableFuture<LogEntryProto> future;
+
+    EntryWithData(LogEntryProto logEntry, CompletableFuture<LogEntryProto> future) {
+      this.logEntry = logEntry;
+      this.future = future;
+    }
+
+    public long getSerializedSize() {
+      return ProtoUtils.getSerializedSize(logEntry);
+    }
+
+    public LogEntryProto getEntry() throws RaftLogIOException {
+      LogEntryProto entryProto;
+      if (future == null) {
+        return logEntry;
+      }
+
+      try {
+        entryProto = future.join();
+      } catch (Throwable t) {
+        final String err = selfId + ": Failed readStateMachineData for " +
+            ServerProtoUtils.toLogEntryString(logEntry);
+        LogAppender.LOG.error(err, t);
+        throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(t));
+      }
+      // by this time we have already read the state machine data,
+      // so the log entry data should be set now
+      if (ProtoUtils.shouldReadStateMachineData(entryProto)) {
+        final String err = selfId + ": State machine data not set for " +
+            ServerProtoUtils.toLogEntryString(logEntry);
+        LogAppender.LOG.error(err);
+        throw new RaftLogIOException(err);
+      }
+      return entryProto;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 7f59518..9df16f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -194,30 +194,20 @@ public class SegmentedRaftLog extends RaftLog {
   }
 
   @Override
-  public LogEntryProto getEntryWithData(long index) throws RaftLogIOException {
+  public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
     final LogEntryProto entry = get(index);
     if (!ProtoUtils.shouldReadStateMachineData(entry)) {
-      return entry;
+      return new EntryWithData(entry, null);
     }
 
-    LogEntryProto logEntryProto;
     try {
-      logEntryProto = server.getStateMachine().readStateMachineData(entry).join();
+      return new EntryWithData(entry, server.getStateMachine().readStateMachineData(entry));
     } catch (Throwable e) {
       final String err = server.getId() + ": Failed readStateMachineData for " +
           ServerProtoUtils.toLogEntryString(entry);
       LOG.error(err, e);
       throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
     }
-    // by this time we have already read the state machine data,
-    // so the log entry data should be set now
-    if (!ProtoUtils.shouldReadStateMachineData(logEntryProto)) {
-      final String err = server.getId() + ": State machine data not set for " +
-          ServerProtoUtils.toLogEntryString(logEntryProto);
-      LOG.error(err);
-      throw new RaftLogIOException(err);
-    }
-    return logEntryProto;
   }
 
   private void checkAndEvictCache() {