You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sh...@apache.org on 2020/09/30 10:23:41 UTC
[incubator-ratis] branch master updated: RATIS-1080. Rename StreamApi to MessageStreamApi. (#211)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9ffceab RATIS-1080. Rename StreamApi to MessageStreamApi. (#211)
9ffceab is described below
commit 9ffceabd770fd52f2b1ad9a227f651c47ea8c104
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Sep 30 18:23:35 2020 +0800
RATIS-1080. Rename StreamApi to MessageStreamApi. (#211)
---
.../main/java/org/apache/ratis/client/RaftClient.java | 5 +++--
.../org/apache/ratis/client/api/DataStreamOutput.java | 18 +++++-------------
.../apache/ratis/client/api/MessageOutputStream.java | 15 ++-------------
.../api/{StreamApi.java => MessageStreamApi.java} | 4 ++--
.../impl/{StreamImpl.java => MessageStreamImpl.java} | 12 ++++++------
.../org/apache/ratis/client/impl/RaftClientImpl.java | 8 ++++----
.../src/main/java/org/apache/ratis/io/CloseAsync.java | 18 +++++++++++-------
...{StreamApiTests.java => MessageStreamApiTests.java} | 6 +++---
...WithGrpc.java => TestMessageStreamApiWithGrpc.java} | 4 ++--
9 files changed, 38 insertions(+), 52 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 37f889c..dc2b2af 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -18,7 +18,7 @@
package org.apache.ratis.client;
import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.api.StreamApi;
+import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.client.impl.ClientImplUtils;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
@@ -48,7 +48,8 @@ public interface RaftClient extends Closeable {
/** @return the client rpct. */
RaftClientRpc getClientRpc();
- StreamApi getStreamApi();
+ /** @return the {@link MessageStreamApi}. */
+ MessageStreamApi getMessageStreamApi();
/**
* Async call to send the given message to the raft service.
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 2c55be8..936b9b6 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -17,22 +17,14 @@
*/
package org.apache.ratis.client.api;
+import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.protocol.DataStreamReply;
+
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-public interface DataStreamOutput extends AutoCloseable {
+/** An asynchronous output stream supporting zero buffer copying. */
+public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
+ /** Send out the data in the buffer asynchronously */
CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
-
- CompletableFuture<DataStreamReply> closeAsync();
-
- default void close() throws Exception {
- try {
- closeAsync().get();
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- throw cause instanceof Exception? (Exception)cause: e;
- }
- }
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java
index 3129847..358ad28 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageOutputStream.java
@@ -17,14 +17,14 @@
*/
package org.apache.ratis.client.api;
+import org.apache.ratis.io.CloseAsync;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
/** Stream {@link Message}(s) asynchronously. */
-public interface MessageOutputStream extends AutoCloseable {
+public interface MessageOutputStream extends CloseAsync<RaftClientReply> {
/**
* Send asynchronously the given message to this stream.
*
@@ -41,15 +41,4 @@ public interface MessageOutputStream extends AutoCloseable {
default CompletableFuture<RaftClientReply> sendAsync(Message message) {
return sendAsync(message, false);
}
-
- CompletableFuture<RaftClientReply> closeAsync();
-
- default void close() throws Exception {
- try {
- closeAsync().get();
- } catch (ExecutionException e) {
- final Throwable cause = e.getCause();
- throw cause instanceof Exception? (Exception)cause: e;
- }
- }
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/StreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
similarity index 93%
rename from ratis-client/src/main/java/org/apache/ratis/client/api/StreamApi.java
rename to ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
index 16d03bb..4d0f9a7 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/StreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/MessageStreamApi.java
@@ -26,8 +26,8 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.CompletableFuture;
/** A client who sends requests to a raft service. */
-public interface StreamApi {
- Logger LOG = LoggerFactory.getLogger(StreamApi.class);
+public interface MessageStreamApi {
+ Logger LOG = LoggerFactory.getLogger(MessageStreamApi.class);
/** Create a stream to send a large message. */
MessageOutputStream stream();
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/StreamImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
similarity index 87%
rename from ratis-client/src/main/java/org/apache/ratis/client/impl/StreamImpl.java
rename to ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
index 1821ecd..77a8932 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/StreamImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/MessageStreamImpl.java
@@ -19,7 +19,7 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.api.MessageOutputStream;
-import org.apache.ratis.client.api.StreamApi;
+import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
@@ -33,11 +33,11 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
/** Send ordered asynchronous requests to a raft service. */
-public final class StreamImpl implements StreamApi {
- public static final Logger LOG = LoggerFactory.getLogger(StreamImpl.class);
+public final class MessageStreamImpl implements MessageStreamApi {
+ public static final Logger LOG = LoggerFactory.getLogger(MessageStreamImpl.class);
- static StreamImpl newInstance(RaftClientImpl client, RaftProperties properties) {
- return new StreamImpl(client, properties);
+ static MessageStreamImpl newInstance(RaftClientImpl client, RaftProperties properties) {
+ return new MessageStreamImpl(client, properties);
}
class MessageOutputStreamImpl implements MessageOutputStream {
@@ -63,7 +63,7 @@ public final class StreamImpl implements StreamApi {
private final SizeInBytes submessageSize;
private final AtomicLong streamId = new AtomicLong();
- private StreamImpl(RaftClientImpl client, RaftProperties properties) {
+ private MessageStreamImpl(RaftClientImpl client, RaftProperties properties) {
this.client = Objects.requireNonNull(client, "client == null");
this.submessageSize = RaftClientConfigKeys.Stream.submessageSize(properties);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 4d89bd3..4e25fe4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -20,7 +20,7 @@ package org.apache.ratis.client.impl;
import org.apache.ratis.client.retry.ClientRetryEvent;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientRpc;
-import org.apache.ratis.client.api.StreamApi;
+import org.apache.ratis.client.api.MessageStreamApi;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
@@ -114,7 +114,7 @@ public final class RaftClientImpl implements RaftClient {
private final TimeoutScheduler scheduler;
private final Supplier<OrderedAsync> orderedAsync;
- private final Supplier<StreamApi> streamApi;
+ private final Supplier<MessageStreamApi> streamApi;
RaftClientImpl(ClientId clientId, RaftGroup group, RaftPeerId leaderId,
RaftClientRpc clientRpc, RaftProperties properties, RetryPolicy retryPolicy) {
@@ -130,7 +130,7 @@ public final class RaftClientImpl implements RaftClient {
clientRpc.addServers(peers);
this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
- this.streamApi = JavaUtils.memoize(() -> StreamImpl.newInstance(this, properties));
+ this.streamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
}
public RaftPeerId getLeaderId() {
@@ -177,7 +177,7 @@ public final class RaftClientImpl implements RaftClient {
}
@Override
- public StreamApi getStreamApi() {
+ public MessageStreamApi getMessageStreamApi() {
return streamApi.get();
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
similarity index 73%
copy from ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
copy to ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
index 2c55be8..408e3d5 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/CloseAsync.java
@@ -15,18 +15,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.client.api;
+package org.apache.ratis.io;
-import org.apache.ratis.protocol.DataStreamReply;
-import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-public interface DataStreamOutput extends AutoCloseable {
- CompletableFuture<DataStreamReply> streamAsync(ByteBuffer buf);
-
- CompletableFuture<DataStreamReply> closeAsync();
+/** Support the {@link CloseAsync#closeAsync()} method. */
+public interface CloseAsync<REPLY> extends AutoCloseable {
+ /** Close asynchronously. */
+ CompletableFuture<REPLY> closeAsync();
+ /**
+ * The same as {@link AutoCloseable#close()}.
+ *
+ * The default implementation simply calls {@link CloseAsync#closeAsync()}
+ * and then waits for the returned future to complete.
+ */
default void close() throws Exception {
try {
closeAsync().get();
diff --git a/ratis-server/src/test/java/org/apache/ratis/StreamApiTests.java b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
similarity index 94%
rename from ratis-server/src/test/java/org/apache/ratis/StreamApiTests.java
rename to ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
index 098da69..5da5c91 100644
--- a/ratis-server/src/test/java/org/apache/ratis/StreamApiTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MessageStreamApiTests.java
@@ -36,7 +36,7 @@ import org.junit.Test;
import java.nio.charset.StandardCharsets;
-public abstract class StreamApiTests<CLUSTER extends MiniRaftCluster> extends BaseTest
+public abstract class MessageStreamApiTests<CLUSTER extends MiniRaftCluster> extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
Log4jUtils.setLogLevel(RaftServerImpl.LOG, Level.DEBUG);
@@ -62,7 +62,7 @@ public abstract class StreamApiTests<CLUSTER extends MiniRaftCluster> extends Ba
final int endOfRequest = 6;
final StringBuilder key = new StringBuilder();
try(RaftClient client = cluster.createClient();
- MessageOutputStream out = client.getStreamApi().stream()) {
+ MessageOutputStream out = client.getMessageStreamApi().stream()) {
for (int i = 1; i <= numParts; i++) {
key.append(i);
out.sendAsync(new SimpleMessage(i + ""), i == endOfRequest);
@@ -112,7 +112,7 @@ public abstract class StreamApiTests<CLUSTER extends MiniRaftCluster> extends Ba
}
try(RaftClient client = cluster.createClient()) {
- final RaftClientReply reply = client.getStreamApi().streamAsync(Message.valueOf(bytes)).get();
+ final RaftClientReply reply = client.getMessageStreamApi().streamAsync(Message.valueOf(bytes)).get();
Assert.assertTrue(reply.isSuccess());
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestStreamApiWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestMessageStreamApiWithGrpc.java
similarity index 86%
rename from ratis-test/src/test/java/org/apache/ratis/grpc/TestStreamApiWithGrpc.java
rename to ratis-test/src/test/java/org/apache/ratis/grpc/TestMessageStreamApiWithGrpc.java
index f377ff9..5d170b5 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestStreamApiWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestMessageStreamApiWithGrpc.java
@@ -17,8 +17,8 @@
*/
package org.apache.ratis.grpc;
-import org.apache.ratis.StreamApiTests;
+import org.apache.ratis.MessageStreamApiTests;
-public class TestStreamApiWithGrpc extends StreamApiTests<MiniRaftClusterWithGrpc>
+public class TestMessageStreamApiWithGrpc extends MessageStreamApiTests<MiniRaftClusterWithGrpc>
implements MiniRaftClusterWithGrpc.FactoryGet {
}
\ No newline at end of file