You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/06 11:33:37 UTC
[incubator-ratis] branch master updated: RATIS-1134. Remove DataStreamApi.stream(RaftGroupId). (#259)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 189ee94 RATIS-1134. Remove DataStreamApi.stream(RaftGroupId). (#259)
189ee94 is described below
commit 189ee94471afdef8bd6c8b1fb419309230bfaa24
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Nov 6 19:32:44 2020 +0800
RATIS-1134. Remove DataStreamApi.stream(RaftGroupId). (#259)
* RATIS-1134. Remove DataStreamApi.stream(RaftGroupId).
* Fixed checkstyle and changed TestDataStreamNetty slightly.
---
.../org/apache/ratis/client/api/DataStreamApi.java | 7 +-----
.../ratis/client/impl/DataStreamClientImpl.java | 27 +++++++++-------------
.../impl/DataStreamPacketByteBuffer.java | 4 ++--
.../ratis/datastream/DataStreamBaseTest.java | 6 ++---
.../ratis/datastream/TestDataStreamDisabled.java | 1 -
.../ratis/datastream/TestDataStreamNetty.java | 4 ++--
6 files changed, 19 insertions(+), 30 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 8b07fdb..3383567 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -17,8 +17,6 @@
*/
package org.apache.ratis.client.api;
-import org.apache.ratis.protocol.RaftGroupId;
-
/**
* Stream data asynchronously to all the servers in the {@link org.apache.ratis.protocol.RaftGroup}.
* Clients may stream data to the nearest server and then the server will forward the data to the other servers.
@@ -37,9 +35,6 @@ import org.apache.ratis.protocol.RaftGroupId;
* but {@link MessageStreamApi} streams messages only to the leader.
*/
public interface DataStreamApi {
- /** Create a stream to write data to the default group. */
+ /** Create a stream to write data. */
DataStreamOutput stream();
-
- /** Create a stream to write data to the given group. */
- DataStreamOutput stream(RaftGroupId groupId);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index bf54fb9..6c07374 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -25,13 +25,13 @@ import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -75,35 +75,35 @@ public class DataStreamClientImpl implements DataStreamClient {
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
}
- long getStreamId() {
- return header.getCallId();
+ private CompletableFuture<DataStreamReply> send(Type type, ByteBuffer buffer) {
+ return orderedStreamAsync.sendRequest(header.getCallId(), streamOffset, buffer, type);
+ }
+
+ private CompletableFuture<DataStreamReply> send(Type type) {
+ return send(type, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
}
// send to the attached dataStreamClientRpc
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
- final CompletableFuture<DataStreamReply> f = orderedStreamAsync.sendRequest(getStreamId(), streamOffset, buf,
- Type.STREAM_DATA);
+ final CompletableFuture<DataStreamReply> f = send(Type.STREAM_DATA, buf);
streamOffset += buf.remaining();
return f;
}
@Override
public CompletableFuture<DataStreamReply> closeAsync() {
- return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, Unpooled.EMPTY_BUFFER.nioBuffer(),
- Type.STREAM_CLOSE);
+ return send(Type.STREAM_CLOSE);
}
@Override
public CompletableFuture<DataStreamReply> closeForwardAsync() {
- return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, Unpooled.EMPTY_BUFFER.nioBuffer(),
- Type.STREAM_CLOSE_FORWARD);
+ return send(Type.STREAM_CLOSE_FORWARD);
}
@Override
public CompletableFuture<DataStreamReply> startTransactionAsync() {
- return orderedStreamAsync.sendRequest(getStreamId(), streamOffset, Unpooled.EMPTY_BUFFER.nioBuffer(),
- Type.START_TRANSACTION);
+ return send(Type.START_TRANSACTION);
}
public RaftClientRequest getHeader() {
@@ -123,11 +123,6 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutputRpc stream() {
- return stream(groupId);
- }
-
- @Override
- public DataStreamOutputRpc stream(RaftGroupId gid) {
RaftClientRequest request = new RaftClientRequest(
clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(), RaftClientRequest.writeRequestType());
return new DataStreamOutputImpl(request);
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
index fcb420d..0caa77e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamPacketByteBuffer.java
@@ -24,13 +24,13 @@ import java.nio.ByteBuffer;
* Implements {@link org.apache.ratis.protocol.DataStreamPacket} with {@link ByteBuffer}.
*/
public abstract class DataStreamPacketByteBuffer extends DataStreamPacketImpl {
- private static final ByteBuffer EMPTY = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
+ public static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocateDirect(0).asReadOnlyBuffer();
private final ByteBuffer buffer;
public DataStreamPacketByteBuffer(long streamId, long streamOffset, ByteBuffer buffer, Type type) {
super(streamId, streamOffset, type);
- this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY;
+ this.buffer = buffer != null? buffer.asReadOnlyBuffer(): EMPTY_BYTE_BUFFER;
}
@Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 51c6bb7..4d50d7f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -397,8 +397,8 @@ abstract class DataStreamBaseTest extends BaseTest {
final RaftClient client = newRaftClientForDataStream();
clients.add(client);
for (int i = 0; i < numStreams; i++) {
- futures.add(CompletableFuture.runAsync(
- () -> runTestDataStream((DataStreamOutputImpl) client.getDataStreamApi().stream(raftGroup.getGroupId()), bufferSize, bufferNum)));
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
+ (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum)));
}
}
Assert.assertEquals(numClients*numStreams, futures.size());
@@ -413,7 +413,7 @@ abstract class DataStreamBaseTest extends BaseTest {
private void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream()) {
- DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream(raftGroup.getGroupId());
+ final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream();
runTestDataStream(out, bufferSize, bufferNum);
DataStreamReplyByteBuffer replyByteBuffer = (DataStreamReplyByteBuffer) out.closeAsync().join();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 0350b2d..7444c12 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -20,7 +20,6 @@ package org.apache.ratis.datastream;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DisabledDataStreamClientFactory;
import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.client.impl.DataStreamClientImpl;
import org.apache.ratis.conf.RaftProperties;
import org.junit.Before;
import org.junit.Rule;
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 83decef..4b13599 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -58,13 +58,13 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
@Test
public void testDataStreamSingleServer() throws Exception {
- runTestDataStream(1, 2, 20, 1_000_000, 10);
+ runTestDataStream(1, 5, 10, 1_000_000, 10);
runTestDataStream(1, 2, 20, 1_000, 10_000);
}
@Test
public void testDataStreamMultipleServer() throws Exception {
- runTestDataStream(3, 2, 20, 1_000_000, 100);
+ runTestDataStream(3, 5, 10, 1_000_000, 10);
runTestDataStream(3, 2, 20, 1_000, 10_000);
}