You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/29 10:48:35 UTC

[incubator-ratis] branch master updated: RATIS-1274. Close DataStreamClient when close RaftClient (#384)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ab0cd35  RATIS-1274. Close DataStreamClient when close RaftClient (#384)
ab0cd35 is described below

commit ab0cd356c0888e36642a5ef4f7b9c05d85f4447e
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Tue Dec 29 18:48:29 2020 +0800

    RATIS-1274. Close DataStreamClient when close RaftClient (#384)
---
 .../src/main/java/org/apache/ratis/client/api/DataStreamApi.java    | 3 ++-
 .../src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java  | 6 +++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 7250b4b..9e5e243 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.api;
 
 import org.apache.ratis.protocol.RoutingTable;
 
+import java.io.Closeable;
 import java.nio.ByteBuffer;
 
 /**
@@ -38,7 +39,7 @@ import java.nio.ByteBuffer;
  * this API streams data to all the servers in the {@link org.apache.ratis.protocol.RaftGroup}
  * but {@link MessageStreamApi} streams messages only to the leader.
  */
-public interface DataStreamApi {
+public interface DataStreamApi extends Closeable {
   /** Create a stream to write data. */
   default DataStreamOutput stream() {
     return stream(null);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 950da1f..9c3aac8 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -40,6 +40,7 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutScheduler;
 
@@ -128,7 +129,7 @@ public final class RaftClientImpl implements RaftClient {
   private final Supplier<AsyncImpl> asyncApi;
   private final Supplier<BlockingImpl> blockingApi;
   private final Supplier<MessageStreamImpl> messageStreamApi;
-  private final Supplier<DataStreamApi> dataStreamApi;
+  private final MemoizedSupplier<DataStreamApi> dataStreamApi;
 
   private final Supplier<AdminImpl> adminApi;
   private final ConcurrentMap<RaftPeerId, GroupManagementImpl> groupManagmenets = new ConcurrentHashMap<>();
@@ -345,5 +346,8 @@ public final class RaftClientImpl implements RaftClient {
   public void close() throws IOException {
     scheduler.close();
     clientRpc.close();
+    if (dataStreamApi.isInitialized()) {
+      dataStreamApi.get().close();
+    }
   }
 }