You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/18 23:44:36 UTC
[incubator-ratis] branch master updated: RATIS-1136. Add DataStreamRequestTypeProto. (#287)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2adccb0 RATIS-1136. Add DataStreamRequestTypeProto. (#287)
2adccb0 is described below
commit 2adccb06a5a82d9d4da6547a6e64b1686607c047
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 19 07:40:54 2020 +0800
RATIS-1136. Add DataStreamRequestTypeProto. (#287)
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 5 +++
.../ratis/client/impl/DataStreamClientImpl.java | 4 +--
.../apache/ratis/protocol/RaftClientRequest.java | 33 +++++++++++++++----
ratis-proto/src/main/proto/Raft.proto | 4 +++
.../org/apache/ratis/server/JvmPauseMonitor.java | 2 +-
.../apache/ratis/server/impl/PendingRequests.java | 2 --
.../ratis/datastream/DataStreamBaseTest.java | 11 +++++--
.../ratis/datastream/DataStreamClusterTests.java | 37 ++++++++++++++--------
8 files changed, 70 insertions(+), 28 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4498cff..c0591af 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -88,6 +88,8 @@ public interface ClientProtoUtils {
switch (p.getTypeCase()) {
case WRITE:
return RaftClientRequest.Type.valueOf(p.getWrite());
+ case DATASTREAM:
+ return RaftClientRequest.Type.valueOf(p.getDataStream());
case MESSAGESTREAM:
return RaftClientRequest.Type.valueOf(p.getMessageStream());
case READ:
@@ -128,6 +130,9 @@ public interface ClientProtoUtils {
case WRITE:
b.setWrite(type.getWrite());
break;
+ case DATASTREAM:
+ b.setDataStream(type.getDataStream());
+ break;
case MESSAGESTREAM:
b.setMessageStream(type.getMessageStream());
break;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index c7acb6e..45ea1be 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -126,8 +126,8 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutputRpc stream() {
- RaftClientRequest request = new RaftClientRequest(
- clientId, dataStreamServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
+ final RaftClientRequest request = new RaftClientRequest(clientId, dataStreamServer.getId(), groupId,
+ RaftClientImpl.nextCallId(), RaftClientRequest.dataStreamRequestType());
return new DataStreamOutputImpl(request);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 331ce20..0ef8626 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -29,17 +29,22 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.
* Request from client to server
*/
public class RaftClientRequest extends RaftClientMessage {
+ private static final Type DATA_STREAM_DEFAULT = new Type(DataStreamRequestTypeProto.getDefaultInstance());
private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
private static final Type WATCH_DEFAULT = new Type(
WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
- private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance());
- private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance());
+ private static final Type READ_DEFAULT = new Type(ReadRequestTypeProto.getDefaultInstance());
+ private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());
public static Type writeRequestType() {
return WRITE_DEFAULT;
}
+ public static Type dataStreamRequestType() {
+ return DATA_STREAM_DEFAULT;
+ }
+
public static Type messageStreamRequestType(long streamId, long messageId, boolean endOfRequest) {
return new Type(MessageStreamRequestTypeProto.newBuilder()
.setStreamId(streamId)
@@ -49,11 +54,11 @@ public class RaftClientRequest extends RaftClientMessage {
}
public static Type readRequestType() {
- return DEFAULT_READ;
+ return READ_DEFAULT;
}
public static Type staleReadRequestType(long minIndex) {
- return minIndex == 0L? DEFAULT_STALE_READ
+ return minIndex == 0L? STALE_READ_DEFAULT
: new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
}
@@ -70,13 +75,16 @@ public class RaftClientRequest extends RaftClientMessage {
return WRITE_DEFAULT;
}
+ public static Type valueOf(DataStreamRequestTypeProto dataStream) {
+ return DATA_STREAM_DEFAULT;
+ }
+
public static Type valueOf(ReadRequestTypeProto read) {
- return DEFAULT_READ;
+ return READ_DEFAULT;
}
public static Type valueOf(StaleReadRequestTypeProto staleRead) {
- return staleRead.getMinIndex() == 0? DEFAULT_STALE_READ
- : new Type(staleRead);
+ return staleRead.getMinIndex() == 0? STALE_READ_DEFAULT: new Type(staleRead);
}
public static Type valueOf(WatchRequestTypeProto watch) {
@@ -105,6 +113,10 @@ public class RaftClientRequest extends RaftClientMessage {
this(WRITE, write);
}
+ private Type(DataStreamRequestTypeProto dataStream) {
+ this(DATASTREAM, dataStream);
+ }
+
private Type(MessageStreamRequestTypeProto messageStream) {
this(MESSAGESTREAM, messageStream);
}
@@ -134,6 +146,11 @@ public class RaftClientRequest extends RaftClientMessage {
return (WriteRequestTypeProto)proto;
}
+ public DataStreamRequestTypeProto getDataStream() {
+ Preconditions.assertTrue(is(DATASTREAM));
+ return (DataStreamRequestTypeProto)proto;
+ }
+
public MessageStreamRequestTypeProto getMessageStream() {
Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
return (MessageStreamRequestTypeProto)proto;
@@ -171,6 +188,8 @@ public class RaftClientRequest extends RaftClientMessage {
switch (typeCase) {
case WRITE:
return "RW";
+ case DATASTREAM:
+ return "DataStream";
case MESSAGESTREAM:
return toString(getMessageStream());
case READ:
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index b6ae878..1749118 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -256,6 +256,9 @@ message MessageStreamRequestTypeProto {
bool endOfRequest = 3;// Is this the end-of-request?
}
+message DataStreamRequestTypeProto {
+}
+
message ReadRequestTypeProto {
}
@@ -279,6 +282,7 @@ message RaftClientRequestProto {
StaleReadRequestTypeProto staleRead = 5;
WatchRequestTypeProto watch = 6;
MessageStreamRequestTypeProto messageStream = 7;
+ DataStreamRequestTypeProto dataStream = 8;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
index accd1fc..9470110 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
@@ -137,7 +137,7 @@ public class JvmPauseMonitor {
long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
- if (extraSleepTime > 0) {
+ if (extraSleepTime > 100) {
LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
}
try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 9892d06..5a3c493 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.RaftClientReply;
@@ -193,7 +192,6 @@ class PendingRequests {
PendingRequest add(Permit permit, RaftClientRequest request, TransactionContext entry) {
// externally synced for now
- Preconditions.assertTrue(request.is(RaftClientRequestProto.TypeCase.WRITE));
final long index = entry.getLogEntry().getIndex();
LOG.debug("{}: addPendingRequest at index={}, request={}", name, index, request);
final PendingRequest pending = new PendingRequest(index, request, entry);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 5b7baf9..b00d1ff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -59,7 +59,6 @@ import org.slf4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -137,7 +136,7 @@ abstract class DataStreamBaseTest extends BaseTest {
final StateMachineDataChannel channel = new StateMachineDataChannel() {
@Override
- public void force(boolean metadata) throws IOException {
+ public void force(boolean metadata) {
forcedPosition = byteWritten;
}
@@ -560,13 +559,19 @@ abstract class DataStreamBaseTest extends BaseTest {
}
void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
+ // check header
+ Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
+ Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
+
+ // check stream
final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
final SingleDataStream stream = s.getSingleDataStream(header);
- Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(dataSize, stream.getByteWritten());
Assert.assertEquals(dataSize, stream.getForcedPosition());
+ // check writeRequest
final RaftClientRequest writeRequest = stream.getWriteRequest();
+ Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
assertRaftClientMessage(header, writeRequest);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 4b9cac2..bf1210e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -57,28 +58,38 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
Assert.assertEquals(NUM_SERVERS, peers.size());
RaftPeer raftPeer = peers.iterator().next();
+ final ClientId clientId;
+ final long callId;
try (RaftClient client = cluster.createClient(raftPeer)) {
+ clientId = client.getId();
+
// send a stream request
- final long callId;
try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
DataStreamBaseTest.writeAndAssertReplies(out, 1000, 10);
callId = out.getHeader().getCallId();
}
+ }
- // verify the write request is in the Raft log.
- RaftLog log = leader.getState().getLog();
- boolean transactionFound = false;
- for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
- final LogEntryProto entryProto = log.get(termIndex.getIndex());
- if (entryProto.hasStateMachineLogEntry()) {
- StateMachineLogEntryProto stateMachineEntryProto = entryProto.getStateMachineLogEntry();
- if (stateMachineEntryProto.getCallId() == callId) {
- transactionFound = true;
- break;
- }
+ // verify the write request is in the Raft log.
+ final RaftLog log = leader.getState().getLog();
+ final LogEntryProto entry = findLogEntry(clientId, callId, log);
+ LOG.info("entry={}", entry);
+ Assert.assertNotNull(entry);
+ }
+
+ static LogEntryProto findLogEntry(ClientId clientId, long callId, RaftLog log) throws Exception {
+ for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
+ final LogEntryProto entry = log.get(termIndex.getIndex());
+ if (entry.hasStateMachineLogEntry()) {
+ final StateMachineLogEntryProto stateMachineEntry = entry.getStateMachineLogEntry();
+ if (stateMachineEntry.getCallId() == callId) {
+ if (clientId.equals(ClientId.valueOf(stateMachineEntry.getClientId()))) {
+ return entry;
+ }
}
}
- Assert.assertTrue(transactionFound);
}
+ return null;
}
+
}