You are viewing a plain text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/15 11:06:18 UTC

[incubator-ratis] branch master updated: RATIS-1150. Return DataStreamException to client (#277)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a054165  RATIS-1150. Return DataStreamException to client (#277)
a054165 is described below

commit a05416545a4b0d1fe3f9e2d19f1f4fd5c1d772f5
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Sun Nov 15 19:06:09 2020 +0800

    RATIS-1150. Return DataStreamException to client (#277)
---
 .../apache/ratis/client/impl/ClientProtoUtils.java | 38 +++++++++++++++
 .../org/apache/ratis/protocol/RaftClientReply.java |  8 ++-
 .../protocol/exceptions/DataStreamException.java   | 30 ++++++++++++
 .../ratis/netty/server/DataStreamManagement.java   | 37 ++++++++++----
 ratis-proto/src/main/proto/Raft.proto              |  7 +++
 .../ratis/datastream/DataStreamBaseTest.java       | 10 ++--
 .../ratis/datastream/TestDataStreamNetty.java      | 57 +++++++++++++++-------
 7 files changed, 152 insertions(+), 35 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 673d58f..5a7adf7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -20,6 +20,7 @@ package org.apache.ratis.client.impl;
 import java.util.Optional;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.ALREADYCLOSEDEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.DATASTREAMEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERNOTREADYEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
@@ -191,6 +193,17 @@ public interface ClientProtoUtils {
         b.setStateMachineException(smeBuilder.build());
       }
 
+      final DataStreamException dse = reply.getDataStreamException();
+      if (dse != null) {
+        DataStreamExceptionProto.Builder dseBuilder =
+            DataStreamExceptionProto.newBuilder();
+        final Throwable t = dse.getCause() != null ? dse.getCause() : dse;
+        dseBuilder.setExceptionClassName(t.getClass().getName())
+            .setErrorMsg(t.getMessage())
+            .setStacktrace(ProtoUtils.writeObject2ByteString(t.getStackTrace()));
+        b.setDataStreamException(dseBuilder.build());
+      }
+
       final NotReplicatedException nre = reply.getNotReplicatedException();
       if (nre != null) {
         final NotReplicatedExceptionProto.Builder nreBuilder = NotReplicatedExceptionProto.newBuilder()
@@ -282,6 +295,10 @@ public interface ClientProtoUtils {
       StateMachineExceptionProto smeProto = replyProto.getStateMachineException();
       e = wrapStateMachineException(serverMemberId,
           smeProto.getExceptionClassName(), smeProto.getErrorMsg(), smeProto.getStacktrace());
+    } else if (replyProto.getExceptionDetailsCase().equals(DATASTREAMEXCEPTION)) {
+      DataStreamExceptionProto dseProto = replyProto.getDataStreamException();
+      e = wrapDataStreamException(serverMemberId.getPeerId(),
+          dseProto.getExceptionClassName(), dseProto.getErrorMsg(), dseProto.getStacktrace());
     } else if (replyProto.getExceptionDetailsCase().equals(LEADERNOTREADYEXCEPTION)) {
       LeaderNotReadyExceptionProto lnreProto = replyProto.getLeaderNotReadyException();
       e = new LeaderNotReadyException(ProtoUtils.toRaftGroupMemberId(lnreProto.getServerId()));
@@ -344,6 +361,27 @@ public interface ClientProtoUtils {
     return sme;
   }
 
+  static DataStreamException wrapDataStreamException(
+      RaftPeerId peerId, String className, String errorMsg, ByteString stackTraceBytes) {
+    DataStreamException dse;
+    if (className == null) {
+      dse = new DataStreamException(errorMsg);
+    } else {
+      try {
+        Class<?> clazz = Class.forName(className);
+        final Exception e = ReflectionUtils.instantiateException(
+            clazz.asSubclass(Exception.class), errorMsg, null);
+        dse = new DataStreamException(peerId, e);
+      } catch (Exception e) {
+        dse = new DataStreamException(className + ": " + errorMsg);
+      }
+    }
+    StackTraceElement[] stacktrace =
+        (StackTraceElement[]) ProtoUtils.toObject(stackTraceBytes);
+    dse.setStackTrace(stacktrace);
+    return dse;
+  }
+
   static AlreadyClosedException wrapAlreadyClosedException(
       String className, String errorMsg, ByteString stackTraceBytes) {
     AlreadyClosedException ace;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 36e2fd2..a9bcd83 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -19,6 +19,7 @@ package org.apache.ratis.protocol;
 
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
+import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
@@ -84,7 +85,7 @@ public class RaftClientReply extends RaftClientMessage {
       Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
           AlreadyClosedException.class,
           NotLeaderException.class, NotReplicatedException.class,
-          LeaderNotReadyException.class, StateMachineException.class),
+          LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class),
           () -> "Unexpected exception class: " + this);
     }
   }
@@ -167,6 +168,11 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, StateMachineException.class);
   }
 
+  /** If this reply has {@link DataStreamException}, return it; otherwise return null. */
+  public DataStreamException getDataStreamException() {
+    return JavaUtils.cast(exception, DataStreamException.class);
+  }
+
   public LeaderNotReadyException getLeaderNotReadyException() {
     return JavaUtils.cast(exception, LeaderNotReadyException.class);
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/DataStreamException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/DataStreamException.java
new file mode 100644
index 0000000..de10d6f
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/DataStreamException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftPeerId;
+
+public class DataStreamException extends RaftException {
+  public DataStreamException(RaftPeerId peerId, Throwable cause) {
+    super(cause.getClass().getName() + " from Server " + peerId + ": " + cause.getMessage(), cause);
+  }
+
+  public DataStreamException(String msg) {
+    super(msg);
+  }
+}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 8808a95..6ea857d 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -28,6 +28,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.statemachine.StateMachine;
@@ -305,13 +306,7 @@ public class DataStreamManagement {
         }
       }, executor);
     } catch (IOException e) {
-      // TODO include IOException in the reply
-      final DataStreamReplyByteBuffer reply = DataStreamReplyByteBuffer.newBuilder()
-          .setDataStreamPacket(request)
-          .setSuccess(false)
-          .build();
-      ctx.writeAndFlush(reply);
-      return CompletableFuture.completedFuture(null);
+      throw new CompletionException(e);
     }
   }
 
@@ -332,6 +327,13 @@ public class DataStreamManagement {
     ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, chosen));
   }
 
+  static void replyDataStreamException(RaftServer server, Throwable cause, RaftClientRequest raftClientRequest,
+      DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
+    RaftClientReply reply = new RaftClientReply(raftClientRequest, dataStreamException, null);
+    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+  }
+
   static void forwardStartTransaction(StreamInfo info, DataStreamRequestByteBuf request, RaftClientReply localReply,
       ChannelHandlerContext ctx, Executor executor) {
     final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
@@ -371,7 +373,15 @@ public class DataStreamManagement {
       // for peers to start transaction
       final StreamInfo info = streams.get(key);
       composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
-          .thenAccept(v -> buf.release());
+          .whenComplete((v, exception) -> {
+        try {
+          if (exception != null) {
+            replyDataStreamException(server, exception, info.getRequest(), request, ctx);
+          }
+        } finally {
+          buf.release();
+        }
+      });
       return;
     }
 
@@ -410,9 +420,16 @@ public class DataStreamManagement {
           } else {
             throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
           }
-          buf.release();
           return null;
-        }, executor));
+        }, executor)).whenComplete((v, exception) -> {
+      try {
+        if (exception != null) {
+          replyDataStreamException(server, exception, info.getRequest(), request, ctx);
+        }
+      } finally {
+        buf.release();
+      }
+    });
   }
 
   static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index c799e79..f3ca97f 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -333,6 +333,12 @@ message AlreadyClosedExceptionProto {
   bytes stacktrace = 3;
 }
 
+message DataStreamExceptionProto {
+  string exceptionClassName = 1;
+  string errorMsg = 2;
+  bytes stacktrace = 3;
+}
+
 message RaftClientReplyProto {
   RaftRpcReplyProto rpcReply = 1;
   ClientMessageEntryProto message = 2;
@@ -343,6 +349,7 @@ message RaftClientReplyProto {
     StateMachineExceptionProto stateMachineException = 5;
     LeaderNotReadyExceptionProto leaderNotReadyException = 6;
     AlreadyClosedExceptionProto alreadyClosedException = 7;
+    DataStreamExceptionProto dataStreamException = 8;
   }
 
   uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 33a90a1..8fc78a1 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -39,6 +39,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.SetConfigurationRequest;
+import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
@@ -400,7 +401,7 @@ abstract class DataStreamBaseTest extends BaseTest {
   }
 
 
-  void runTestMockCluster(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
+  void runTestMockCluster(int bufferSize, int bufferNum, Exception expectedException)
       throws IOException {
     try (final RaftClient client = newRaftClientForDataStream()) {
       final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
@@ -409,13 +410,10 @@ abstract class DataStreamBaseTest extends BaseTest {
 
       final RaftClientReply clientReply = ClientProtoUtils.toRaftClientReply(
           RaftClientReplyProto.parseFrom(replyByteBuffer.slice()));
-      Assert.assertEquals(clientReply.getCallId(), expectedClientReply.getCallId());
-      Assert.assertEquals(clientReply.getClientId(), expectedClientReply.getClientId());
-      Assert.assertEquals(clientReply.getLogIndex(), expectedClientReply.getLogIndex());
-      if (expectedClientReply.getException() != null) {
+      if (expectedException != null) {
         Assert.assertFalse(replyByteBuffer.isSuccess());
         Assert.assertTrue(clientReply.getException().getMessage().contains(
-            expectedClientReply.getException().getMessage()));
+            expectedException.getMessage()));
       } else {
         Assert.assertTrue(replyByteBuffer.isSuccess());
       }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 21800ab..edfc8da 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -37,6 +37,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
@@ -80,15 +81,14 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
     runTestDataStream(3);
   }
 
-  private void testMockCluster(int leaderIndex, int numServers, RaftException leaderException) throws Exception {
+  private void testMockCluster(int leaderIndex, int numServers, RaftException leaderException,
+      Exception submitException) throws Exception {
     List<RaftServer> raftServers = new ArrayList<>();
     ClientId clientId = ClientId.randomId();
     RaftGroupId groupId = RaftGroupId.randomId();
-    long callId = 100;
-    long longIndex = 200;
     final RaftPeer suggestedLeader = RaftPeer.newBuilder().setId("s" + leaderIndex).build();
-    RaftClientReply expectedClientReply = new RaftClientReply(clientId, suggestedLeader.getId(), groupId, callId,
-        leaderException == null ? true : false, null, leaderException, longIndex, null);
+    RaftClientReply expectedClientReply = new RaftClientReply(clientId, suggestedLeader.getId(), groupId, 0,
+        leaderException == null ? true : false, null, leaderException, 0, null);
 
     for (int i = 0; i < numServers; i ++) {
       RaftServer raftServer = mock(RaftServer.class);
@@ -104,11 +104,17 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
         RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(peerId, groupId);
         NotLeaderException notLeaderException = new NotLeaderException(raftGroupMemberId, suggestedLeader, null);
         raftClientReply = new RaftClientReply(clientId, peerId,
-            groupId, callId, false, null, notLeaderException, longIndex, null);
+            groupId, 0, false, null, notLeaderException, 0, null);
+      }
+
+      if (submitException != null) {
+        when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
+            .thenThrow(submitException);
+      } else {
+        when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
+            .thenReturn(CompletableFuture.completedFuture(raftClientReply));
       }
 
-      when(raftServer.submitClientRequestAsync(Mockito.any(RaftClientRequest.class)))
-          .thenReturn(CompletableFuture.completedFuture(raftClientReply));
       when(raftServer.getProperties()).thenReturn(properties);
       when(raftServer.getId()).thenReturn(peerId);
       when(raftServer.getStateMachine(Mockito.any(RaftGroupId.class))).thenReturn(new MultiDataStreamStateMachine());
@@ -116,17 +122,18 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
       raftServers.add(raftServer);
     }
 
-    runTestMockCluster(raftServers, 1_000_000, 10, expectedClientReply);
+    runTestMockCluster(raftServers, 1_000_000, 10,
+        submitException != null ? submitException : leaderException);
   }
 
   void runTestMockCluster(List<RaftServer> raftServers, int bufferSize, int bufferNum,
-      RaftClientReply expectedClientReply) throws Exception {
+      Exception expectedException) throws Exception {
     try {
       final List<RaftPeer> peers = raftServers.stream()
           .map(TestDataStreamNetty::newRaftPeer)
           .collect(Collectors.toList());
       setup(peers, raftServers);
-      runTestMockCluster(bufferSize, bufferNum, expectedClientReply);
+      runTestMockCluster(bufferSize, bufferNum, expectedException);
     } finally {
       shutdown();
     }
@@ -135,32 +142,46 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
   @Test
   public void testCloseStreamPrimaryIsLeader() throws Exception {
     // primary is 0, leader is 0
-    testMockCluster(0, 3, null);
+    testMockCluster(0, 3, null, null);
   }
 
   @Test
   public void testCloseStreamPrimaryIsNotLeader() throws Exception {
     // primary is 0, leader is 1
-    testMockCluster(1, 3, null);
+    testMockCluster(1, 3, null, null);
   }
 
   @Test
   public void testCloseStreamOneServer() throws Exception {
     // primary is 0, leader is 0
-    testMockCluster(0, 1, null);
+    testMockCluster(0, 1, null, null);
   }
 
   @Test
-  public void testExceptionInReplyPrimaryIsLeader() throws Exception {
+  public void testStateMachineExceptionInReplyPrimaryIsLeader() throws Exception {
     // primary is 0, leader is 0
     StateMachineException stateMachineException = new StateMachineException("leader throw StateMachineException");
-    testMockCluster(0, 3, stateMachineException);
+    testMockCluster(0, 3, stateMachineException, null);
   }
 
   @Test
-  public void testExceptionInReplyPrimaryIsNotLeader() throws Exception {
+  public void testStateMachineExceptionInReplyPrimaryIsNotLeader() throws Exception {
     // primary is 0, leader is 1
     StateMachineException stateMachineException = new StateMachineException("leader throw StateMachineException");
-    testMockCluster(1, 3, stateMachineException);
+    testMockCluster(1, 3, stateMachineException, null);
+  }
+
+  @Test
+  public void testDataStreamExceptionInReplyPrimaryIsLeader() throws Exception {
+    // primary is 0, leader is 0
+    IOException ioException = new IOException("leader throw IOException");
+    testMockCluster(0, 3, null, ioException);
+  }
+
+  @Test
+  public void testDataStreamExceptionInReplyPrimaryIsNotLeader() throws Exception {
+    // primary is 0, leader is 1
+    IOException ioException = new IOException("leader throw IOException");
+    testMockCluster(1, 3, null, ioException);
   }
 }