You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/18 23:44:36 UTC

[incubator-ratis] branch master updated: RATIS-1136. Add DataStreamRequestTypeProto. (#287)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2adccb0  RATIS-1136. Add DataStreamRequestTypeProto. (#287)
2adccb0 is described below

commit 2adccb06a5a82d9d4da6547a6e64b1686607c047
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 19 07:40:54 2020 +0800

    RATIS-1136. Add DataStreamRequestTypeProto. (#287)
---
 .../apache/ratis/client/impl/ClientProtoUtils.java |  5 +++
 .../ratis/client/impl/DataStreamClientImpl.java    |  4 +--
 .../apache/ratis/protocol/RaftClientRequest.java   | 33 +++++++++++++++----
 ratis-proto/src/main/proto/Raft.proto              |  4 +++
 .../org/apache/ratis/server/JvmPauseMonitor.java   |  2 +-
 .../apache/ratis/server/impl/PendingRequests.java  |  2 --
 .../ratis/datastream/DataStreamBaseTest.java       | 11 +++++--
 .../ratis/datastream/DataStreamClusterTests.java   | 37 ++++++++++++++--------
 8 files changed, 70 insertions(+), 28 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4498cff..c0591af 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -88,6 +88,8 @@ public interface ClientProtoUtils {
     switch (p.getTypeCase()) {
       case WRITE:
         return RaftClientRequest.Type.valueOf(p.getWrite());
+      case DATASTREAM:
+        return RaftClientRequest.Type.valueOf(p.getDataStream());
       case MESSAGESTREAM:
         return RaftClientRequest.Type.valueOf(p.getMessageStream());
       case READ:
@@ -128,6 +130,9 @@ public interface ClientProtoUtils {
       case WRITE:
         b.setWrite(type.getWrite());
         break;
+      case DATASTREAM:
+        b.setDataStream(type.getDataStream());
+        break;
       case MESSAGESTREAM:
         b.setMessageStream(type.getMessageStream());
         break;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index c7acb6e..45ea1be 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -126,8 +126,8 @@ public class DataStreamClientImpl implements DataStreamClient {
 
   @Override
   public DataStreamOutputRpc stream() {
-    RaftClientRequest request = new RaftClientRequest(
-        clientId, dataStreamServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
+    final RaftClientRequest request = new RaftClientRequest(clientId, dataStreamServer.getId(), groupId,
+        RaftClientImpl.nextCallId(), RaftClientRequest.dataStreamRequestType());
     return new DataStreamOutputImpl(request);
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 331ce20..0ef8626 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -29,17 +29,22 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.
  * Request from client to server
  */
 public class RaftClientRequest extends RaftClientMessage {
+  private static final Type DATA_STREAM_DEFAULT = new Type(DataStreamRequestTypeProto.getDefaultInstance());
   private static final Type WRITE_DEFAULT = new Type(WriteRequestTypeProto.getDefaultInstance());
   private static final Type WATCH_DEFAULT = new Type(
       WatchRequestTypeProto.newBuilder().setIndex(0L).setReplication(ReplicationLevel.MAJORITY).build());
 
-  private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance());
-  private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance());
+  private static final Type READ_DEFAULT = new Type(ReadRequestTypeProto.getDefaultInstance());
+  private static final Type STALE_READ_DEFAULT = new Type(StaleReadRequestTypeProto.getDefaultInstance());
 
   public static Type writeRequestType() {
     return WRITE_DEFAULT;
   }
 
+  public static Type dataStreamRequestType() {
+    return DATA_STREAM_DEFAULT;
+  }
+
   public static Type messageStreamRequestType(long streamId, long messageId, boolean endOfRequest) {
     return new Type(MessageStreamRequestTypeProto.newBuilder()
         .setStreamId(streamId)
@@ -49,11 +54,11 @@ public class RaftClientRequest extends RaftClientMessage {
   }
 
   public static Type readRequestType() {
-    return DEFAULT_READ;
+    return READ_DEFAULT;
   }
 
   public static Type staleReadRequestType(long minIndex) {
-    return minIndex == 0L? DEFAULT_STALE_READ
+    return minIndex == 0L? STALE_READ_DEFAULT
         : new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
   }
 
@@ -70,13 +75,16 @@ public class RaftClientRequest extends RaftClientMessage {
       return WRITE_DEFAULT;
     }
 
+    public static Type valueOf(DataStreamRequestTypeProto dataStream) {
+      return DATA_STREAM_DEFAULT;
+    }
+
     public static Type valueOf(ReadRequestTypeProto read) {
-      return DEFAULT_READ;
+      return READ_DEFAULT;
     }
 
     public static Type valueOf(StaleReadRequestTypeProto staleRead) {
-      return staleRead.getMinIndex() == 0? DEFAULT_STALE_READ
-          : new Type(staleRead);
+      return staleRead.getMinIndex() == 0? STALE_READ_DEFAULT: new Type(staleRead);
     }
 
     public static Type valueOf(WatchRequestTypeProto watch) {
@@ -105,6 +113,10 @@ public class RaftClientRequest extends RaftClientMessage {
       this(WRITE, write);
     }
 
+    private Type(DataStreamRequestTypeProto dataStream) {
+      this(DATASTREAM, dataStream);
+    }
+
     private Type(MessageStreamRequestTypeProto messageStream) {
       this(MESSAGESTREAM, messageStream);
     }
@@ -134,6 +146,11 @@ public class RaftClientRequest extends RaftClientMessage {
       return (WriteRequestTypeProto)proto;
     }
 
+    public DataStreamRequestTypeProto getDataStream() {
+      Preconditions.assertTrue(is(DATASTREAM));
+      return (DataStreamRequestTypeProto)proto;
+    }
+
     public MessageStreamRequestTypeProto getMessageStream() {
       Preconditions.assertTrue(is(MESSAGESTREAM), () -> "proto = " + proto);
       return (MessageStreamRequestTypeProto)proto;
@@ -171,6 +188,8 @@ public class RaftClientRequest extends RaftClientMessage {
       switch (typeCase) {
         case WRITE:
           return "RW";
+        case DATASTREAM:
+          return "DataStream";
         case MESSAGESTREAM:
           return toString(getMessageStream());
         case READ:
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index b6ae878..1749118 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -256,6 +256,9 @@ message MessageStreamRequestTypeProto {
   bool endOfRequest = 3;// Is this the end-of-request?
 }
 
+message DataStreamRequestTypeProto {
+}
+
 message ReadRequestTypeProto {
 }
 
@@ -279,6 +282,7 @@ message RaftClientRequestProto {
     StaleReadRequestTypeProto staleRead = 5;
     WatchRequestTypeProto watch = 6;
     MessageStreamRequestTypeProto messageStream = 7;
+    DataStreamRequestTypeProto dataStream = 8;
   }
 }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
index accd1fc..9470110 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/JvmPauseMonitor.java
@@ -137,7 +137,7 @@ public class JvmPauseMonitor {
         long extraSleepTime = sw.elapsed(TimeUnit.MILLISECONDS) - SLEEP_INTERVAL_MS;
         Map<String, GcTimes> gcTimesAfterSleep = getGcTimes();
 
-        if (extraSleepTime > 0) {
+        if (extraSleepTime > 100) {
           LOG.warn(formatMessage(extraSleepTime, gcTimesAfterSleep, gcTimesBeforeSleep));
         }
         try {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index 9892d06..5a3c493 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -193,7 +192,6 @@ class PendingRequests {
 
   PendingRequest add(Permit permit, RaftClientRequest request, TransactionContext entry) {
     // externally synced for now
-    Preconditions.assertTrue(request.is(RaftClientRequestProto.TypeCase.WRITE));
     final long index = entry.getLogEntry().getIndex();
     LOG.debug("{}: addPendingRequest at index={}, request={}", name, index, request);
     final PendingRequest pending = new PendingRequest(index, request, entry);
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 5b7baf9..b00d1ff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -59,7 +59,6 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -137,7 +136,7 @@ abstract class DataStreamBaseTest extends BaseTest {
 
     final StateMachineDataChannel channel = new StateMachineDataChannel() {
       @Override
-      public void force(boolean metadata) throws IOException {
+      public void force(boolean metadata) {
         forcedPosition = byteWritten;
       }
 
@@ -560,13 +559,19 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
   void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
+    // check header
+    Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
+    Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), header.getType());
+
+    // check stream
     final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
     final SingleDataStream stream = s.getSingleDataStream(header);
-    Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
     Assert.assertEquals(dataSize, stream.getByteWritten());
     Assert.assertEquals(dataSize, stream.getForcedPosition());
 
+    // check writeRequest
     final RaftClientRequest writeRequest = stream.getWriteRequest();
+    Assert.assertEquals(RaftClientRequest.dataStreamRequestType(), writeRequest.getType());
     assertRaftClientMessage(header, writeRequest);
   }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 4b9cac2..bf1210e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -24,6 +24,7 @@ import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.server.impl.RaftServerImpl;
@@ -57,28 +58,38 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
     Assert.assertEquals(NUM_SERVERS, peers.size());
     RaftPeer raftPeer = peers.iterator().next();
 
+    final ClientId clientId;
+    final long callId;
     try (RaftClient client = cluster.createClient(raftPeer)) {
+      clientId = client.getId();
+
       // send a stream request
-      final long callId;
       try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
         DataStreamBaseTest.writeAndAssertReplies(out, 1000, 10);
         callId = out.getHeader().getCallId();
       }
+    }
 
-      // verify the write request is in the Raft log.
-      RaftLog log = leader.getState().getLog();
-      boolean transactionFound = false;
-      for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
-        final LogEntryProto entryProto = log.get(termIndex.getIndex());
-        if (entryProto.hasStateMachineLogEntry()) {
-            StateMachineLogEntryProto stateMachineEntryProto = entryProto.getStateMachineLogEntry();
-            if (stateMachineEntryProto.getCallId() == callId) {
-              transactionFound = true;
-              break;
-            }
+    // verify the write request is in the Raft log.
+    final RaftLog log = leader.getState().getLog();
+    final LogEntryProto entry = findLogEntry(clientId, callId, log);
+    LOG.info("entry={}", entry);
+    Assert.assertNotNull(entry);
+  }
+
+  static LogEntryProto findLogEntry(ClientId clientId, long callId, RaftLog log) throws Exception {
+    for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
+      final LogEntryProto entry = log.get(termIndex.getIndex());
+      if (entry.hasStateMachineLogEntry()) {
+        final StateMachineLogEntryProto stateMachineEntry = entry.getStateMachineLogEntry();
+        if (stateMachineEntry.getCallId() == callId) {
+          if (clientId.equals(ClientId.valueOf(stateMachineEntry.getClientId()))) {
+            return entry;
+          }
         }
       }
-      Assert.assertTrue(transactionFound);
     }
+    return null;
   }
+
 }