You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/03 14:13:28 UTC

[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #237: RATIS-1083. Create transaction once stream data replicated to all servers

amaliujia commented on a change in pull request #237:
URL: https://github.com/apache/incubator-ratis/pull/237#discussion_r516369839



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       I think `startTransaction` is not chained with the `previous` async operation? Is `startTransaction` still executed synchronously after all previous operations?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {

Review comment:
       It might be better to just use a flat `else if` chain:
   ```
   if (reply.isSuccess()) 
   else if (request.getType() == Type.STREAM_CLOSE)
   else if (request.getType() == Type.START_TRANSACTION)
   else
   ```
   

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       I might have missed something: so failing to apply `STREAM_CLOSE` to the RaftServer lets us conclude that the server is not the leader?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       ```
   CompletableFuture<?> current = previous.get().....
   previous.set(current);
   ```
   Should `startTransaction` be chained with this logic?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       I might have missed something: so failing to apply `STREAM_CLOSE` to the RaftServer lets us conclude that the server is not the leader?

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);

Review comment:
       Got it. I was a bit confused by the comment here :)

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       I see. So the client controls when to start closing the stream, and it will likely only do that after it confirms the data has been streamed successfully. Thus there is no need to chain `startTransaction`?
   
   If so, although a client might try to close the stream before confirming that the data has been written successfully, I guess that is an exceptional case and we don't need to address it now.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -299,35 +304,119 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
     };
   }
 
+  private void startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    try {
+      server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+        if (reply.isSuccess()) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplySuccess(request, buffer, -1, ctx);
+        } else {
+          if (request.getType() == Type.STREAM_CLOSE) {
+            // if this server is not the leader, forward start transition to the other peers
+            forwardStartTransaction(info, request, ctx);
+          } else if (request.getType() == Type.START_TRANSACTION){
+            sendReplyNotSuccess(request, ctx);
+          } else {
+            LOG.error("{}: Unexpected type:{}", this, request.getType());
+          }
+        }
+      });
+    } catch (IOException e) {
+      sendReplyNotSuccess(request, ctx);
+    }
+  }
+
+  private void forwardStartTransaction(
+      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    final List<CompletableFuture<Boolean>> results = new ArrayList<>();
+    for (DataStreamOutput out : info.getDataStreamOutputs()) {
+      final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
+        if (reply.isSuccess()) {
+          final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+              ((DataStreamReplyByteBuffer)reply).slice(): null;
+          sendReplySuccess(request, buffer, -1, ctx);
+          return true;
+        } else {
+          return false;
+        }
+      });
+
+      results.add(f);
+    }
+
+    JavaUtils.allOf(results).thenAccept(v -> {
+      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
+        sendReplyNotSuccess(request, ctx);
+      }
+    });
+  }
+
   private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
-    final boolean isHeader = request.getType() == Type.STREAM_HEADER;
 
     final StreamInfo info;
     final CompletableFuture<Long> localWrite;
     final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
-    if (isHeader) {
+    if (request.getType() == Type.STREAM_HEADER) {
       info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
       localWrite = CompletableFuture.completedFuture(0L);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.getHeaderFuture());
       }
-    } else {
+    } else if (request.getType() == Type.STREAM_DATA) {
       info = streams.get(key);
       localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+
+      for (DataStreamOutput out : info.getDataStreamOutputs()) {
+        remoteWrites.add(out.closeForwardAsync());
+      }
+    } else if (request.getType() == Type.STREAM_CLOSE_FORWARD) {
+      info = streams.get(key);
+      localWrite = info.getStream().thenApplyAsync(stream -> {
+        try {
+          stream.getWritableByteChannel().close();
+          return 0L;
+        } catch (IOException e) {
+          throw new CompletionException("Failed to close " + stream, e);
+        }
+      });
+    } else {
+      // peer server start transaction
+      startTransaction(streams.get(key), request, ctx);

Review comment:
       Got it. Thanks




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org