You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/16 23:47:27 UTC
[incubator-ratis] branch master updated: RATIS-1160. DataStreamClientImpl.closeAsync() should be idempotent. (#281)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ffedbba RATIS-1160. DataStreamClientImpl.closeAsync() should be idempotent. (#281)
ffedbba is described below
commit ffedbba1141d636749d60bf3967f576bf71ea485
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Nov 17 07:47:19 2020 +0800
RATIS-1160. DataStreamClientImpl.closeAsync() should be idempotent. (#281)
---
.../ratis/client/impl/DataStreamClientImpl.java | 15 ++-
.../apache/ratis/protocol/RaftClientMessage.java | 11 ++-
.../org/apache/ratis/protocol/RaftClientReply.java | 10 +-
.../apache/ratis/protocol/RaftClientRequest.java | 10 +-
.../ratis/datastream/DataStreamBaseTest.java | 102 +++++++++++++++------
.../ratis/datastream/TestDataStreamNetty.java | 36 ++++----
6 files changed, 119 insertions(+), 65 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index c078218..2acf657 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -28,6 +28,9 @@ import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -57,6 +60,8 @@ public class DataStreamClientImpl implements DataStreamClient {
public final class DataStreamOutputImpl implements DataStreamOutputRpc {
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
+ private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier
+ = JavaUtils.memoize(() -> send(Type.STREAM_CLOSE));
private long streamOffset = 0;
@@ -81,6 +86,10 @@ public class DataStreamClientImpl implements DataStreamClient {
// send to the attached dataStreamClientRpc
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
+ if (isClosed()) {
+ return JavaUtils.completeExceptionally(new AlreadyClosedException(
+ clientId + ": stream already closed, request=" + header));
+ }
final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
streamOffset += buf.remaining();
return combineHeader(f);
@@ -88,7 +97,11 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public CompletableFuture<DataStreamReply> closeAsync() {
- return send(Type.STREAM_CLOSE);
+ return closeSupplier.get();
+ }
+
+ boolean isClosed() {
+ return closeSupplier.isInitialized();
}
@Override
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
index e914f7f..3dfe6d3 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientMessage.java
@@ -23,12 +23,13 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
private final ClientId clientId;
private final RaftPeerId serverId;
private final RaftGroupId groupId;
+ private final long callId;
- public RaftClientMessage(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId) {
+ RaftClientMessage(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
this.clientId = clientId;
this.serverId = serverId;
this.groupId = groupId;
+ this.callId = callId;
}
@Override
@@ -54,9 +55,13 @@ public abstract class RaftClientMessage implements RaftRpcMessage {
return groupId;
}
+ public long getCallId() {
+ return callId;
+ }
+
@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + ":" + clientId + "->" + serverId
- + (groupId != null? "@" + groupId: "");
+ + (groupId != null? "@" + groupId: "") + ", cid=" + getCallId();
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index a9bcd83..7973e0c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -38,7 +38,6 @@ import java.util.Collections;
*/
public class RaftClientReply extends RaftClientMessage {
private final boolean success;
- private final long callId;
/**
* We mainly track two types of exceptions here:
@@ -71,9 +70,8 @@ public class RaftClientReply extends RaftClientMessage {
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
long callId, boolean success, Message message, RaftException exception,
long logIndex, Collection<CommitInfoProto> commitInfos) {
- super(clientId, serverId, groupId);
+ super(clientId, serverId, groupId, callId);
this.success = success;
- this.callId = callId;
this.message = message;
this.exception = exception;
this.logIndex = logIndex;
@@ -125,17 +123,13 @@ public class RaftClientReply extends RaftClientMessage {
return false;
}
- public long getCallId() {
- return callId;
- }
-
public long getLogIndex() {
return logIndex;
}
@Override
public String toString() {
- return super.toString() + ", cid=" + getCallId() + ", "
+ return super.toString() + ", "
+ (isSuccess()? "SUCCESS": "FAILED " + exception)
+ ", logIndex=" + getLogIndex() + ", commits" + ProtoUtils.toString(commitInfos);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 9bc623f..331ce20 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -191,7 +191,6 @@ public class RaftClientRequest extends RaftClientMessage {
r.getCallId(), message, RaftClientRequest.writeRequestType(), r.getSlidingWindowEntry());
}
- private final long callId;
private final Message message;
private final Type type;
@@ -204,8 +203,7 @@ public class RaftClientRequest extends RaftClientMessage {
public RaftClientRequest(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry) {
- super(clientId, serverId, groupId);
- this.callId = callId;
+ super(clientId, serverId, groupId, callId);
this.message = message;
this.type = type;
this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
@@ -216,10 +214,6 @@ public class RaftClientRequest extends RaftClientMessage {
return true;
}
- public long getCallId() {
- return callId;
- }
-
public SlidingWindowEntry getSlidingWindowEntry() {
return slidingWindowEntry;
}
@@ -238,7 +232,7 @@ public class RaftClientRequest extends RaftClientMessage {
@Override
public String toString() {
- return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
+ return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
+ type + ", " + getMessage();
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0cabd07..837e368 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -24,13 +24,16 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.proto.RaftProtos.*;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.GroupInfoReply;
import org.apache.ratis.protocol.GroupInfoRequest;
import org.apache.ratis.protocol.GroupListReply;
import org.apache.ratis.protocol.GroupListRequest;
import org.apache.ratis.protocol.GroupManagementRequest;
+import org.apache.ratis.protocol.RaftClientMessage;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
@@ -39,17 +42,19 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.ServerFactory;
-import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.BaseStateMachine;
import org.apache.ratis.statemachine.StateMachine.DataStream;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
+import org.slf4j.Logger;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -74,6 +79,12 @@ abstract class DataStreamBaseTest extends BaseTest {
return (byte) ('A' + pos%MODULUS);
}
+ static final ByteString MOCK = ByteString.copyFromUtf8("mock");
+
+ static ByteString bytesWritten2ByteString(long bytesWritten) {
+ return ByteString.copyFromUtf8("bytesWritten=" + bytesWritten);
+ }
+
private final Executor executor = Executors.newFixedThreadPool(16);
static class MultiDataStreamStateMachine extends BaseStateMachine {
@@ -81,9 +92,9 @@ abstract class DataStreamBaseTest extends BaseTest {
@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
- final SingleDataStream s = new SingleDataStream();
+ final SingleDataStream s = new SingleDataStream(request);
streams.put(request.getCallId(), s);
- return s.stream(request);
+ return CompletableFuture.completedFuture(s);
}
SingleDataStream getSingleDataStream(long callId) {
@@ -91,9 +102,9 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
- static class SingleDataStream {
+ static class SingleDataStream implements DataStream {
private int byteWritten = 0;
- private RaftClientRequest writeRequest;
+ private final RaftClientRequest writeRequest;
final WritableByteChannel channel = new WritableByteChannel() {
private volatile boolean open = true;
@@ -122,7 +133,6 @@ abstract class DataStreamBaseTest extends BaseTest {
}
};
- final DataStream stream = new DataStream() {
@Override
public WritableByteChannel getWritableByteChannel() {
return channel;
@@ -137,11 +147,9 @@ abstract class DataStreamBaseTest extends BaseTest {
}
return CompletableFuture.completedFuture(null);
}
- };
- public CompletableFuture<DataStream> stream(RaftClientRequest request) {
- writeRequest = request;
- return CompletableFuture.completedFuture(stream);
+ SingleDataStream(RaftClientRequest request) {
+ this.writeRequest = request;
}
public int getByteWritten() {
@@ -204,7 +212,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
- public StateMachine getStateMachine(RaftGroupId groupId) {
+ public MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) {
return stateMachines.computeIfAbsent(groupId, key -> new MultiDataStreamStateMachine());
}
@@ -241,7 +249,7 @@ abstract class DataStreamBaseTest extends BaseTest {
@Override
public RaftClientReply submitClientRequest(RaftClientRequest request) {
- return null;
+ return submitClientRequestAsync(request).join();
}
@Override
@@ -251,7 +259,11 @@ abstract class DataStreamBaseTest extends BaseTest {
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
- return null;
+ final MultiDataStreamStateMachine stateMachine = getStateMachine(request.getRaftGroupId());
+ final SingleDataStream stream = stateMachine.getSingleDataStream(request.getCallId());
+ Assert.assertFalse(stream.getWritableByteChannel().isOpen());
+ return CompletableFuture.completedFuture(new RaftClientReply(request,
+ () -> bytesWritten2ByteString(stream.getByteWritten()), null));
}
@Override
@@ -328,12 +340,12 @@ abstract class DataStreamBaseTest extends BaseTest {
List<RaftServer> raftServers = new ArrayList<>();
peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
- setup(peers, raftServers);
+ setup(RaftGroupId.randomId(), peers, raftServers);
}
- void setup(List<RaftPeer> peers, List<RaftServer> raftServers) {
- raftGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peers);
+ void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
+ raftGroup = RaftGroup.valueOf(groupId, peers);
servers = new ArrayList<>(peers.size());
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
@@ -358,6 +370,15 @@ abstract class DataStreamBaseTest extends BaseTest {
.build();
}
+ RaftClient newRaftClientForDataStream(ClientId clientId) {
+ return RaftClient.newBuilder()
+ .setClientId(clientId)
+ .setRaftGroup(raftGroup)
+ .setPrimaryDataStreamServer(getPrimaryServer().getPeer())
+ .setProperties(properties)
+ .build();
+ }
+
protected void shutdown() throws IOException {
for (Server server : servers) {
server.close();
@@ -400,9 +421,10 @@ abstract class DataStreamBaseTest extends BaseTest {
}
- void runTestMockCluster(int bufferSize, int bufferNum, Exception expectedException, Exception headerException)
+ void runTestMockCluster(ClientId clientId, int bufferSize, int bufferNum,
+ Exception expectedException, Exception headerException)
throws IOException {
- try (final RaftClient client = newRaftClientForDataStream()) {
+ try (final RaftClient client = newRaftClientForDataStream(clientId)) {
final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
if (headerException != null) {
final DataStreamReply headerReply = out.getHeaderFuture().join();
@@ -413,22 +435,18 @@ abstract class DataStreamBaseTest extends BaseTest {
return;
}
- runTestDataStream(out, bufferSize, bufferNum);
- DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer) out.closeAsync().join();
-
- final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
- RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
+ final RaftClientReply clientReply = runTestDataStream(out, bufferSize, bufferNum).join();
if (expectedException != null) {
- Assert.assertFalse(replyByteBuffer.isSuccess());
+ Assert.assertFalse(clientReply.isSuccess());
Assert.assertTrue(clientReply.getException().getMessage().contains(
expectedException.getMessage()));
} else {
- Assert.assertTrue(replyByteBuffer.isSuccess());
+ Assert.assertTrue(clientReply.isSuccess());
}
}
}
- private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+ CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
LOG.info("start stream {}", out.getHeader().getCallId());
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
final List<Integer> sizes = new ArrayList<>();
@@ -466,6 +484,34 @@ abstract class DataStreamBaseTest extends BaseTest {
} catch (Throwable e) {
throw new CompletionException(e);
}
+
+ final long byteWritten = dataSize;
+ return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, byteWritten));
+ }
+
+ CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
+ long byteWritten) {
+ // Test close idempotent
+ Assert.assertSame(dataStreamReply, out.closeAsync().join());
+ testFailureCase("writeAsync should fail",
+ () -> out.writeAsync(DataStreamRequestByteBuffer.EMPTY_BYTE_BUFFER).join(),
+ CompletionException.class, (Logger)null, AlreadyClosedException.class);
+
+ try {
+ final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(RaftClientReplyProto.parseFrom(
+ ((DataStreamReplyByteBuffer) dataStreamReply).slice()));
+ assertRaftClientMessage(out.getHeader(), reply);
+ if (reply.isSuccess()) {
+ final ByteString bytes = reply.getMessage().getContent();
+ if (!bytes.equals(MOCK)) {
+ Assert.assertEquals(bytesWritten2ByteString(byteWritten), bytes);
+ }
+ }
+
+ return CompletableFuture.completedFuture(reply);
+ } catch (Throwable t) {
+ return JavaUtils.completeExceptionally(t);
+ }
}
void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
@@ -475,10 +521,10 @@ abstract class DataStreamBaseTest extends BaseTest {
Assert.assertEquals(dataSize, stream.getByteWritten());
final RaftClientRequest writeRequest = stream.getWriteRequest();
- assertRaftClientRequest(header, writeRequest);
+ assertRaftClientMessage(header, writeRequest);
}
- static void assertRaftClientRequest(RaftClientRequest expected, RaftClientRequest computed) {
+ static void assertRaftClientMessage(RaftClientRequest expected, RaftClientMessage computed) {
Assert.assertNotNull(computed);
Assert.assertEquals(expected.getClientId(), computed.getClientId());
Assert.assertEquals(expected.getServerId(), computed.getServerId());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index fdf68ad..bd9e6f0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -36,6 +36,7 @@ import org.apache.ratis.util.NetUtils;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -92,32 +93,33 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
ClientId clientId = ClientId.randomId();
RaftGroupId groupId = RaftGroupId.randomId();
final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" + leaderIndex).build();
- RaftClientReply expectedClientReply = new RaftClientReply(clientId, suggestedLeader.getId(), groupId, 0,
- leaderException == null, null, leaderException, 0, null);
for (int i = 0; i < numServers; i ++) {
RaftServer raftServer = mock(RaftServer.class);
- RaftClientReply raftClientReply;
RaftPeerId peerId = RaftPeerId.valueOf("s" + i);
RaftProperties properties = new RaftProperties();
NettyConfigKeys.DataStream.setPort(properties, NetUtils.createLocalServerAddress().getPort());
RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
- if (i == leaderIndex) {
- raftClientReply = expectedClientReply;
- } else {
- RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(peerId, groupId);
- NotLeaderException notLeaderException = new NotLeaderException(raftGroupMemberId, suggestedLeader, null);
- raftClientReply = new RaftClientReply(clientId, peerId,
- groupId, 0, false, null, notLeaderException, 0, null);
- }
-
if (submitException != null) {
when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
.thenThrow(submitException);
} else {
+ final boolean isLeader = i == leaderIndex;
when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
- .thenReturn(CompletableFuture.completedFuture(raftClientReply));
+ .thenAnswer((Answer<CompletableFuture<RaftClientReply>>) invocation -> {
+ final RaftClientRequest r = (RaftClientRequest) invocation.getArguments()[0];
+ final RaftClientReply reply;
+ if (isLeader) {
+ reply = leaderException != null? new RaftClientReply(r, leaderException, null)
+ : new RaftClientReply(r, () -> MOCK, null);
+ } else {
+ final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(peerId, groupId);
+ final NotLeaderException notLeaderException = new NotLeaderException(memberId, suggestedLeader, null);
+ reply = new RaftClientReply(r, notLeaderException, null);
+ }
+ return CompletableFuture.completedFuture(reply);
+ });
}
when(raftServer.getProperties()).thenReturn(properties);
@@ -131,18 +133,18 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
raftServers.add(raftServer);
}
- runTestMockCluster(raftServers, 1_000_000, 10,
+ runTestMockCluster(groupId, raftServers, clientId, 1_000_000, 10,
submitException != null ? submitException : leaderException, getStateMachineException);
}
- void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int bufferNum,
+ void runTestMockCluster(RaftGroupId groupId, List<RaftServer> raftServers, ClientId clientId, int bufferSize, int bufferNum,
Exception expectedException, Exception headerException) throws Exception {
try {
final List<RaftPeer> peers = raftServers.stream()
.map(TestDataStreamNetty::newRaftPeer)
.collect(Collectors.toList());
- setup(peers, raftServers);
- runTestMockCluster(bufferSize, bufferNum, expectedException, headerException);
+ setup(groupId, peers, raftServers);
+ runTestMockCluster(clientId, bufferSize, bufferNum, expectedException, headerException);
} finally {
shutdown();
}