You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/03 02:37:03 UTC

[ratis] branch master updated: RATIS-1456. Clean StreamMap when an exception occurs in DataStreamManagement#read (#551)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new fb062dd  RATIS-1456. Clean StreamMap when an exception occurs in DataStreamManagement#read (#551)
fb062dd is described below

commit fb062dd90146ad54c5ab4d6f459c05d20b7f0322
Author: hao guo <gu...@360.cn>
AuthorDate: Fri Dec 3 10:36:57 2021 +0800

    RATIS-1456. Clean StreamMap when an exception occurs in DataStreamManagement#read (#551)
---
 .../apache/ratis/netty/server/DataStreamManagement.java  | 16 ++++++++++++++--
 1 file changed, 14 insertions(+), 2 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 7539245..98e9c9e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -378,6 +378,7 @@ public class DataStreamManagement {
       final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
       info = streams.computeIfAbsent(key, id -> supplier.get());
       if (!supplier.isInitialized()) {
+        streams.remove(key);
         throw new IllegalStateException("Failed to create a new stream for " + request
             + " since a stream already exists Key: " + key + " StreamInfo:" + info);
       }
@@ -386,7 +387,10 @@ public class DataStreamManagement {
           () -> new IllegalStateException("Failed to remove StreamInfo for " + request));
     } else {
       info = Optional.ofNullable(streams.get(key)).orElseThrow(
-          () -> new IllegalStateException("Failed to get StreamInfo for " + request));
+          () -> {
+            streams.remove(key);
+            return new IllegalStateException("Failed to get StreamInfo for " + request);
+          });
     }
 
     final CompletableFuture<Long> localWrite;
@@ -420,6 +424,7 @@ public class DataStreamManagement {
         }, requestExecutor)).whenComplete((v, exception) -> {
       try {
         if (exception != null) {
+          streams.remove(key);
           replyDataStreamException(server, exception, info.getRequest(), request, ctx);
         }
       } finally {
@@ -441,7 +446,14 @@ public class DataStreamManagement {
     for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
       final DataStreamReply reply = replyFuture.join();
       assertReplyCorrespondingToRequest(request, reply);
-      if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
+      if (!reply.isSuccess()) {
+        LOG.warn("reply is not success, request: {}", request);
+        return false;
+      }
+      if (reply.getBytesWritten() != bytesWritten) {
+        LOG.warn(
+            "reply written bytes not match, local size: {} remote size: {} request: {}",
+            bytesWritten, reply.getBytesWritten(), request);
         return false;
       }
     }