You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2018/08/03 20:45:40 UTC
incubator-ratis git commit: RATIS-289. Optimize readStateMachine by allowing to enqueue multiple read requests in parallel. Contributed by Mukul Kumar Singh
Repository: incubator-ratis
Updated Branches:
refs/heads/master c2423179b -> 0236eea30
RATIS-289. Optimize readStateMachine by allowing to enqueue multiple read requests in parallel. Contributed by Mukul Kumar Singh
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/0236eea3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/0236eea3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/0236eea3
Branch: refs/heads/master
Commit: 0236eea307fef8f8829b19dfef8919fb665f42fc
Parents: c242317
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Fri Aug 3 13:44:50 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Fri Aug 3 13:44:50 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/ratis/util/ProtoUtils.java | 15 ++++++-
ratis-proto-shaded/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/LogAppender.java | 14 ++++--
.../ratis/server/storage/MemoryRaftLog.java | 4 +-
.../apache/ratis/server/storage/RaftLog.java | 46 +++++++++++++++++++-
.../ratis/server/storage/SegmentedRaftLog.java | 16 ++-----
6 files changed, 75 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2ac2ca5..06b1346 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -190,10 +190,23 @@ public interface ProtoUtils {
.setSmLogEntry
(SMLogEntryProto.newBuilder()
.setData(smLog.getData())
- .setStateMachineDataAttached(true))
+ .setStateMachineDataAttached(true)
+ .setSerializedProtobufSize(entry.getSerializedSize()))
.build();
}
+ static long getSerializedSize(LogEntryProto entry) {
+ if (entry.getLogEntryBodyCase() != LogEntryBodyCase.SMLOGENTRY) {
+ return entry.getSerializedSize();
+ }
+ final SMLogEntryProto smLog = entry.getSmLogEntry();
+ if (!smLog.getStateMachineDataAttached()) {
+ // if state machine data was never set, return the proto serialized size
+ return entry.getSerializedSize();
+ }
+ return smLog.getSerializedProtobufSize();
+ }
+
static IOException toIOException(ServiceException se) {
final Throwable t = se.getCause();
if (t == null) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 2965f97..62202be 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -47,6 +47,7 @@ message SMLogEntryProto {
bytes stateMachineData = 2; // State machine specific data which is not written to log.
bool stateMachineDataAttached = 3; // set this flag when state machine data is attached.
+ uint64 serializedProtobufSize = 4; // size of the serialized LogEntryProto along with stateMachineData
}
message LeaderNoOp {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index 0398052..b863bad 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.LeaderState.StateUpdateEventType;
import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftLog.EntryWithData;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.storage.RaftLog;
import org.apache.ratis.server.storage.RaftLogIOException;
@@ -134,7 +135,7 @@ public class LogAppender {
* A buffer for log entries with size limitation.
*/
private class LogEntryBuffer {
- private final List<LogEntryProto> buf = new ArrayList<>();
+ private final List<EntryWithData> buf = new ArrayList<>();
private int totalSize = 0;
/**
@@ -143,7 +144,7 @@ public class LogAppender {
* @return true if the entry is added successfully;
* otherwise, the entry is not added, return false.
*/
- boolean addEntry(LogEntryProto entry) {
+ boolean addEntry(EntryWithData entry) {
final long entrySize = entry.getSerializedSize();
if (totalSize + entrySize <= maxBufferSize) {
buf.add(entry);
@@ -157,9 +158,14 @@ public class LogAppender {
return buf.isEmpty();
}
- AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) {
+ AppendEntriesRequestProto getAppendRequest(TermIndex previous, long callId) throws RaftLogIOException {
+ final List<LogEntryProto> protos = new ArrayList<>();
+ // Wait for all the log entry futures to complete and then create a list of LogEntryProtos.
+ for (EntryWithData bufEntry : buf) {
+ protos.add(bufEntry.getEntry());
+ }
final AppendEntriesRequestProto request = leaderState.newAppendEntriesRequestProto(
- getFollowerId(), previous, buf, !follower.isAttendingVote(), callId);
+ getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
buf.clear();
totalSize = 0;
return request;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
index eba6f63..7b3b1b9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/MemoryRaftLog.java
@@ -52,8 +52,8 @@ public class MemoryRaftLog extends RaftLog {
}
@Override
- public LogEntryProto getEntryWithData(long index) {
- return get(index);
+ public EntryWithData getEntryWithData(long index) {
+ return new EntryWithData(get(index), null);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 6dc8835..0d2ec4c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -20,6 +20,7 @@ package org.apache.ratis.server.storage;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.impl.ServerProtoUtils;
@@ -27,6 +28,7 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ProtoUtils;
import org.slf4j.Logger;
@@ -200,7 +202,7 @@ public abstract class RaftLog implements Closeable {
* @return The log entry associated with the given index.
* Null if there is no log entry with the index.
*/
- public abstract LogEntryProto getEntryWithData(long index) throws RaftLogIOException;
+ public abstract EntryWithData getEntryWithData(long index) throws RaftLogIOException;
/**
* Get the TermIndex information of the given index.
@@ -326,4 +328,46 @@ public abstract class RaftLog implements Closeable {
public RaftPeerId getSelfId() {
return selfId;
}
+
+ /**
+ * Holds proto entry along with future which contains read state machine data
+ */
+ public class EntryWithData {
+ private LogEntryProto logEntry;
+ private CompletableFuture<LogEntryProto> future;
+
+ EntryWithData(LogEntryProto logEntry, CompletableFuture<LogEntryProto> future) {
+ this.logEntry = logEntry;
+ this.future = future;
+ }
+
+ public long getSerializedSize() {
+ return ProtoUtils.getSerializedSize(logEntry);
+ }
+
+ public LogEntryProto getEntry() throws RaftLogIOException {
+ LogEntryProto entryProto;
+ if (future == null) {
+ return logEntry;
+ }
+
+ try {
+ entryProto = future.join();
+ } catch (Throwable t) {
+ final String err = selfId + ": Failed readStateMachineData for " +
+ ServerProtoUtils.toLogEntryString(logEntry);
+ LogAppender.LOG.error(err, t);
+ throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(t));
+ }
+ // by this time we have already read the state machine data,
+ // so the log entry data should be set now
+ if (ProtoUtils.shouldReadStateMachineData(entryProto)) {
+ final String err = selfId + ": State machine data not set for " +
+ ServerProtoUtils.toLogEntryString(logEntry);
+ LogAppender.LOG.error(err);
+ throw new RaftLogIOException(err);
+ }
+ return entryProto;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/0236eea3/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 7f59518..9df16f8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -194,30 +194,20 @@ public class SegmentedRaftLog extends RaftLog {
}
@Override
- public LogEntryProto getEntryWithData(long index) throws RaftLogIOException {
+ public EntryWithData getEntryWithData(long index) throws RaftLogIOException {
final LogEntryProto entry = get(index);
if (!ProtoUtils.shouldReadStateMachineData(entry)) {
- return entry;
+ return new EntryWithData(entry, null);
}
- LogEntryProto logEntryProto;
try {
- logEntryProto = server.getStateMachine().readStateMachineData(entry).join();
+ return new EntryWithData(entry, server.getStateMachine().readStateMachineData(entry));
} catch (Throwable e) {
final String err = server.getId() + ": Failed readStateMachineData for " +
ServerProtoUtils.toLogEntryString(entry);
LOG.error(err, e);
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
}
- // by this time we have already read the state machine data,
- // so the log entry data should be set now
- if (!ProtoUtils.shouldReadStateMachineData(logEntryProto)) {
- final String err = server.getId() + ": State machine data not set for " +
- ServerProtoUtils.toLogEntryString(logEntryProto);
- LOG.error(err);
- throw new RaftLogIOException(err);
- }
- return logEntryProto;
}
private void checkAndEvictCache() {