You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/02 07:22:05 UTC

[incubator-ratis] branch master updated: RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi. (#248)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 12af614  RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi. (#248)
12af614 is described below

commit 12af614ad2c6ebf9bbcda9a49b4984c61f0fd2c2
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 2 15:21:55 2020 +0800

    RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi. (#248)
    
    * RATIS-1127. Add a stream(RaftGroupId) method to DataStreamApi.
    
    * Fix DataStreamBaseTest.
    
    * Fix checkstyle and some minor changes.
---
 .../org/apache/ratis/client/api/DataStreamApi.java |  7 +-
 .../ratis/client/impl/DataStreamClientImpl.java    | 23 +++---
 .../apache/ratis/netty/NettyDataStreamFactory.java |  4 +-
 .../ratis/netty/server/NettyServerStreamRpc.java   |  5 +-
 .../ratis/server/DataStreamServerFactory.java      |  3 +-
 .../server/DisabledDataStreamServerFactory.java    |  3 +-
 .../ratis/server/impl/DataStreamServerImpl.java    |  9 +--
 .../apache/ratis/server/impl/RaftServerProxy.java  |  2 +-
 .../ratis/datastream/DataStreamBaseTest.java       | 91 +++++++++++++++-------
 9 files changed, 94 insertions(+), 53 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
index 467dfb5..8b07fdb 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamApi.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.client.api;
 
+import org.apache.ratis.protocol.RaftGroupId;
+
 /**
  * Stream data asynchronously to all the servers in the {@link org.apache.ratis.protocol.RaftGroup}.
  * Clients may stream data to the nearest server and then the server will forward the data to the other servers.
@@ -35,6 +37,9 @@ package org.apache.ratis.client.api;
  * but {@link MessageStreamApi} streams messages only to the leader.
  */
 public interface DataStreamApi {
-  /** Create a stream to send data. */
+  /** Create a stream to write data to the default group. */
   DataStreamOutput stream();
+
+  /** Create a stream to write data to the given group. */
+  DataStreamOutput stream(RaftGroupId groupId);
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 506e25d..6b27f93 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -36,7 +36,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * Streaming client implementation
@@ -51,8 +50,6 @@ public class DataStreamClientImpl implements DataStreamClient {
   private final DataStreamClientRpc dataStreamClientRpc;
   private final OrderedStreamAsync orderedStreamAsync;
 
-  private final AtomicInteger streamId = new AtomicInteger();
-
   public DataStreamClientImpl(RaftPeer server, RaftProperties properties, Parameters parameters) {
     this.raftServer = Objects.requireNonNull(server, "server == null");
 
@@ -64,24 +61,27 @@ public class DataStreamClientImpl implements DataStreamClient {
   }
 
   public class DataStreamOutputImpl implements DataStreamOutput {
-    private final long streamId;
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
 
     private long streamOffset = 0;
 
-    public DataStreamOutputImpl(long id){
-      this.streamId = id;
-      this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, RaftClientImpl.nextCallId(),
+    public DataStreamOutputImpl(RaftGroupId groupId) {
+      final long streamId = RaftClientImpl.nextCallId();
+      this.header = new RaftClientRequest(clientId, raftServer.getId(), groupId, streamId,
           RaftClientRequest.writeRequestType());
       this.headerFuture = orderedStreamAsync.sendRequest(streamId, -1,
           ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
     }
 
+    long getStreamId() {
+      return header.getCallId();
+    }
+
     // send to the attached dataStreamClientRpc
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf) {
-      final CompletableFuture<DataStreamReply> f = orderedStreamAsync.sendRequest(streamId, streamOffset, buf,
+      final CompletableFuture<DataStreamReply> f = orderedStreamAsync.sendRequest(getStreamId(), streamOffset, buf,
           Type.STREAM_DATA);
       streamOffset += buf.remaining();
       return f;
@@ -110,7 +110,12 @@ public class DataStreamClientImpl implements DataStreamClient {
 
   @Override
   public DataStreamOutput stream() {
-    return new DataStreamOutputImpl(streamId.incrementAndGet());
+    return stream(groupId);
+  }
+
+  @Override
+  public DataStreamOutput stream(RaftGroupId gid) {
+    return new DataStreamOutputImpl(gid);
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
index 140e7db..83c2d0f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamFactory.java
@@ -43,7 +43,7 @@ public class NettyDataStreamFactory implements DataStreamServerFactory, DataStre
   }
 
   @Override
-  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties properties) {
-    return new NettyServerStreamRpc(server, properties);
+  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
+    return new NettyServerStreamRpc(server);
   }
 }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 63beca7..c5b380b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -205,11 +205,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
 
   private final Proxies proxies;
 
-  public NettyServerStreamRpc(RaftServer server, RaftProperties properties) {
+  public NettyServerStreamRpc(RaftServer server) {
     this.server = server;
     this.name = server.getId() + "-" + getClass().getSimpleName();
 
-    final int port = NettyConfigKeys.DataStream.port(server.getProperties());
+    final RaftProperties properties = server.getProperties();
+    final int port = NettyConfigKeys.DataStream.port(properties);
     this.channelFuture = new ServerBootstrap()
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
index 65056c9..9fd5c15 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DataStreamServerFactory.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server;
 
 import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.DataStreamFactory;
 import org.apache.ratis.datastream.DataStreamType;
 
@@ -34,5 +33,5 @@ public interface DataStreamServerFactory extends DataStreamFactory {
   }
 
   /** Create a new {@link DataStreamServerRpc}. */
-  DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties properties);
+  DataStreamServerRpc newDataStreamServerRpc(RaftServer server);
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
index 89d3e3a..67425fa 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DisabledDataStreamServerFactory.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.server;
 
 import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.protocol.RaftPeer;
 
@@ -29,7 +28,7 @@ public class DisabledDataStreamServerFactory implements DataStreamServerFactory
   public DisabledDataStreamServerFactory(Parameters parameters) {}
 
   @Override
-  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server, RaftProperties properties) {
+  public DataStreamServerRpc newDataStreamServerRpc(RaftServer server) {
     return new DataStreamServerRpc() {
       @Override
       public void start() {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index d41ea18..7f2e57c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -21,7 +21,6 @@ package org.apache.ratis.server.impl;
 import java.io.IOException;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.conf.Parameters;
-import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.SupportedDataStreamType;
 import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.DataStreamServerFactory;
@@ -35,11 +34,9 @@ public class DataStreamServerImpl implements DataStreamServer {
 
   private final DataStreamServerRpc serverRpc;
 
-  public DataStreamServerImpl(RaftServer server, RaftProperties properties, Parameters parameters) {
-    final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(properties, LOG::info);
-
-    this.serverRpc = DataStreamServerFactory.newInstance(type, parameters)
-        .newDataStreamServerRpc(server, properties);
+  public DataStreamServerImpl(RaftServer server, Parameters parameters) {
+    final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(server.getProperties(), LOG::info);
+    this.serverRpc = DataStreamServerFactory.newInstance(type, parameters).newDataStreamServerRpc(server);
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index fbe9862..ab690d8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -182,7 +182,7 @@ public class RaftServerProxy implements RaftServer {
     this.factory = ServerFactory.cast(rpcType.newFactory(parameters));
 
     this.serverRpc = factory.newRaftServerRpc(this);
-    this.dataStreamServerRpc = new DataStreamServerImpl(this, properties, null).getServerRpc();
+    this.dataStreamServerRpc = new DataStreamServerImpl(this, parameters).getServerRpc();
 
     this.id = id != null? id: RaftPeerId.valueOf(getIdStringFrom(serverRpc));
     this.lifeCycle = new LifeCycle(this.id + "-" + getClass().getSimpleName());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 7646651..739767a 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -38,7 +38,6 @@ import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.SetConfigurationRequest;
 import org.apache.ratis.rpc.RpcType;
-import org.apache.ratis.server.DataStreamServerRpc;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.DataStreamServerImpl;
 import org.apache.ratis.server.impl.ServerFactory;
@@ -55,8 +54,10 @@ import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
@@ -146,14 +147,50 @@ abstract class DataStreamBaseTest extends BaseTest {
     }
   }
 
+  class Server {
+    private final RaftPeer peer;
+    private final RaftServer raftServer;
+    private final DataStreamServerImpl dataStreamServer;
+
+    Server(RaftPeer peer) {
+      this.peer = peer;
+      this.raftServer = newRaftServer(peer, properties);
+      this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
+    }
+
+    RaftPeer getPeer() {
+      return peer;
+    }
+
+    MultiDataStreamStateMachine getStateMachine(RaftGroupId groupId) throws IOException {
+      return (MultiDataStreamStateMachine)raftServer.getStateMachine(groupId);
+    }
+
+    void start() {
+      dataStreamServer.getServerRpc().start();
+    }
+
+    void addRaftPeers(Collection<RaftPeer> peers) {
+      dataStreamServer.getServerRpc().addRaftPeers(peers);
+    }
+
+    void close() throws IOException {
+      dataStreamServer.close();
+    }
+  }
+
   protected RaftProperties properties;
 
-  private List<DataStreamServerImpl> servers;
-  private List<RaftPeer> peers;
-  private ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines;
+  private List<Server> servers;
+
+  Server getPrimaryServer() {
+    return servers.get(0);
+  }
 
   protected RaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
     return new RaftServer() {
+      private final ConcurrentMap<RaftGroupId, MultiDataStreamStateMachine> stateMachines = new ConcurrentHashMap<>();
+
       @Override
       public RaftPeerId getId() {
         return peer.getId();
@@ -277,35 +314,32 @@ abstract class DataStreamBaseTest extends BaseTest {
 
 
   protected void setup(int numServers){
-    peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
+    final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
         .map(RaftPeerId::valueOf)
         .map(id -> new RaftPeer(id, NetUtils.createLocalServerAddress()))
         .collect(Collectors.toList());
     servers = new ArrayList<>(peers.size());
-    stateMachines = new ConcurrentHashMap<>();
     // start stream servers on raft peers.
     for (int i = 0; i < peers.size(); i++) {
       final RaftPeer peer = peers.get(i);
-      final RaftServer server = newRaftServer(peer, properties);
-      final DataStreamServerImpl streamServer = new DataStreamServerImpl(server, properties, null);
-      final DataStreamServerRpc rpc = streamServer.getServerRpc();
+      final Server server = new Server(peer);
       if (i == 0) {
         // only the first server routes requests to peers.
         List<RaftPeer> otherPeers = new ArrayList<>(peers);
         otherPeers.remove(peers.get(i));
-        rpc.addRaftPeers(otherPeers);
+        server.addRaftPeers(otherPeers);
       }
-      rpc.start();
-      servers.add(streamServer);
+      server.start();
+      servers.add(server);
     }
   }
 
   DataStreamClientImpl newDataStreamClientImpl() {
-    return new DataStreamClientImpl(peers.get(0), properties, null);
+    return new DataStreamClientImpl(getPrimaryServer().getPeer(), properties, null);
   }
 
   protected void shutdown() throws IOException {
-    for (DataStreamServerImpl server : servers) {
+    for (Server server : servers) {
       server.close();
     }
   }
@@ -371,23 +405,24 @@ abstract class DataStreamBaseTest extends BaseTest {
       Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
       Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
     }
-
-    final RaftClientRequest header = out.getHeader();
-    for (MultiDataStreamStateMachine s : stateMachines.values()) {
-      final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
-      if (stream == null) {
-        continue;
-      }
-      final RaftClientRequest writeRequest = stream.getWriteRequest();
-      if (writeRequest.getClientId().equals(header.getClientId())) {
-        Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
-        Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
-        Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
-      }
-      Assert.assertEquals(dataSize, stream.getByteWritten());
+    try {
+      assertHeader(out.getHeader(), dataSize);
+    } catch (Throwable e) {
+      throw new CompletionException(e);
     }
   }
 
+  void assertHeader(RaftClientRequest header, int dataSize) throws Exception {
+    final Server server = getPrimaryServer();
+    final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
+    final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
+    final RaftClientRequest writeRequest = stream.getWriteRequest();
+    Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
+    Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
+    Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
+    Assert.assertEquals(dataSize, stream.getByteWritten());
+  }
+
   static ByteBuffer initBuffer(int offset, int size) {
     final ByteBuffer buffer = ByteBuffer.allocateDirect(size);
     final int length = buffer.capacity();