You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/03/28 05:42:19 UTC

incubator-ratis git commit: RATIS-176. Log Appender should throw an Exception in case append entry size exceeds the maxBufferSize configured. Contributed by Shashikant Banerjee

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 9494b75c1 -> f2fed3a4d


RATIS-176. Log Appender should throw an Exception in case append entry size exceeds the maxBufferSize configured.  Contributed by Shashikant Banerjee


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/f2fed3a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/f2fed3a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/f2fed3a4

Branch: refs/heads/master
Commit: f2fed3a4d88b353f22df9a4b88445842ca9a9577
Parents: 9494b75
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Mar 28 13:41:26 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Mar 28 13:41:26 2018 +0800

----------------------------------------------------------------------
 .../ratis/protocol/StateMachineException.java   |  5 +++-
 .../apache/ratis/server/impl/ServerState.java   |  4 +++-
 .../ratis/server/storage/MemoryRaftLog.java     |  4 ++--
 .../apache/ratis/server/storage/RaftLog.java    | 10 +++++++-
 .../server/storage/RaftLogIOException.java      |  4 ++++
 .../ratis/server/storage/SegmentedRaftLog.java  |  3 ++-
 .../org/apache/ratis/RaftExceptionBaseTest.java | 25 +++++++++++++++++++-
 7 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 56f1160..a9710b2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -19,7 +19,10 @@ package org.apache.ratis.protocol;
 
 public class StateMachineException extends RaftException {
   public StateMachineException(RaftPeerId serverId, Throwable cause) {
-    super(cause.getClass().getName() + " from Server " + serverId, cause);
+    // cause.getMessage is added to this exception message as the exception received through
+    // RPC call contains similar message but Simulated RPC doesn't. Adding the message
+    // from cause to this exception makes it consistent across simulated and other RPC implementations.
+    super(cause.getClass().getName() + " from Server " + serverId + ": " + cause.getMessage(), cause);
   }
 
   public StateMachineException(String msg) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 6354e73..5d91120 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -146,7 +146,9 @@ public class ServerState implements Closeable {
       throws IOException {
     final RaftLog log;
     if (RaftServerConfigKeys.Log.useMemory(prop)) {
-      log = new MemoryRaftLog(id);
+      final int maxBufferSize =
+          RaftServerConfigKeys.Log.Appender.bufferCapacity(prop).getSizeInt();
+      log = new MemoryRaftLog(id, maxBufferSize);
     } else {
       log = new SegmentedRaftLog(id, server, this.storage,
           lastIndexInSnapshot, prop);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 3c14d2c..98162b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -38,8 +38,8 @@ import java.util.concurrent.CompletableFuture;
 public class MemoryRaftLog extends RaftLog {
   private final List<LogEntryProto> entries = new ArrayList<>();
 
-  public MemoryRaftLog(RaftPeerId selfId) {
-    super(selfId);
+  public MemoryRaftLog(RaftPeerId selfId, int maxBufferSize) {
+    super(selfId, maxBufferSize);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 91f3b41..6ae3216 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -59,12 +59,14 @@ public abstract class RaftLog implements Closeable {
   protected final AtomicLong lastCommitted =
       new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
   private final RaftPeerId selfId;
+  private final int maxBufferSize;
 
   private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
   private volatile boolean isOpen = false;
 
-  public RaftLog(RaftPeerId selfId) {
+  public RaftLog(RaftPeerId selfId, int maxBufferSize) {
     this.selfId = selfId;
+    this.maxBufferSize = maxBufferSize;
   }
 
   public long getLastCommittedIndex() {
@@ -147,6 +149,12 @@ public abstract class RaftLog implements Closeable {
       final LogEntryProto e = ProtoUtils.toLogEntryProto(
           operation.getSMLogEntry(), term, nextIndex, clientId, callId);
 
+      int entrySize = e.getSerializedSize();
+      if (entrySize > maxBufferSize) {
+        throw new StateMachineException(selfId, new RaftLogIOException(
+            "Log entry size " + entrySize + " exceeds the max buffer limit of "
+                + maxBufferSize));
+      }
       appendEntry(e);
       operation.setLogEntry(e);
       return nextIndex;

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
index b9fb5ff..0fbf737 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
@@ -26,4 +26,8 @@ public class RaftLogIOException extends RaftException {
   public RaftLogIOException(Throwable cause) {
     super(cause);
   }
+
+  public RaftLogIOException(String msg) {
+    super(msg);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 3f14571..1c00175 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -105,7 +105,8 @@ public class SegmentedRaftLog extends RaftLog {
   public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
       RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties)
       throws IOException {
-    super(selfId);
+    super(selfId, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties)
+        .getSizeInt());
     this.server = server;
     this.storage = storage;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 4562cb8..a38488a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -21,10 +21,12 @@ import org.apache.log4j.Level;
 import org.apache.ratis.RaftTestUtil.SimpleMessage;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.LogUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -51,6 +53,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
 
   @Before
   public void setup() throws IOException {
+    final RaftProperties prop = getProperties();
+    RaftServerConfigKeys.Log.Appender
+        .setBufferCapacity(prop, SizeInBytes.valueOf("4KB"));
     cluster = newCluster(NUM_PEERS);
     cluster.start();
   }
@@ -202,4 +207,22 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
           StateMachineException.class, StaleReadException.class);
     }
   }
+
+  @Test
+  public void testLogAppenderBufferCapacity() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = cluster.getLeader().getId();
+    final RaftClient client = cluster.createClient(leaderId);
+    byte[] bytes = new byte[8192];
+    Arrays.fill(bytes, (byte) 1);
+    SimpleMessage msg =
+        new SimpleMessage(new String(bytes));
+    try {
+      client.send(msg);
+      Assert.fail("Expected StateMachineException  not thrown");
+    } catch (StateMachineException sme) {
+      Assert.assertTrue(sme.getMessage()
+          .contains("exceeds the max buffer limit"));
+    }
+  }
 }