You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/12 10:32:41 UTC

incubator-ratis git commit: RATIS-161. gRPC Log appender should check if the appended request size is less than max message size of the server. Contributed by Mukul Kumar Singh

Repository: incubator-ratis
Updated Branches:
  refs/heads/master c1b23fdb0 -> 2b8467ea9


RATIS-161. gRPC Log appender should check if the appended request size is less than max message size of the server.  Contributed by Mukul Kumar Singh


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2b8467ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2b8467ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2b8467ea

Branch: refs/heads/master
Commit: 2b8467ea9e06a972be87b0da95768755c5864773
Parents: c1b23fd
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Dec 12 18:31:48 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Dec 12 18:31:48 2017 +0800

----------------------------------------------------------------------
 .../org/apache/ratis/grpc/RaftGRpcService.java  | 17 +++++++--
 .../apache/ratis/server/impl/LogAppender.java   | 37 ++++++++++++--------
 2 files changed, 36 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2b8467ea/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index e0af140..e4701cc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerRpc;
 import org.apache.ratis.shaded.io.grpc.Server;
 import org.apache.ratis.shaded.io.grpc.ServerBuilder;
@@ -32,6 +33,7 @@ import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
 import org.apache.ratis.shaded.proto.RaftProtos.*;
 import org.apache.ratis.util.CodeInjectionForTesting;
 import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,9 +78,18 @@ public class RaftGRpcService implements RaftServerRpc {
   private RaftGRpcService(RaftServer server) {
     this(server,
         GrpcConfigKeys.Server.port(server.getProperties()),
-        GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt());
-  }
-  private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) {
+        GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(),
+        GrpcConfigKeys.messageSizeMax(server.getProperties()),
+        RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()));
+  }
+  private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize,
+      SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize) {
+    if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
+      throw new IllegalArgumentException("Illegal configuration: "
+          + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+          + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
+    }
+
     ServerBuilder serverBuilder = ServerBuilder.forPort(port);
     idSupplier = raftServer::getId;
     server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2b8467ea/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a5b0791..556ba83 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -114,13 +114,20 @@ public class LogAppender extends Daemon {
     private final List<LogEntryProto> buf = new ArrayList<>();
     private int totalSize = 0;
 
-    void addEntry(LogEntryProto entry) {
-      buf.add(entry);
-      totalSize += entry.getSerializedSize();
-    }
-
-    boolean isFull() {
-      return totalSize >= maxBufferSize;
+    /**
+     * Adds a log entry to the Log entry buffer.
+     * Checks if enough space is available before adding the entry to the buffer.
+     * @return true if the entry is added successfully;
+     *         otherwise, the entry is not added, return false.
+     */
+    boolean addEntry(LogEntryProto entry) {
+      final long entrySize = entry.getSerializedSize();
+      if (totalSize + entrySize <= maxBufferSize) {
+        buf.add(entry);
+        totalSize += entrySize;
+        return true;
+      }
+      return false;
     }
 
     boolean isEmpty() {
@@ -159,20 +166,20 @@ public class LogAppender extends Daemon {
     final TermIndex previous = getPrevious();
     final long leaderNext = raftLog.getNextIndex();
     long next = follower.getNextIndex() + buffer.getPendingEntryNum();
-    boolean toSend = false;
+    final boolean toSend;
 
     if (leaderNext == next && !buffer.isEmpty()) {
       // no new entries, then send out the entries in the buffer
       toSend = true;
     } else if (leaderNext > next) {
-      while (leaderNext > next && !buffer.isFull()) {
-        // stop adding entry once the buffer size is >= the max size
-        buffer.addEntry(raftLog.get(next++));
-      }
-      if (buffer.isFull() || !batchSending) {
-        // buffer is full or batch sending is disabled, send out a request
-        toSend = true;
+      boolean hasSpace = true;
+      for(; hasSpace && leaderNext > next;) {
+        hasSpace = buffer.addEntry(raftLog.get(next++));
       }
+      // buffer is full or batch sending is disabled, send out a request
+      toSend = !hasSpace || !batchSending;
+    } else {
+      toSend = false;
     }
 
     if (toSend || shouldHeartbeat()) {