You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/12 10:32:41 UTC
incubator-ratis git commit: RATIS-161. gRPC Log appender should check
if the appended request size is less than max message size of the server.
Contributed by Mukul Kumar Singh
Repository: incubator-ratis
Updated Branches:
refs/heads/master c1b23fdb0 -> 2b8467ea9
RATIS-161. gRPC Log appender should check if the appended request size is less than max message size of the server. Contributed by Mukul Kumar Singh
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2b8467ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2b8467ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2b8467ea
Branch: refs/heads/master
Commit: 2b8467ea9e06a972be87b0da95768755c5864773
Parents: c1b23fd
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Tue Dec 12 18:31:48 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Tue Dec 12 18:31:48 2017 +0800
----------------------------------------------------------------------
.../org/apache/ratis/grpc/RaftGRpcService.java | 17 +++++++--
.../apache/ratis/server/impl/LogAppender.java | 37 ++++++++++++--------
2 files changed, 36 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2b8467ea/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
index e0af140..e4701cc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/RaftGRpcService.java
@@ -25,6 +25,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.shaded.io.grpc.Server;
import org.apache.ratis.shaded.io.grpc.ServerBuilder;
@@ -32,6 +33,7 @@ import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder;
import org.apache.ratis.shaded.proto.RaftProtos.*;
import org.apache.ratis.util.CodeInjectionForTesting;
import org.apache.ratis.util.ExitUtils;
+import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,9 +78,18 @@ public class RaftGRpcService implements RaftServerRpc {
private RaftGRpcService(RaftServer server) {
this(server,
GrpcConfigKeys.Server.port(server.getProperties()),
- GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt());
- }
- private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize) {
+ GrpcConfigKeys.messageSizeMax(server.getProperties()).getSizeInt(),
+ GrpcConfigKeys.messageSizeMax(server.getProperties()),
+ RaftServerConfigKeys.Log.Appender.bufferCapacity(server.getProperties()));
+ }
+ private RaftGRpcService(RaftServer raftServer, int port, int maxMessageSize,
+ SizeInBytes grpcMessageSizeMax, SizeInBytes appenderBufferSize) {
+ if (appenderBufferSize.getSize() > grpcMessageSizeMax.getSize()) {
+ throw new IllegalArgumentException("Illegal configuration: "
+ + RaftServerConfigKeys.Log.Appender.BUFFER_CAPACITY_KEY + " = " + appenderBufferSize
+ + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
+ }
+
ServerBuilder serverBuilder = ServerBuilder.forPort(port);
idSupplier = raftServer::getId;
server = ((NettyServerBuilder) serverBuilder).maxMessageSize(maxMessageSize)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2b8467ea/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a5b0791..556ba83 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -114,13 +114,20 @@ public class LogAppender extends Daemon {
private final List<LogEntryProto> buf = new ArrayList<>();
private int totalSize = 0;
- void addEntry(LogEntryProto entry) {
- buf.add(entry);
- totalSize += entry.getSerializedSize();
- }
-
- boolean isFull() {
- return totalSize >= maxBufferSize;
+ /**
+ * Adds a log entry to the Log entry buffer.
+ * Checks if enough space is available before adding the entry to the buffer.
+ * @return true if the entry is added successfully;
+ * otherwise, the entry is not added, return false.
+ */
+ boolean addEntry(LogEntryProto entry) {
+ final long entrySize = entry.getSerializedSize();
+ if (totalSize + entrySize <= maxBufferSize) {
+ buf.add(entry);
+ totalSize += entrySize;
+ return true;
+ }
+ return false;
}
boolean isEmpty() {
@@ -159,20 +166,20 @@ public class LogAppender extends Daemon {
final TermIndex previous = getPrevious();
final long leaderNext = raftLog.getNextIndex();
long next = follower.getNextIndex() + buffer.getPendingEntryNum();
- boolean toSend = false;
+ final boolean toSend;
if (leaderNext == next && !buffer.isEmpty()) {
// no new entries, then send out the entries in the buffer
toSend = true;
} else if (leaderNext > next) {
- while (leaderNext > next && !buffer.isFull()) {
- // stop adding entry once the buffer size is >= the max size
- buffer.addEntry(raftLog.get(next++));
- }
- if (buffer.isFull() || !batchSending) {
- // buffer is full or batch sending is disabled, send out a request
- toSend = true;
+ boolean hasSpace = true;
+ for(; hasSpace && leaderNext > next;) {
+ hasSpace = buffer.addEntry(raftLog.get(next++));
}
+ // buffer is full or batch sending is disabled, send out a request
+ toSend = !hasSpace || !batchSending;
+ } else {
+ toSend = false;
}
if (toSend || shouldHeartbeat()) {