You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/30 14:22:37 UTC
[incubator-ratis] branch master updated: RATIS-1276. Use
ClientInvocationId as key instead of StreamMap.Key (#387)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new c9c2c2f RATIS-1276. Use ClientInvocationId as key instead of StreamMap.Key (#387)
c9c2c2f is described below
commit c9c2c2ff6c4b72953c01725227807b7ba0036bde
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 30 22:21:10 2020 +0800
RATIS-1276. Use ClientInvocationId as key instead of StreamMap.Key (#387)
---
.../ratis/netty/server/DataStreamManagement.java | 47 +++-------------------
ratis-proto/src/main/proto/Raft.proto | 10 ++---
2 files changed, 10 insertions(+), 47 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 8ad60e8..43a6096 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -46,7 +46,6 @@ import org.apache.ratis.statemachine.StateMachine.DataStream;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
-import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.function.CheckedBiFunction;
@@ -57,7 +56,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
-import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -168,56 +166,21 @@ public class DataStreamManagement {
}
static class StreamMap {
- static class Key {
- private final ChannelId channelId;
- private final ClientId clientId;
- private final long streamId;
-
- Key(ChannelId channelId, ClientId clientId, long streamId) {
- this.channelId = channelId;
- this.clientId = clientId;
- this.streamId = streamId;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- } else if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final Key that = (Key) obj;
- return this.clientId.equals(that.clientId)
- && this.streamId == that.streamId
- && Objects.equals(this.channelId, that.channelId);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(channelId, clientId, streamId);
- }
-
- @Override
- public String toString() {
- return channelId + "-" + clientId + "-" + streamId;
- }
- }
-
- private final ConcurrentMap<Key, StreamInfo> map = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ClientInvocationId, StreamInfo> map = new ConcurrentHashMap<>();
- StreamInfo computeIfAbsent(Key key, Function<Key, StreamInfo> function) {
+ StreamInfo computeIfAbsent(ClientInvocationId key, Function<ClientInvocationId, StreamInfo> function) {
final StreamInfo info = map.computeIfAbsent(key, function);
LOG.debug("computeIfAbsent({}) returns {}", key, info);
return info;
}
- StreamInfo get(Key key) {
+ StreamInfo get(ClientInvocationId key) {
final StreamInfo info = map.get(key);
LOG.debug("get({}) returns {}", key, info);
return info;
}
- StreamInfo remove(Key key) {
+ StreamInfo remove(ClientInvocationId key) {
final StreamInfo info = map.remove(key);
LOG.debug("remove({}) returns {}", key, info);
return info;
@@ -379,7 +342,7 @@ public class DataStreamManagement {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
- final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getClientId(), request.getStreamId());
+ ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index b63b93a..3bcad1b 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -322,12 +322,12 @@ message DataStreamPacketHeaderProto {
CLOSE = 1;
}
- uint64 streamId = 1;
- uint64 streamOffset = 2;
- Type type = 3;
- repeated Option options = 4;
+ bytes clientId = 1;
+ Type type = 2;
+ uint64 streamId = 3;
+ uint64 streamOffset = 4;
uint64 dataLength = 5;
- bytes clientId = 6;
+ repeated Option options = 6;
}
message DataStreamRequestHeaderProto {