You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/02/05 16:37:01 UTC
incubator-ratis git commit: RATIS-206. Change StateMachine.query to pass and return Message. Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 1beef8741 -> 2d5365683
RATIS-206. Change StateMachine.query to pass and return Message. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/2d536568
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/2d536568
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/2d536568
Branch: refs/heads/master
Commit: 2d53656837d9d56792bc62a062dc1481aaab3024
Parents: 1beef87
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Mon Feb 5 22:06:36 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Mon Feb 5 22:06:36 2018 +0530
----------------------------------------------------------------------
.../ratis/client/impl/ClientProtoUtils.java | 16 +----------
.../java/org/apache/ratis/protocol/Message.java | 26 +++++++++++++++++
.../java/org/apache/ratis/util/StringUtils.java | 4 ++-
.../arithmetic/ArithmeticStateMachine.java | 15 +++-------
.../examples/arithmetic/AssignmentMessage.java | 4 +--
.../arithmetic/expression/Expression.java | 10 +++----
.../examples/filestore/FileStoreClient.java | 2 +-
.../filestore/FileStoreStateMachine.java | 16 +++++------
.../ratis/server/impl/RaftServerImpl.java | 3 +-
.../apache/ratis/statemachine/StateMachine.java | 4 +--
.../statemachine/impl/BaseStateMachine.java | 8 ++----
.../org/apache/ratis/RaftExceptionBaseTest.java | 4 +--
.../SimpleStateMachine4Testing.java | 30 ++++++++++++++------
13 files changed, 78 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index a7aaf54..861dba6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -209,21 +209,7 @@ public interface ClientProtoUtils {
}
static Message toMessage(final ClientMessageEntryProto p) {
- return toMessage(p.getContent());
- }
-
- static Message toMessage(final ByteString bytes) {
- return new Message() {
- @Override
- public ByteString getContent() {
- return bytes;
- }
-
- @Override
- public String toString() {
- return StringUtils.bytes2HexShortString(getContent());
- }
- };
+ return Message.valueOf(p.getContent());
}
static ClientMessageEntryProto.Builder toClientMessageEntryProtoBuilder(ByteString message) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
index 77ef267..4efd29c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/Message.java
@@ -18,11 +18,37 @@
package org.apache.ratis.protocol;
import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.StringUtils;
+
+import java.util.function.Supplier;
/**
* The information clients append to the raft ring.
*/
public interface Message {
+ static Message valueOf(ByteString bytes, Supplier<String> stringSupplier) {
+ return new Message() {
+ private final MemoizedSupplier<String> memoized = MemoizedSupplier.valueOf(stringSupplier);
+
+ @Override
+ public ByteString getContent() {
+ return bytes;
+ }
+
+ @Override
+ public String toString() {
+ return memoized.get();
+ }
+ };
+ }
+
+ static Message valueOf(ByteString bytes) {
+ return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes));
+ }
+
+ Message EMPTY = valueOf(ByteString.EMPTY);
+
/**
* @return the content of the message
*/
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
index 0212a48..710a0a2 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java
@@ -67,7 +67,9 @@ public class StringUtils {
public static String bytes2HexShortString(ByteString bytes) {
final int size = bytes.size();
- if (size > 10) {
+ if (size == 0) {
+ return "<EMPTY>";
+ } else if (size > 10) {
// return only the first 10 bytes
return bytes2HexString(bytes.substring(0, 10)) + "...(size=" + size + ")";
} else {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
index 0b08b89..fafab6e 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/ArithmeticStateMachine.java
@@ -20,8 +20,6 @@ package org.apache.ratis.examples.arithmetic;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.examples.arithmetic.expression.Expression;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.protocol.TermIndex;
@@ -135,19 +133,15 @@ public class ArithmeticStateMachine extends BaseStateMachine {
}
@Override
- public CompletableFuture<RaftClientReply> query(
- RaftClientRequest request) {
- final Expression q = Expression.Utils.bytes2Expression(
- request.getMessage().getContent().toByteArray(), 0);
+ public CompletableFuture<Message> query(Message request) {
+ final Expression q = Expression.Utils.bytes2Expression(request.getContent().toByteArray(), 0);
final Double result;
try(final AutoCloseableLock readLock = readLock()) {
result = q.evaluate(variables);
}
final Expression r = Expression.Utils.double2Expression(result);
LOG.debug("QUERY: {} = {}", q, r);
- final RaftClientReply reply = new RaftClientReply(request,
- Expression.Utils.toMessage(r));
- return CompletableFuture.completedFuture(reply);
+ return CompletableFuture.completedFuture(Expression.Utils.toMessage(r));
}
@Override
@@ -158,8 +152,7 @@ public class ArithmeticStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
final LogEntryProto entry = trx.getLogEntry();
- final AssignmentMessage assignment = new AssignmentMessage(
- () -> entry.getSmLogEntry().getData());
+ final AssignmentMessage assignment = new AssignmentMessage(entry.getSmLogEntry().getData());
final long index = entry.getIndex();
final Double result;
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
index e4e7ca8..097e792 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/AssignmentMessage.java
@@ -43,8 +43,8 @@ public class AssignmentMessage implements Message, Evaluable {
expression = Expression.Utils.bytes2Expression(buf, offset + variable.length());
}
- public AssignmentMessage(Message message) {
- this(message.getContent().toByteArray(), 0);
+ public AssignmentMessage(ByteString bytes) {
+ this(bytes.toByteArray(), 0);
}
public Variable getVariable() {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
index 12818f7..f212ce6 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/arithmetic/expression/Expression.java
@@ -46,12 +46,10 @@ public interface Expression extends Evaluable {
class Utils {
public static Message toMessage(final Expression e) {
- return () -> {
- final byte[] buf = new byte[e.length()];
- final int length = e.toBytes(buf, 0);
- Preconditions.assertTrue(length == buf.length);
- return toByteString(buf);
- };
+ final byte[] buf = new byte[e.length()];
+ final int length = e.toBytes(buf, 0);
+ Preconditions.assertTrue(length == buf.length);
+ return Message.valueOf(toByteString(buf), () -> "Message:" + e);
}
public static Expression double2Expression(Double d) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
index 380f315..de794c2 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java
@@ -57,7 +57,7 @@ public class FileStoreClient implements Closeable {
static ByteString send(
ByteString request, CheckedFunction<Message, RaftClientReply, IOException> sendFunction)
throws IOException {
- final RaftClientReply reply = sendFunction.apply(() -> request);
+ final RaftClientReply reply = sendFunction.apply(Message.valueOf(request));
final StateMachineException sme = reply.getStateMachineException();
if (sme != null) {
throw new IOException("Failed to send request " + request, sme);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 5ecb387..c1b41c6 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -20,7 +20,6 @@ package org.apache.ratis.examples.filestore;
import org.apache.ratis.conf.ConfUtils;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.storage.RaftStorage;
@@ -72,17 +71,17 @@ public class FileStoreStateMachine extends BaseStateMachine {
}
@Override
- public CompletableFuture<RaftClientReply> query(RaftClientRequest request) {
+ public CompletableFuture<Message> query(Message request) {
final ReadRequestProto proto;
try {
- proto = ReadRequestProto.parseFrom(request.getMessage().getContent());
+ proto = ReadRequestProto.parseFrom(request.getContent());
} catch (InvalidProtocolBufferException e) {
return FileStoreCommon.completeExceptionally("Failed to parse " + request, e);
}
final String path = proto.getPath().toStringUtf8();
return files.read(path, proto.getOffset(), proto.getLength())
- .thenApply(reply -> new RaftClientReply(request, () -> reply.toByteString()));
+ .thenApply(reply -> Message.valueOf(reply.toByteString()));
}
@Override
@@ -162,13 +161,14 @@ public class FileStoreStateMachine extends BaseStateMachine {
long index, WriteRequestHeaderProto header, int size) {
final String path = header.getPath().toStringUtf8();
return files.submitCommit(index, path, header.getClose(), header.getOffset(), size)
- .thenApply(reply -> () -> reply.toByteString());
+ .thenApply(reply -> Message.valueOf(reply.toByteString()));
}
private CompletableFuture<Message> delete(long index, DeleteRequestProto request) {
final String path = request.getPath().toStringUtf8();
- return files.delete(index, path).thenApply(resolved -> () ->
- DeleteReplyProto.newBuilder().setResolvedPath(
- FileStoreCommon.toByteString(resolved)).build().toByteString());
+ return files.delete(index, path).thenApply(resolved ->
+ Message.valueOf(DeleteReplyProto.newBuilder().setResolvedPath(
+ FileStoreCommon.toByteString(resolved)).build().toByteString(),
+ () -> "Message:" + resolved));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index a619909..6f8f603 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -462,7 +462,8 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
if (request.isReadOnly()) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
- return stateMachine.query(request);
+ return stateMachine.query(request.getMessage())
+ .thenApply(r -> new RaftClientReply(request, r));
}
// query the retry cache
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index cbedb08..e2aaf29 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -19,7 +19,6 @@ package org.apache.ratis.statemachine;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -116,9 +115,8 @@ public interface StateMachine extends Closeable {
/**
* Query the state machine. The request must be read-only.
- * TODO: extend RaftClientRequest to have a read-only request subclass.
*/
- CompletableFuture<RaftClientReply> query(RaftClientRequest request);
+ CompletableFuture<Message> query(Message request);
/**
* Validate/pre-process the incoming update request in the state machine.
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 8f0b56a..201eff7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -20,7 +20,6 @@ package org.apache.ratis.statemachine.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.impl.RaftConfiguration;
@@ -107,8 +106,8 @@ public class BaseStateMachine implements StateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
// return the same message contained in the entry
- Message msg = () -> trx.getLogEntry().getSmLogEntry().getData();
- return CompletableFuture.completedFuture(msg);
+ return CompletableFuture.completedFuture(
+ Message.valueOf(trx.getLogEntry().getSmLogEntry().getData()));
}
@Override
@@ -159,8 +158,7 @@ public class BaseStateMachine implements StateMachine {
}
@Override
- public CompletableFuture<RaftClientReply> query(
- RaftClientRequest request) {
+ public CompletableFuture<Message> query(Message request) {
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index 7ad5bd2..e69f35d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -175,11 +175,11 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
// Create client using another group
try(RaftClient client = cluster.createClient(anotherGroup)) {
testFailureCase("send(..) with client group being different from the server group",
- () -> client.send(() -> ByteString.EMPTY),
+ () -> client.send(Message.EMPTY),
GroupMismatchException.class);
testFailureCase("sendReadOnly(..) with client group being different from the server group",
- () -> client.sendReadOnly(() -> ByteString.EMPTY),
+ () -> client.sendReadOnly(Message.EMPTY),
GroupMismatchException.class);
testFailureCase("setConfiguration(..) with client group being different from the server group",
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/2d536568/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 86b7b66..261b9ef 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -21,7 +21,6 @@ import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.io.MD5Hash;
import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
@@ -31,16 +30,14 @@ import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.LogInputStream;
import org.apache.ratis.server.storage.LogOutputStream;
import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.statemachine.impl.TransactionContextImpl;
-import org.apache.ratis.util.Daemon;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.MD5FileUtil;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -216,11 +213,26 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
}
}
+ /**
+ * Query the n-th log entry.
+ * @param request an index represented in a UTF-8 String, or an empty message.
+ * @return a completed future of the n-th log entry,
+ * where n is the last applied index if the request is empty,
+ * otherwise, n is the index represented in the request.
+ */
@Override
- public CompletableFuture<RaftClientReply> query(
- RaftClientRequest request) {
- return CompletableFuture.completedFuture(
- new RaftClientReply(request, new SimpleMessage("query success")));
+ public CompletableFuture<Message> query(Message request) {
+ final ByteString bytes = request.getContent();
+ try {
+ final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
+ : Long.parseLong(bytes.toStringUtf8());
+ LOG.info("query log index " + index);
+ final LogEntryProto entry = list.get(Math.toIntExact(index));
+ return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
+ } catch (Exception e) {
+ LOG.warn("Failed request " + request, e);
+ return JavaUtils.completeExceptionally(e);
+ }
}
@Override