You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/30 14:22:37 UTC

[incubator-ratis] branch master updated: RATIS-1276. Use ClientInvocationId as key instead of StreamMap.Key (#387)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c9c2c2f  RATIS-1276. Use ClientInvocationId as key instead of StreamMap.Key (#387)
c9c2c2f is described below

commit c9c2c2ff6c4b72953c01725227807b7ba0036bde
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 30 22:21:10 2020 +0800

    RATIS-1276. Use ClientInvocationId as key instead of StreamMap.Key (#387)
---
 .../ratis/netty/server/DataStreamManagement.java   | 47 +++-------------------
 ratis-proto/src/main/proto/Raft.proto              | 10 ++---
 2 files changed, 10 insertions(+), 47 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 8ad60e8..43a6096 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -46,7 +46,6 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
 import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.function.CheckedBiFunction;
@@ -57,7 +56,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -168,56 +166,21 @@ public class DataStreamManagement {
   }
 
   static class StreamMap {
-    static class Key {
-      private final ChannelId channelId;
-      private final ClientId clientId;
-      private final long streamId;
-
-      Key(ChannelId channelId, ClientId clientId, long streamId) {
-        this.channelId = channelId;
-        this.clientId = clientId;
-        this.streamId = streamId;
-      }
-
-      @Override
-      public boolean equals(Object obj) {
-        if (this == obj) {
-          return true;
-        } else if (obj == null || getClass() != obj.getClass()) {
-          return false;
-        }
-        final Key that = (Key) obj;
-        return this.clientId.equals(that.clientId)
-            && this.streamId == that.streamId
-            && Objects.equals(this.channelId, that.channelId);
-      }
-
-      @Override
-      public int hashCode() {
-        return Objects.hash(channelId, clientId, streamId);
-      }
-
-      @Override
-      public String toString() {
-        return channelId + "-" + clientId + "-" + streamId;
-      }
-    }
-
-    private final ConcurrentMap<Key, StreamInfo> map = new ConcurrentHashMap<>();
+    private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap<>();
 
-    StreamInfo computeIfAbsent(Key key, Function<Key, StreamInfo> function) {
+    StreamInfo computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, StreamInfo> function) {
       final StreamInfo info = map.computeIfAbsent(key, function);
       LOG.debug("computeIfAbsent({}) returns {}", key, info);
       return info;
     }
 
-    StreamInfo get(Key key) {
+    StreamInfo get(ClientInvocationId key) {
       final StreamInfo info = map.get(key);
       LOG.debug("get({}) returns {}", key, info);
       return info;
     }
 
-    StreamInfo remove(Key key) {
+    StreamInfo remove(ClientInvocationId key) {
       final StreamInfo info = map.remove(key);
       LOG.debug("remove({}) returns {}", key, info);
       return info;
@@ -379,7 +342,7 @@ public class DataStreamManagement {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
     boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
-    final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getClientId(), request.getStreamId());
+    ClientInvocationId key =  ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
       final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index b63b93a..3bcad1b 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -322,12 +322,12 @@ message DataStreamPacketHeaderProto {
     CLOSE = 1;
   }
 
-  uint64 streamId = 1;
-  uint64 streamOffset = 2;
-  Type type = 3;
-  repeated Option options = 4;
+  bytes clientId = 1;
+  Type type = 2;
+  uint64 streamId = 3;
+  uint64 streamOffset = 4;
   uint64 dataLength = 5;
-  bytes clientId = 6;
+  repeated Option options = 6;
 }
 
 message DataStreamRequestHeaderProto {