You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/03/28 05:42:19 UTC
incubator-ratis git commit: RATIS-176. Log Appender should throw an
Exception in case append entry size exceeds the maxBufferSize configured.
Contributed by Shashikant Banerjee
Repository: incubator-ratis
Updated Branches:
refs/heads/master 9494b75c1 -> f2fed3a4d
RATIS-176. Log Appender should throw an Exception in case append entry size exceeds the maxBufferSize configured. Contributed by Shashikant Banerjee
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/f2fed3a4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/f2fed3a4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/f2fed3a4
Branch: refs/heads/master
Commit: f2fed3a4d88b353f22df9a4b88445842ca9a9577
Parents: 9494b75
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Wed Mar 28 13:41:26 2018 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Wed Mar 28 13:41:26 2018 +0800
----------------------------------------------------------------------
.../ratis/protocol/StateMachineException.java | 5 +++-
.../apache/ratis/server/impl/ServerState.java | 4 +++-
.../ratis/server/storage/MemoryRaftLog.java | 4 ++--
.../apache/ratis/server/storage/RaftLog.java | 10 +++++++-
.../server/storage/RaftLogIOException.java | 4 ++++
.../ratis/server/storage/SegmentedRaftLog.java | 3 ++-
.../org/apache/ratis/RaftExceptionBaseTest.java | 25 +++++++++++++++++++-
7 files changed, 48 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 56f1160..a9710b2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -19,7 +19,10 @@ package org.apache.ratis.protocol;
public class StateMachineException extends RaftException {
public StateMachineException(RaftPeerId serverId, Throwable cause) {
- super(cause.getClass().getName() + " from Server " + serverId, cause);
+ // cause.getMessage() is appended to this exception's message because an exception received
+ // through an RPC call already contains a similar message, whereas Simulated RPC does not.
+ // Appending the cause's message keeps this exception consistent across simulated and other RPC implementations.
+ super(cause.getClass().getName() + " from Server " + serverId + ": " + cause.getMessage(), cause);
}
public StateMachineException(String msg) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 6354e73..5d91120 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -146,7 +146,9 @@ public class ServerState implements Closeable {
throws IOException {
final RaftLog log;
if (RaftServerConfigKeys.Log.useMemory(prop)) {
- log = new MemoryRaftLog(id);
+ final int maxBufferSize =
+ RaftServerConfigKeys.Log.Appender.bufferCapacity(prop).getSizeInt();
+ log = new MemoryRaftLog(id, maxBufferSize);
} else {
log = new SegmentedRaftLog(id, server, this.storage,
lastIndexInSnapshot, prop);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index 3c14d2c..98162b7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -38,8 +38,8 @@ import java.util.concurrent.CompletableFuture;
public class MemoryRaftLog extends RaftLog {
private final List<LogEntryProto> entries = new ArrayList<>();
- public MemoryRaftLog(RaftPeerId selfId) {
- super(selfId);
+ public MemoryRaftLog(RaftPeerId selfId, int maxBufferSize) {
+ super(selfId, maxBufferSize);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 91f3b41..6ae3216 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -59,12 +59,14 @@ public abstract class RaftLog implements Closeable {
protected final AtomicLong lastCommitted =
new AtomicLong(RaftServerConstants.INVALID_LOG_INDEX);
private final RaftPeerId selfId;
+ private final int maxBufferSize;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private volatile boolean isOpen = false;
- public RaftLog(RaftPeerId selfId) {
+ public RaftLog(RaftPeerId selfId, int maxBufferSize) {
this.selfId = selfId;
+ this.maxBufferSize = maxBufferSize;
}
public long getLastCommittedIndex() {
@@ -147,6 +149,12 @@ public abstract class RaftLog implements Closeable {
final LogEntryProto e = ProtoUtils.toLogEntryProto(
operation.getSMLogEntry(), term, nextIndex, clientId, callId);
+ int entrySize = e.getSerializedSize();
+ if (entrySize > maxBufferSize) {
+ throw new StateMachineException(selfId, new RaftLogIOException(
+ "Log entry size " + entrySize + " exceeds the max buffer limit of "
+ + maxBufferSize));
+ }
appendEntry(e);
operation.setLogEntry(e);
return nextIndex;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
index b9fb5ff..0fbf737 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLogIOException.java
@@ -26,4 +26,8 @@ public class RaftLogIOException extends RaftException {
public RaftLogIOException(Throwable cause) {
super(cause);
}
+
+ public RaftLogIOException(String msg) {
+ super(msg);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 3f14571..1c00175 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -105,7 +105,8 @@ public class SegmentedRaftLog extends RaftLog {
public SegmentedRaftLog(RaftPeerId selfId, RaftServerImpl server,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties)
throws IOException {
- super(selfId);
+ super(selfId, RaftServerConfigKeys.Log.Appender.bufferCapacity(properties)
+ .getSizeInt());
this.server = server;
this.storage = storage;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/f2fed3a4/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 4562cb8..a38488a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -21,10 +21,12 @@ import org.apache.log4j.Level;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.storage.RaftLog;
-import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.LogUtils;
import org.junit.After;
import org.junit.Assert;
@@ -51,6 +53,9 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
@Before
public void setup() throws IOException {
+ final RaftProperties prop = getProperties();
+ RaftServerConfigKeys.Log.Appender
+ .setBufferCapacity(prop, SizeInBytes.valueOf("4KB"));
cluster = newCluster(NUM_PEERS);
cluster.start();
}
@@ -202,4 +207,22 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
StateMachineException.class, StaleReadException.class);
}
}
+
+ @Test
+ public void testLogAppenderBufferCapacity() throws Exception {
+ RaftTestUtil.waitForLeader(cluster);
+ final RaftPeerId leaderId = cluster.getLeader().getId();
+ final RaftClient client = cluster.createClient(leaderId);
+ byte[] bytes = new byte[8192];
+ Arrays.fill(bytes, (byte) 1);
+ SimpleMessage msg =
+ new SimpleMessage(new String(bytes));
+ try {
+ client.send(msg);
+ Assert.fail("Expected StateMachineException not thrown");
+ } catch (StateMachineException sme) {
+ Assert.assertTrue(sme.getMessage()
+ .contains("exceeds the max buffer limit"));
+ }
+ }
}