You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/09/30 10:23:41 UTC

[incubator-ratis] branch master updated: RATIS-1080. Rename StreamApi to MessageStreamApi. (#211)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ffceab  RATIS-1080. Rename StreamApi to MessageStreamApi. (#211)
9ffceab is described below

commit 9ffceabd770fd52f2b1ad9a227f651c47ea8c104
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Sep 30 18:23:35 2020 +0800

    RATIS-1080. Rename StreamApi to MessageStreamApi. (#211)
---
 .../main/java/org/apache/ratis/client/RaftClient.java  |  5 +++--
 .../org/apache/ratis/client/api/DataStreamOutput.java  | 18 +++++-------------
 .../apache/ratis/client/api/MessageOutputStream.java   | 15 ++-------------
 .../api/{StreamApi.java => MessageStreamApi.java}      |  4 ++--
 .../impl/{StreamImpl.java => MessageStreamImpl.java}   | 12 ++++++------
 .../org/apache/ratis/client/impl/RaftClientImpl.java   |  8 ++++----
 .../src/main/java/org/apache/ratis/io/CloseAsync.java  | 18 +++++++++++-------
 ...{StreamApiTests.java => MessageStreamApiTests.java} |  6 +++---
 ...WithGrpc.java => TestMessageStreamApiWithGrpc.java} |  4 ++--
 9 files changed, 38 insertions(+), 52 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 37f889c..dc2b2af 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -18,7 +18,7 @@
 package org.apache.ratis.client;
 
 import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.api.StreamApi;
+import org.apache.ratis.client.api.MessageStreamApi;
 import org.apache.ratis.client.impl.ClientImplUtils;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
@@ -48,7 +48,8 @@ public interface RaftClient extends Closeable {
   /** @return the client rpct. */
   RaftClientRpc getClientRpc();
 
-  StreamApi getStreamApi();
+  /** @return the {@link MessageStreamApi}. */
+  MessageStreamApi getMessageStreamApi();
 
   /**
    * Async call to send the given message to the raft service.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 2c55be8..936b9b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -17,22 +17,14 @@
  */
 package org.apache.ratis.client.api;
 
+import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.protocol.DataStreamReply;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
-public interface DataStreamOutput extends AutoCloseable {
+/** An asynchronous output stream supporting zero buffer copying. */
+public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
+  /** Send out the data in the buffer asynchronously */
   CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
-
-  CompletableFuture<DataStreamReply> closeAsync();
-
-  default void close() throws Exception {
-    try {
-      closeAsync().get();
-    } catch (ExecutionException e) {
-      final Throwable cause = e.getCause();
-      throw cause instanceof Exception? (Exception)cause: e;
-    }
-  }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java
index 3129847..358ad28 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java
@@ -17,14 +17,14 @@
  */
 package org.apache.ratis.client.api;
 
+import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 
 /** Stream {@link Message}(s) asynchronously. */
-public interface MessageOutputStream extends AutoCloseable {
+public interface MessageOutputStream extends CloseAsync<RaftClientReply> {
   /**
    * Send asynchronously the given message to this stream.
    *
@@ -41,15 +41,4 @@ public interface MessageOutputStream extends AutoCloseable {
   default CompletableFuture<RaftClientReply> sendAsync(Message message) {
     return sendAsync(message, false);
   }
-
-  CompletableFuture<RaftClientReply> closeAsync();
-
-  default void close() throws Exception {
-    try {
-      closeAsync().get();
-    } catch (ExecutionException e) {
-      final Throwable cause = e.getCause();
-      throw cause instanceof Exception? (Exception)cause: e;
-    }
-  }
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/StreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
similarity index 93%
rename from ratis-client/src/main/java/org/apache/ratis/client/api/StreamApi.java
rename to ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
index 16d03bb..4d0f9a7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/StreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
@@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
 import java.util.concurrent.CompletableFuture;
 
 /** A client who sends requests to a raft service. */
-public interface StreamApi {
-  Logger LOG = LoggerFactory.getLogger(StreamApi.class);
+public interface MessageStreamApi {
+  Logger LOG = LoggerFactory.getLogger(MessageStreamApi.class);
 
   /** Create a stream to send a large message. */
   MessageOutputStream stream();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/StreamImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
similarity index 87%
rename from ratis-client/src/main/java/org/apache/ratis/client/impl/StreamImpl.java
rename to ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index 1821ecd..77a8932 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/StreamImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -19,7 +19,7 @@ package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.api.MessageOutputStream;
-import org.apache.ratis.client.api.StreamApi;
+import org.apache.ratis.client.api.MessageStreamApi;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -33,11 +33,11 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
 
 /** Send ordered asynchronous requests to a raft service. */
-public final class StreamImpl implements StreamApi {
-  public static final Logger LOG = LoggerFactory.getLogger(StreamImpl.class);
+public final class MessageStreamImpl implements MessageStreamApi {
+  public static final Logger LOG = LoggerFactory.getLogger(MessageStreamImpl.class);
 
-  static StreamImpl newInstance(RaftClientImpl client, RaftProperties properties) {
-    return new StreamImpl(client, properties);
+  static MessageStreamImpl newInstance(RaftClientImpl client, RaftProperties properties) {
+    return new MessageStreamImpl(client, properties);
   }
 
   class MessageOutputStreamImpl implements MessageOutputStream {
@@ -63,7 +63,7 @@ public final class StreamImpl implements StreamApi {
   private final SizeInBytes submessageSize;
   private final AtomicLong streamId = new AtomicLong();
 
-  private StreamImpl(RaftClientImpl client, RaftProperties properties) {
+  private MessageStreamImpl(RaftClientImpl client, RaftProperties properties) {
     this.client = Objects.requireNonNull(client, "client == null");
     this.submessageSize = RaftClientConfigKeys.Stream.submessageSize(properties);
   }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 4d89bd3..4e25fe4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -20,7 +20,7 @@ package org.apache.ratis.client.impl;
 import org.apache.ratis.client.retry.ClientRetryEvent;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientRpc;
-import org.apache.ratis.client.api.StreamApi;
+import org.apache.ratis.client.api.MessageStreamApi;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
@@ -114,7 +114,7 @@ public final class RaftClientImpl implements RaftClient {
   private final TimeoutScheduler scheduler;
 
   private final Supplier<OrderedAsync> orderedAsync;
-  private final Supplier<StreamApi> streamApi;
+  private final Supplier<MessageStreamApi> streamApi;
 
   RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
       RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
@@ -130,7 +130,7 @@ public final class RaftClientImpl implements RaftClient {
     clientRpc.addServers(peers);
 
     this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
-    this.streamApi = JavaUtils.memoize(() -> StreamImpl.newInstance(this, properties));
+    this.streamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
   }
 
   public RaftPeerId getLeaderId() {
@@ -177,7 +177,7 @@ public final class RaftClientImpl implements RaftClient {
   }
 
   @Override
-  public StreamApi getStreamApi() {
+  public MessageStreamApi getMessageStreamApi() {
     return streamApi.get();
   }
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
similarity index 73%
copy from ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
copy to ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
index 2c55be8..408e3d5 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
@@ -15,18 +15,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.client.api;
+package org.apache.ratis.io;
 
-import org.apache.ratis.protocol.DataStreamReply;
-import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-public interface DataStreamOutput extends AutoCloseable {
-  CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
-
-  CompletableFuture<DataStreamReply> closeAsync();
+/** Support the {@link CloseAsync#closeAsync()} method. */
+public interface CloseAsync<REPLY> extends AutoCloseable {
+  /** Close asynchronously. */
+  CompletableFuture<REPLY> closeAsync();
 
+  /**
+   * The same as {@link AutoCloseable#close()}.
+   *
+   * The default implementation simply calls {@link CloseAsync#closeAsync()}
+   * and then waits for the returned future to complete.
+   */
   default void close() throws Exception {
     try {
       closeAsync().get();
diff --git a/ratis-server/src/test/java/org/apache/ratis/StreamApiTests.java b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
similarity index 94%
rename from ratis-server/src/test/java/org/apache/ratis/StreamApiTests.java
rename to ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
index 098da69..5da5c91 100644
--- a/ratis-server/src/test/java/org/apache/ratis/StreamApiTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
@@ -36,7 +36,7 @@ import org.junit.Test;
 
 import java.nio.charset.StandardCharsets;
 
-public abstract class StreamApiTests<CLUSTER extends MiniRaftCluster> extends BaseTest
+public abstract class MessageStreamApiTests<CLUSTER extends MiniRaftCluster> extends BaseTest
     implements MiniRaftCluster.Factory.Get<CLUSTER> {
   {
     Log4jUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
@@ -62,7 +62,7 @@ public abstract class StreamApiTests<CLUSTER extends MiniRaftCluster> extends Ba
     final int endOfRequest = 6;
     final StringBuilder key = new StringBuilder();
     try(RaftClient client = cluster.createClient();
-        MessageOutputStream out = client.getStreamApi().stream()) {
+        MessageOutputStream out = client.getMessageStreamApi().stream()) {
       for (int i = 1; i <= numParts; i++) {
         key.append(i);
         out.sendAsync(new SimpleMessage(i + ""), i == endOfRequest);
@@ -112,7 +112,7 @@ public abstract class StreamApiTests<CLUSTER extends MiniRaftCluster> extends Ba
     }
 
     try(RaftClient client = cluster.createClient()) {
-      final RaftClientReply reply = client.getStreamApi().streamAsync(Message.valueOf(bytes)).get();
+      final RaftClientReply reply = client.getMessageStreamApi().streamAsync(Message.valueOf(bytes)).get();
       Assert.assertTrue(reply.isSuccess());
     }
 
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestStreamApiWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestMessageStreamApiWithGrpc.java
similarity index 86%
rename from ratis-test/src/test/java/org/apache/ratis/grpc/TestStreamApiWithGrpc.java
rename to ratis-test/src/test/java/org/apache/ratis/grpc/TestMessageStreamApiWithGrpc.java
index f377ff9..5d170b5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestStreamApiWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestMessageStreamApiWithGrpc.java
@@ -17,8 +17,8 @@
  */
 package org.apache.ratis.grpc;
 
-import org.apache.ratis.StreamApiTests;
+import org.apache.ratis.MessageStreamApiTests;
 
-public class TestStreamApiWithGrpc extends StreamApiTests<MiniRaftClusterWithGrpc>
+public class TestMessageStreamApiWithGrpc extends MessageStreamApiTests<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
 }
\ No newline at end of file