You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/10 09:35:28 UTC
[incubator-ratis] branch master updated: RATIS-1142. Remove
STREAM_CLOSE_FORWARD (#265)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6aa4d58 RATIS-1142. Remove STREAM_CLOSE_FORWARD (#265)
6aa4d58 is described below
commit 6aa4d58656fb35855724eb8085d897a7bc16d363
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Nov 10 17:28:58 2020 +0800
RATIS-1142. Remove STREAM_CLOSE_FORWARD (#265)
---
.../apache/ratis/client/DataStreamOutputRpc.java | 3 ---
.../ratis/client/impl/DataStreamClientImpl.java | 5 -----
.../ratis/netty/server/NettyServerStreamRpc.java | 26 ++++++++++++++--------
ratis-proto/src/main/proto/Raft.proto | 3 +--
4 files changed, 18 insertions(+), 19 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
index 19fb46b..ba48324 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/DataStreamOutputRpc.java
@@ -27,9 +27,6 @@ public interface DataStreamOutputRpc extends DataStreamOutput {
/** Get the future of the header request. */
CompletableFuture<DataStreamReply> getHeaderFuture();
- /** Peer close asynchronously. */
- CompletableFuture<DataStreamReply> closeForwardAsync();
-
/** Create a transaction asynchronously once the stream data is replicated to all servers */
CompletableFuture<DataStreamReply> startTransactionAsync();
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 2798de6..27583b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -88,11 +88,6 @@ public class DataStreamClientImpl implements DataStreamClient {
}
@Override
- public CompletableFuture<DataStreamReply> closeForwardAsync() {
- return send(Type.STREAM_CLOSE_FORWARD);
- }
-
- @Override
public CompletableFuture<DataStreamReply> startTransactionAsync() {
return send(Type.START_TRANSACTION);
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 496e83c..dff6a87 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -32,6 +32,7 @@ import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -254,7 +255,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
return new StreamInfo(request, stateMachine.data().stream(request),
- server.getId().equals(request.getServerId())?
+ isPrimary(request.getServerId())?
proxies.getDataStreamOutput(request) : Collections.EMPTY_LIST);
} catch (Throwable e) {
throw new CompletionException(e);
@@ -357,6 +358,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
});
}
+ private boolean isPrimary(RaftPeerId primaryId) {
+ return server.getId().equals(primaryId);
+ }
+
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
@@ -379,7 +384,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
remoteWrites.add(previous.thenComposeAsync(v -> out.writeAsync(request.slice().nioBuffer()), executorService));
}
- } else if (request.getType() == Type.STREAM_CLOSE || request.getType() == Type.STREAM_CLOSE_FORWARD) {
+ } else if (request.getType() == Type.STREAM_CLOSE) {
info = streams.get(key);
final CompletableFuture<?> previous = info.getPrevious().get();
@@ -392,9 +397,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}, executorService);
- if (request.getType() == Type.STREAM_CLOSE) {
+ if (isPrimary(info.getRequest().getServerId())) {
for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
- remoteWrites.add(previous.thenComposeAsync(v -> out.closeForwardAsync(), executorService));
+ remoteWrites.add(previous.thenComposeAsync(v -> out.closeAsync(), executorService));
}
}
} else if (request.getType() == Type.START_TRANSACTION) {
@@ -411,13 +416,16 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
if (request.getType() == Type.STREAM_HEADER
- || request.getType() == Type.STREAM_DATA
- || request.getType() == Type.STREAM_CLOSE_FORWARD) {
+ || request.getType() == Type.STREAM_DATA) {
sendReply(remoteWrites, request, bytesWritten, ctx);
} else if (request.getType() == Type.STREAM_CLOSE) {
- // after all server close stream, primary server start transaction
- // TODO(runzhiwang): send start transaction to leader directly
- startTransaction(info, request, ctx);
+ if (isPrimary(info.getRequest().getServerId())) {
+ // after all server close stream, primary server start transaction
+ // TODO(runzhiwang): send start transaction to leader directly
+ startTransaction(info, request, ctx);
+ } else {
+ sendReply(remoteWrites, request, bytesWritten, ctx);
+ }
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index f3517d6..c799e79 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -287,8 +287,7 @@ message DataStreamPacketHeaderProto {
STREAM_HEADER = 0;
STREAM_DATA = 1;
STREAM_CLOSE = 2;
- STREAM_CLOSE_FORWARD = 3;
- START_TRANSACTION = 4;
+ START_TRANSACTION = 3;
}
uint64 streamId = 1;