You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/07 03:22:47 UTC

[incubator-ratis] branch master updated: RATIS-1212. Remove streams from StreamMap when close stream (#329)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5edc953  RATIS-1212. Remove streams from StreamMap when close stream (#329)
5edc953 is described below

commit 5edc9532b14f1916aff851f78946e5148c9de768
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Dec 7 11:22:28 2020 +0800

    RATIS-1212. Remove streams from StreamMap when close stream (#329)
    
    * RATIS-1212. Remove streams from StreamMap when close stream
    
    * close stream
---
 .../java/org/apache/ratis/examples/filestore/cli/DataStream.java | 2 ++
 .../java/org/apache/ratis/netty/server/DataStreamManagement.java | 9 +++++++++
 2 files changed, 11 insertions(+)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index fe059e6..b31229b 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -90,6 +90,8 @@ public class DataStream extends Client {
       } else {
         System.err.println("Error: dataStreamType should be one of DirectByteBuffer, MappedByteBuffer, transferTo");
       }
+
+      dataStreamOutput.closeAsync();
     }
     return fileMap;
   }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 4ea4507..0b6804b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -189,6 +189,12 @@ public class DataStreamManagement {
       LOG.debug("get({}) returns {}", key, info);
       return info;
     }
+
+    StreamInfo remove(Key key) { // Removes the entry stored under key from map and returns what was removed.
+      final StreamInfo info = map.remove(key); // presumably null when no entry exists; the STREAM_CLOSE caller wraps the result in Optional.ofNullable
+      LOG.debug("remove({}) returns {}", key, info);
+      return info;
+    }
   }
 
   private final RaftServer server;
@@ -347,6 +353,9 @@ public class DataStreamManagement {
         throw new IllegalStateException("Failed to create a new stream for " + request
             + " since a stream already exists: " + info);
       }
+    } else if (request.getType() == Type.STREAM_CLOSE) {
+      info = Optional.ofNullable(streams.remove(key)).orElseThrow(
+          () -> new IllegalStateException("Failed to remove StreamInfo for " + request));
     } else {
       info = Optional.ofNullable(streams.get(key)).orElseThrow(
           () -> new IllegalStateException("Failed to get StreamInfo for " + request));