You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/02 07:22:05 UTC
[incubator-ratis] branch master updated: RATIS-1127. Add a
stream(RaftGroupId) method to DataStreamApi. (#248)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 12af614 RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi. (#248)
12af614 is described below
commit 12af614ad2c6ebf9bbcda9a49b4984c61f0fd2c2
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 2 15:21:55 2020 +0800
RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi. (#248)
* RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi.
* Fix DataStreamBaseTest.
* Fix checkstyle and some minor changes.
---
.../org/apache/ratis/client/api/DataStreamApi.java | 7 +-
.../ratis/client/impl/DataStreamClientImpl.java | 23 +++---
.../apache/ratis/netty/NettyDataStreamFactory.java | 4 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 5 +-
.../ratis/server/DataStreamServerFactory.java | 3 +-
.../server/DisabledDataStreamServerFactory.java | 3 +-
.../ratis/server/impl/DataStreamServerImpl.java | 9 +--
.../apache/ratis/server/impl/RaftServerProxy.java | 2 +-
.../ratis/datastream/DataStreamBaseTest.java | 91 +++++++++++++++-------
9 files changed, 94 insertions(+), 53 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 467dfb5..8b07fdb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -17,6 +17,8 @@
*/
package org.apache.ratis.client.api;
+import org.apache.ratis.protocol.RaftGroupId;
+
/**
* Stream data asynchronously to all the servers in the {@link org.apache.ratis.protocol.RaftGroup}.
* Clients may stream data to the nearest server and then the server will forward the data to the other servers.
@@ -35,6 +37,9 @@ package org.apache.ratis.client.api;
* but {@link MessageStreamApi} streams messages only to the leader.
*/
public interface DataStreamApi {
- /** Create a stream to send data. */
+ /** Create a stream to write data to the default group. */
DataStreamOutput stream();
+
+ /** Create a stream to write data to the given group. */
+ DataStreamOutput stream(RaftGroupId groupId);
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 506e25d..6b27f93 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -36,7 +36,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Streaming client implementation
@@ -51,8 +50,6 @@ public class DataStreamClientImpl implements DataStreamClient {
private final DataStreamClientRpc dataStreamClientRpc;
private final OrderedStreamAsync orderedStreamAsync;
- private final AtomicInteger streamId = new AtomicInteger();
-
public DataStreamClientImpl(RaftPeer server, RaftProperties properties, Parameters parameters) {
this.raftServer = Objects.requireNonNull(server, "server == null");
@@ -64,24 +61,27 @@ public class DataStreamClientImpl implements DataStreamClient {
}
public class DataStreamOutputImpl implements DataStreamOutput {
- private final long streamId;
private final RaftClientRequest header;
private final CompletableFuture<DataStreamReply> headerFuture;
private long streamOffset = 0;
- public DataStreamOutputImpl(long id){
- this.streamId = id;
- this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(),
+ public DataStreamOutputImpl(RaftGroupId groupId) {
+ final long streamId = RaftClientImpl.nextCallId();
+ this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, streamId,
RaftClientRequest.writeRequestType());
this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
}
+ long getStreamId() {
+ return header.getCallId();
+ }
+
// send to the attached dataStreamClientRpc
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
- final CompletableFuture<DataStreamReply> f = orderedStreamAsync.sendRequest(streamId, streamOffset, buf,
+ final CompletableFuture<DataStreamReply> f = orderedStreamAsync.sendRequest(getStreamId(), streamOffset, buf,
Type.STREAM_DATA);
streamOffset += buf.remaining();
return f;
@@ -110,7 +110,12 @@ public class DataStreamClientImpl implements DataStreamClient {
@Override
public DataStreamOutput stream() {
- return new DataStreamOutputImpl(streamId.incrementAndGet());
+ return stream(groupId);
+ }
+
+ @Override
+ public DataStreamOutput stream(RaftGroupId gid) {
+ return new DataStreamOutputImpl(gid);
}
@Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 140e7db..83c2d0f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -43,7 +43,7 @@ public class NettyDataStreamFactory implements DataStreamServerFactory, DataStre
}
@Override
- public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties properties) {
- return new NettyServerStreamRpc(server, properties);
+ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
+ return new NettyServerStreamRpc(server);
}
}
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 63beca7..c5b380b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -205,11 +205,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private final Proxies proxies;
- public NettyServerStreamRpc(RaftServer server, RaftProperties properties) {
+ public NettyServerStreamRpc(RaftServer server) {
this.server = server;
this.name = server.getId() + "-" + getClass().getSimpleName();
- final int port = NettyConfigKeys.DataStream.port(server.getProperties());
+ final RaftProperties properties = server.getProperties();
+ final int port = NettyConfigKeys.DataStream.port(properties);
this.channelFuture = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index 65056c9..9fd5c15 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server;
import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.DataStreamFactory;
import org.apache.ratis.datastream.DataStreamType;
@@ -34,5 +33,5 @@ public interface DataStreamServerFactory extends DataStreamFactory {
}
/** Create a new {@link DataStreamServerRpc}. */
- DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties properties);
+ DataStreamServerRpc newDataStreamServerRpc(RaftServer server);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
index 89d3e3a..67425fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server;
import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.protocol.RaftPeer;
@@ -29,7 +28,7 @@ public class DisabledDataStreamServerFactory implements DataStreamServerFactory
public DisabledDataStreamServerFactory(Parameters parameters) {}
@Override
- public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties properties) {
+ public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
return new DataStreamServerRpc() {
@Override
public void start() {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index d41ea18..7f2e57c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -21,7 +21,6 @@ package org.apache.ratis.server.impl;
import java.io.IOException;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.SupportedDataStreamType;
import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DataStreamServerFactory;
@@ -35,11 +34,9 @@ public class DataStreamServerImpl implements DataStreamServer {
private final DataStreamServerRpc serverRpc;
- public DataStreamServerImpl(RaftServer server, RaftProperties properties, Parameters parameters) {
- final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
-
- this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
- .newDataStreamServerRpc(server, properties);
+ public DataStreamServerImpl(RaftServer server, Parameters parameters) {
+ final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(server.getProperties(), LOG::info);
+ this.serverRpc = DataStreamServerFactory.newInstance(type, parameters).newDataStreamServerRpc(server);
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index fbe9862..ab690d8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -182,7 +182,7 @@ public class RaftServerProxy implements RaftServer {
this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
this.serverRpc = factory.newRaftServerRpc(this);
- this.dataStreamServerRpc = new DataStreamServerImpl(this, properties, null).getServerRpc();
+ this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 7646651..739767a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -38,7 +38,6 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.ServerFactory;
@@ -55,8 +54,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
@@ -146,14 +147,50 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
+ class Server {
+ private final RaftPeer peer;
+ private final RaftServer raftServer;
+ private final DataStreamServerImpl dataStreamServer;
+
+ Server(RaftPeer peer) {
+ this.peer = peer;
+ this.raftServer = newRaftServer(peer, properties);
+ this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
+ }
+
+ RaftPeer getPeer() {
+ return peer;
+ }
+
+ MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) throws IOException {
+ return (MultiDataStreamStateMachine)raftServer.getStateMachine(groupId);
+ }
+
+ void start() {
+ dataStreamServer.getServerRpc().start();
+ }
+
+ void addRaftPeers(Collection<RaftPeer> peers) {
+ dataStreamServer.getServerRpc().addRaftPeers(peers);
+ }
+
+ void close() throws IOException {
+ dataStreamServer.close();
+ }
+ }
+
protected RaftProperties properties;
- private List<DataStreamServerImpl> servers;
- private List<RaftPeer> peers;
- private ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines;
+ private List<Server> servers;
+
+ Server getPrimaryServer() {
+ return servers.get(0);
+ }
protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
return new RaftServer() {
+ private final ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines = new ConcurrentHashMap<>();
+
@Override
public RaftPeerId getId() {
return peer.getId();
@@ -277,35 +314,32 @@ abstract class DataStreamBaseTest extends BaseTest {
protected void setup(int numServers){
- peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
+ final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
.map(RaftPeerId::valueOf)
.map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
.collect(Collectors.toList());
servers = new ArrayList<>(peers.size());
- stateMachines = new ConcurrentHashMap<>();
// start stream servers on raft peers.
for (int i = 0; i < peers.size(); i++) {
final RaftPeer peer = peers.get(i);
- final RaftServer server = newRaftServer(peer, properties);
- final DataStreamServerImpl streamServer = new DataStreamServerImpl(server, properties, null);
- final DataStreamServerRpc rpc = streamServer.getServerRpc();
+ final Server server = new Server(peer);
if (i == 0) {
// only the first server routes requests to peers.
List<RaftPeer> otherPeers = new ArrayList<>(peers);
otherPeers.remove(peers.get(i));
- rpc.addRaftPeers(otherPeers);
+ server.addRaftPeers(otherPeers);
}
- rpc.start();
- servers.add(streamServer);
+ server.start();
+ servers.add(server);
}
}
DataStreamClientImpl newDataStreamClientImpl() {
- return new DataStreamClientImpl(peers.get(0), properties, null);
+ return new DataStreamClientImpl(getPrimaryServer().getPeer(), properties, null);
}
protected void shutdown() throws IOException {
- for (DataStreamServerImpl server : servers) {
+ for (Server server : servers) {
server.close();
}
}
@@ -371,23 +405,24 @@ abstract class DataStreamBaseTest extends BaseTest {
Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
}
-
- final RaftClientRequest header = out.getHeader();
- for (MultiDataStreamStateMachine s : stateMachines.values()) {
- final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
- if (stream == null) {
- continue;
- }
- final RaftClientRequest writeRequest = stream.getWriteRequest();
- if (writeRequest.getClientId().equals(header.getClientId())) {
- Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
- Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
- Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
- }
- Assert.assertEquals(dataSize, stream.getByteWritten());
+ try {
+ assertHeader(out.getHeader(), dataSize);
+ } catch (Throwable e) {
+ throw new CompletionException(e);
}
}
+ void assertHeader(RaftClientRequest header, int dataSize) throws Exception {
+ final Server server = getPrimaryServer();
+ final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
+ final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
+ final RaftClientRequest writeRequest = stream.getWriteRequest();
+ Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
+ Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
+ Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
+ Assert.assertEquals(dataSize, stream.getByteWritten());
+ }
+
static ByteBuffer initBuffer(int offset, int size) {
final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
final int length = buffer.capacity();