You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/15 11:06:18 UTC
[incubator-ratis] branch master updated: RATIS-1150. Return
DataStreamException to client (#277)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a054165 RATIS-1150. Return DataStreamException to client (#277)
a054165 is described below
commit a05416545a4b0d1fe3f9e2d19f1f4fd5c1d772f5
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Sun Nov 15 19:06:09 2020 +0800
RATIS-1150. Return DataStreamException to client (#277)
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 38 +++++++++++++++
.../org/apache/ratis/protocol/RaftClientReply.java | 8 ++-
.../protocol/exceptions/DataStreamException.java | 30 ++++++++++++
.../ratis/netty/server/DataStreamManagement.java | 37 ++++++++++----
ratis-proto/src/main/proto/Raft.proto | 7 +++
.../ratis/datastream/DataStreamBaseTest.java | 10 ++--
.../ratis/datastream/TestDataStreamNetty.java | 57 +++++++++++++++-------
7 files changed, 152 insertions(+), 35 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 673d58f..5a7adf7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.client.impl;
import java.util.Optional;
import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
@@ -34,6 +35,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.ALREADYCLOSEDEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.DATASTREAMEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERNOTREADYEXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
@@ -191,6 +193,17 @@ public interface ClientProtoUtils {
b.setStateMachineException(smeBuilder.build());
}
+ final DataStreamException dse = reply.getDataStreamException();
+ if (dse != null) {
+ DataStreamExceptionProto.Builder dseBuilder =
+ DataStreamExceptionProto.newBuilder();
+ final Throwable t = dse.getCause() != null ? dse.getCause() : dse;
+ dseBuilder.setExceptionClassName(t.getClass().getName())
+ .setErrorMsg(t.getMessage())
+ .setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace()));
+ b.setDataStreamException(dseBuilder.build());
+ }
+
final NotReplicatedException nre = reply.getNotReplicatedException();
if (nre != null) {
final NotReplicatedExceptionProto.Builder nreBuilder = NotReplicatedExceptionProto.newBuilder()
@@ -282,6 +295,10 @@ public interface ClientProtoUtils {
StateMachineExceptionProto smeProto = replyProto.getStateMachineException();
e = wrapStateMachineException(serverMemberId,
smeProto.getExceptionClassName(), smeProto.getErrorMsg(), smeProto.getStacktrace());
+ } else if (replyProto.getExceptionDetailsCase().equals(DATASTREAMEXCEPTION)) {
+ DataStreamExceptionProto dseProto = replyProto.getDataStreamException();
+ e = wrapDataStreamException(serverMemberId.getPeerId(),
+ dseProto.getExceptionClassName(), dseProto.getErrorMsg(), dseProto.getStacktrace());
} else if (replyProto.getExceptionDetailsCase().equals(LEADERNOTREADYEXCEPTION)) {
LeaderNotReadyExceptionProto lnreProto = replyProto.getLeaderNotReadyException();
e = new LeaderNotReadyException(ProtoUtils.toRaftGroupMemberId(lnreProto.getServerId()));
@@ -344,6 +361,27 @@ public interface ClientProtoUtils {
return sme;
}
+ static DataStreamException wrapDataStreamException(
+ RaftPeerId peerId, String className, String errorMsg, ByteString stackTraceBytes) {
+ DataStreamException dse;
+ if (className == null) {
+ dse = new DataStreamException(errorMsg);
+ } else {
+ try {
+ Class<?> clazz = Class.forName(className);
+ final Exception e = ReflectionUtils.instantiateException(
+ clazz.asSubclass(Exception.class), errorMsg, null);
+ dse = new DataStreamException(peerId, e);
+ } catch (Exception e) {
+ dse = new DataStreamException(className + ": " + errorMsg);
+ }
+ }
+ StackTraceElement[] stacktrace =
+ (StackTraceElement[]) ProtoUtils.toObject(stackTraceBytes);
+ dse.setStackTrace(stacktrace);
+ return dse;
+ }
+
static AlreadyClosedException wrapAlreadyClosedException(
String className, String errorMsg, ByteString stackTraceBytes) {
AlreadyClosedException ace;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 36e2fd2..a9bcd83 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -19,6 +19,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
@@ -84,7 +85,7 @@ public class RaftClientReply extends RaftClientMessage {
Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
AlreadyClosedException.class,
NotLeaderException.class, NotReplicatedException.class,
- LeaderNotReadyException.class, StateMachineException.class),
+ LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class),
() -> "Unexpected exception class: " + this);
}
}
@@ -167,6 +168,11 @@ public class RaftClientReply extends RaftClientMessage {
return JavaUtils.cast(exception, StateMachineException.class);
}
+ /** If this reply has {@link DataStreamException}, return it; otherwise return null. */
+ public DataStreamException getDataStreamException() {
+ return JavaUtils.cast(exception, DataStreamException.class);
+ }
+
public LeaderNotReadyException getLeaderNotReadyException() {
return JavaUtils.cast(exception, LeaderNotReadyException.class);
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/DataStreamException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/DataStreamException.java
new file mode 100644
index 0000000..de10d6f
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/DataStreamException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftPeerId;
+
+public class DataStreamException extends RaftException {
+ public DataStreamException(RaftPeerId peerId, Throwable cause) {
+ super(cause.getClass().getName() + " from Server " + peerId + ": " + cause.getMessage(), cause);
+ }
+
+ public DataStreamException(String msg) {
+ super(msg);
+ }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 8808a95..6ea857d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -28,6 +28,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.statemachine.StateMachine;
@@ -305,13 +306,7 @@ public class DataStreamManagement {
}
}, executor);
} catch (IOException e) {
- // TODO include IOException in the reply
- final DataStreamReplyByteBuffer reply = DataStreamReplyByteBuffer.newBuilder()
- .setDataStreamPacket(request)
- .setSuccess(false)
- .build();
- ctx.writeAndFlush(reply);
- return CompletableFuture.completedFuture(null);
+ throw new CompletionException(e);
}
}
@@ -332,6 +327,13 @@ public class DataStreamManagement {
ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, chosen));
}
+ static void replyDataStreamException(RaftServer server, Throwable cause, RaftClientRequest raftClientRequest,
+ DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+ DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
+ RaftClientReply reply = new RaftClientReply(raftClientRequest, dataStreamException, null);
+ ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+ }
+
static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
ChannelHandlerContext ctx, Executor executor) {
final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
@@ -371,7 +373,15 @@ public class DataStreamManagement {
// for peers to start transaction
final StreamInfo info = streams.get(key);
composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
- .thenAccept(v -> buf.release());
+ .whenComplete((v, exception) -> {
+ try {
+ if (exception != null) {
+ replyDataStreamException(server, exception, info.getRequest(), request, ctx);
+ }
+ } finally {
+ buf.release();
+ }
+ });
return;
}
@@ -410,9 +420,16 @@ public class DataStreamManagement {
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
- buf.release();
return null;
- }, executor));
+ }, executor)).whenComplete((v, exception) -> {
+ try {
+ if (exception != null) {
+ replyDataStreamException(server, exception, info.getRequest(), request, ctx);
+ }
+ } finally {
+ buf.release();
+ }
+ });
}
static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index c799e79..f3ca97f 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -333,6 +333,12 @@ message AlreadyClosedExceptionProto {
bytes stacktrace = 3;
}
+message DataStreamExceptionProto {
+ string exceptionClassName = 1;
+ string errorMsg = 2;
+ bytes stacktrace = 3;
+}
+
message RaftClientReplyProto {
RaftRpcReplyProto rpcReply = 1;
ClientMessageEntryProto message = 2;
@@ -343,6 +349,7 @@ message RaftClientReplyProto {
StateMachineExceptionProto stateMachineException = 5;
LeaderNotReadyExceptionProto leaderNotReadyException = 6;
AlreadyClosedExceptionProto alreadyClosedException = 7;
+ DataStreamExceptionProto dataStreamException = 8;
}
uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 33a90a1..8fc78a1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,6 +39,7 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
@@ -400,7 +401,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
- void runTestMockCluster(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
+ void runTestMockCluster(int bufferSize, int bufferNum, Exception expectedException)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream()) {
final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
@@ -409,13 +410,10 @@ abstract class DataStreamBaseTest extends BaseTest {
final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
- Assert.assertEquals(clientReply.getCallId(), expectedClientReply.getCallId());
- Assert.assertEquals(clientReply.getClientId(), expectedClientReply.getClientId());
- Assert.assertEquals(clientReply.getLogIndex(), expectedClientReply.getLogIndex());
- if (expectedClientReply.getException() != null) {
+ if (expectedException != null) {
Assert.assertFalse(replyByteBuffer.isSuccess());
Assert.assertTrue(clientReply.getException().getMessage().contains(
- expectedClientReply.getException().getMessage()));
+ expectedException.getMessage()));
} else {
Assert.assertTrue(replyByteBuffer.isSuccess());
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 21800ab..edfc8da 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -37,6 +37,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -80,15 +81,14 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
runTestDataStream(3);
}
- private void testMockCluster(int leaderIndex, int numServers, RaftException leaderException) throws Exception {
+ private void testMockCluster(int leaderIndex, int numServers, RaftException leaderException,
+ Exception submitException) throws Exception {
List<RaftServer> raftServers = new ArrayList<>();
ClientId clientId = ClientId.randomId();
RaftGroupId groupId = RaftGroupId.randomId();
- long callId = 100;
- long longIndex = 200;
final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" + leaderIndex).build();
- RaftClientReply expectedClientReply = new RaftClientReply(clientId, suggestedLeader.getId(), groupId, callId,
- leaderException == null ? true : false, null, leaderException, longIndex, null);
+ RaftClientReply expectedClientReply = new RaftClientReply(clientId, suggestedLeader.getId(), groupId, 0,
+ leaderException == null ? true : false, null, leaderException, 0, null);
for (int i = 0; i < numServers; i ++) {
RaftServer raftServer = mock(RaftServer.class);
@@ -104,11 +104,17 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(peerId, groupId);
NotLeaderException notLeaderException = new NotLeaderException(raftGroupMemberId, suggestedLeader, null);
raftClientReply = new RaftClientReply(clientId, peerId,
- groupId, callId, false, null, notLeaderException, longIndex, null);
+ groupId, 0, false, null, notLeaderException, 0, null);
+ }
+
+ if (submitException != null) {
+ when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
+ .thenThrow(submitException);
+ } else {
+ when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
+ .thenReturn(CompletableFuture.completedFuture(raftClientReply));
}
- when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
- .thenReturn(CompletableFuture.completedFuture(raftClientReply));
when(raftServer.getProperties()).thenReturn(properties);
when(raftServer.getId()).thenReturn(peerId);
when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new MultiDataStreamStateMachine());
@@ -116,17 +122,18 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
raftServers.add(raftServer);
}
- runTestMockCluster(raftServers, 1_000_000, 10, expectedClientReply);
+ runTestMockCluster(raftServers, 1_000_000, 10,
+ submitException != null ? submitException : leaderException);
}
void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int bufferNum,
- RaftClientReply expectedClientReply) throws Exception {
+ Exception expectedException) throws Exception {
try {
final List<RaftPeer> peers = raftServers.stream()
.map(TestDataStreamNetty::newRaftPeer)
.collect(Collectors.toList());
setup(peers, raftServers);
- runTestMockCluster(bufferSize, bufferNum, expectedClientReply);
+ runTestMockCluster(bufferSize, bufferNum, expectedException);
} finally {
shutdown();
}
@@ -135,32 +142,46 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
@Test
public void testCloseStreamPrimaryIsLeader() throws Exception {
// primary is 0, leader is 0
- testMockCluster(0, 3, null);
+ testMockCluster(0, 3, null, null);
}
@Test
public void testCloseStreamPrimaryIsNotLeader() throws Exception {
// primary is 0, leader is 1
- testMockCluster(1, 3, null);
+ testMockCluster(1, 3, null, null);
}
@Test
public void testCloseStreamOneServer() throws Exception {
// primary is 0, leader is 0
- testMockCluster(0, 1, null);
+ testMockCluster(0, 1, null, null);
}
@Test
- public void testExceptionInReplyPrimaryIsLeader() throws Exception {
+ public void testStateMachineExceptionInReplyPrimaryIsLeader() throws Exception {
// primary is 0, leader is 0
StateMachineException stateMachineException = new StateMachineException("leader throw StateMachineException");
- testMockCluster(0, 3, stateMachineException);
+ testMockCluster(0, 3, stateMachineException, null);
}
@Test
- public void testExceptionInReplyPrimaryIsNotLeader() throws Exception {
+ public void testStateMachineExceptionInReplyPrimaryIsNotLeader() throws Exception {
// primary is 0, leader is 1
StateMachineException stateMachineException = new StateMachineException("leader throw StateMachineException");
- testMockCluster(1, 3, stateMachineException);
+ testMockCluster(1, 3, stateMachineException, null);
+ }
+
+ @Test
+ public void testDataStreamExceptionInReplyPrimaryIsLeader() throws Exception {
+ // primary is 0, leader is 0
+ IOException ioException = new IOException("leader throw IOException");
+ testMockCluster(0, 3, null, ioException);
+ }
+
+ @Test
+ public void testDataStreamExceptionInReplyPrimaryIsNotLeader() throws Exception {
+ // primary is 0, leader is 1
+ IOException ioException = new IOException("leader throw IOException");
+ testMockCluster(1, 3, null, ioException);
}
}