You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/12/03 02:37:03 UTC
[ratis] branch master updated: RATIS-1456. Clean StreamMap when an exception occurs in DataStreamManagement#read (#551)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new fb062dd RATIS-1456. Clean StreamMap when an exception occurs in DataStreamManagement#read (#551)
fb062dd is described below
commit fb062dd90146ad54c5ab4d6f459c05d20b7f0322
Author: hao guo <gu...@360.cn>
AuthorDate: Fri Dec 3 10:36:57 2021 +0800
RATIS-1456. Clean StreamMap when an exception occurs in DataStreamManagement#read (#551)
---
.../apache/ratis/netty/server/DataStreamManagement.java | 16 ++++++++++++++--
1 file changed, 14 insertions(+), 2 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 7539245..98e9c9e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -378,6 +378,7 @@ public class DataStreamManagement {
final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
+ streams.remove(key);
throw new IllegalStateException("Failed to create a new stream for " + request
+ " since a stream already exists Key: " + key + " StreamInfo:" + info);
}
@@ -386,7 +387,10 @@ public class DataStreamManagement {
() -> new IllegalStateException("Failed to remove StreamInfo for " + request));
} else {
info = Optional.ofNullable(streams.get(key)).orElseThrow(
- () -> new IllegalStateException("Failed to get StreamInfo for " + request));
+ () -> {
+ streams.remove(key);
+ return new IllegalStateException("Failed to get StreamInfo for " + request);
+ });
}
final CompletableFuture<Long> localWrite;
@@ -420,6 +424,7 @@ public class DataStreamManagement {
}, requestExecutor)).whenComplete((v, exception) -> {
try {
if (exception != null) {
+ streams.remove(key);
replyDataStreamException(server, exception, info.getRequest(), request, ctx);
}
} finally {
@@ -441,7 +446,14 @@ public class DataStreamManagement {
for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
final DataStreamReply reply = replyFuture.join();
assertReplyCorrespondingToRequest(request, reply);
- if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
+ if (!reply.isSuccess()) {
+ LOG.warn("reply is not success, request: {}", request);
+ return false;
+ }
+ if (reply.getBytesWritten() != bytesWritten) {
+ LOG.warn(
+ "reply written bytes not match, local size: {} remote size: {} request: {}",
+ bytesWritten, reply.getBytesWritten(), request);
return false;
}
}