You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/27 23:32:07 UTC

[incubator-ratis] branch master updated: RATIS-1270. Set default primary DataStreamServer in RaftClient.Builder. (#380)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ead7504  RATIS-1270. Set default primary DataStreamServer in RaftClient.Builder. (#380)
ead7504 is described below

commit ead7504ee1f499cd70cf2a0adc9c6bf5deeacb17
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 28 07:32:01 2020 +0800

    RATIS-1270. Set default primary DataStreamServer in RaftClient.Builder. (#380)
---
 .../java/org/apache/ratis/client/RaftClient.java   |  12 +-
 .../ratis/datastream/DataStreamBaseTest.java       | 315 +--------------------
 .../ratis/datastream/TestDataStreamDisabled.java   |  31 +-
 .../datastream/TestNettyDataStreamWithMock.java    |  28 +-
 4 files changed, 47 insertions(+), 339 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 62d62ac..e7bd3bf 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.Objects;
 
 /** A client who sends requests to a raft service. */
@@ -98,9 +99,14 @@ public interface RaftClient extends Closeable {
           clientRpc = factory.newRaftClientRpc(clientId, properties);
         }
       }
-      return ClientImplUtils.newRaftClient(clientId,
-          Objects.requireNonNull(group, "The 'group' field is not initialized."),
-          leaderId, primaryDataStreamServer,
+      Objects.requireNonNull(group, "The 'group' field is not initialized.");
+      if (primaryDataStreamServer == null) {
+        final Collection<RaftPeer> peers = group.getPeers();
+        if (!peers.isEmpty()) {
+          primaryDataStreamServer = peers.iterator().next();
+        }
+      }
+      return ClientImplUtils.newRaftClient(clientId, group, leaderId, primaryDataStreamServer,
           Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
           properties, retryPolicy);
     }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b554bf7..382b31e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,149 +18,34 @@
 package org.apache.ratis.datastream;
 
 import org.apache.ratis.BaseTest;
-import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.server.DataStreamServer;
-import org.apache.ratis.server.DataStreamServerRpc;
-import org.apache.ratis.server.DivisionInfo;
-import org.apache.ratis.server.DivisionProperties;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.RetryCache;
-import org.apache.ratis.server.impl.MiniRaftCluster;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamTestUtils.MyDataChannel;
-import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.GroupInfoReply;
-import org.apache.ratis.protocol.GroupInfoRequest;
-import org.apache.ratis.protocol.GroupListReply;
-import org.apache.ratis.protocol.GroupListRequest;
-import org.apache.ratis.protocol.GroupManagementRequest;
 import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.server.DataStreamMap;
-import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.DataStreamServer;
 import org.apache.ratis.server.RaftConfiguration;
+import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.ServerFactory;
-import org.apache.ratis.server.metrics.RaftServerMetrics;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.StateMachine.DataChannel;
 import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.NetUtils;
 import org.junit.Assert;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 abstract class DataStreamBaseTest extends BaseTest {
-  class MyDivision implements RaftServer.Division {
-    private final RaftServer server;
-    private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
-    private final DataStreamMap streamMap;
-    private RaftClient client;
-
-    MyDivision(RaftServer server) {
-      this.server = server;
-      this.streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
-    }
-
-    @Override
-    public DivisionProperties properties() {
-      return null;
-    }
-
-    @Override
-    public RaftGroupMemberId getMemberId() {
-      return null;
-    }
-
-    @Override
-    public DivisionInfo getInfo() {
-      return null;
-    }
-
-    @Override
-    public RaftConfiguration getRaftConf() {
+  RaftConfiguration getRaftConf() {
       final List<RaftPeer> peers = servers.stream().map(Server::getPeer).collect(Collectors.toList());
       return RaftServerTestUtil.newRaftConfiguration(peers);
-    }
-
-    @Override
-    public RaftServer getRaftServer() {
-      return server;
-    }
-
-    @Override
-    public RaftServerMetrics getRaftServerMetrics() {
-      return null;
-    }
-
-    @Override
-    public MultiDataStreamStateMachine getStateMachine() {
-      return stateMachine;
-    }
-
-    @Override
-    public RaftLog getRaftLog() {
-      return null;
-    }
-
-    @Override
-    public RaftStorage getRaftStorage() {
-      return null;
-    }
-
-    @Override
-    public RetryCache getRetryCache() {
-      return null;
-    }
-
-    @Override
-    public DataStreamMap getDataStreamMap() {
-      return streamMap;
-    }
-
-    public void setRaftClient(RaftClient client) {
-      this.client = client;
-    }
-
-    @Override
-    public RaftClient getRaftClient() {
-      return this.client;
-    }
-
-    @Override
-    public void close() {}
   }
 
   static class Server {
@@ -205,192 +90,6 @@ abstract class DataStreamBaseTest extends BaseTest {
     return servers.get(0);
   }
 
-  protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
-    return new MyRaftServer(peer, properties);
-  }
-
-  class MyRaftServer implements RaftServer {
-      private final RaftPeer peer;
-      private final RaftProperties properties;
-      private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
-
-      MyRaftServer(RaftPeer peer, RaftProperties properties) {
-        this.peer = peer;
-        this.properties = properties;
-      }
-
-      @Override
-      public RaftPeerId getId() {
-        return peer.getId();
-      }
-
-      @Override
-      public RaftPeer getPeer() {
-        return peer;
-      }
-
-      @Override
-      public MyDivision getDivision(RaftGroupId groupId) {
-        return divisions.computeIfAbsent(groupId, key -> new MyDivision(this));
-      }
-
-      @Override
-      public RaftProperties getProperties() {
-        return properties;
-      }
-
-      @Override
-      public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
-        return null;
-      }
-
-      @Override
-      public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
-        return null;
-      }
-
-      @Override
-      public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) {
-        return null;
-      }
-
-      @Override
-      public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
-        return null;
-      }
-
-    @Override
-      public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
-        return null;
-      }
-
-      @Override
-      public RaftServerRpc getServerRpc() {
-        return null;
-      }
-
-      @Override
-      public DataStreamServerRpc getDataStreamServerRpc() {
-        return null;
-      }
-
-      @Override
-      public RaftClientReply submitClientRequest(RaftClientRequest request) {
-        return submitClientRequestAsync(request).join();
-      }
-
-      @Override
-      public RaftClientReply setConfiguration(SetConfigurationRequest request) {
-        return null;
-      }
-
-      @Override
-      public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-        return null;
-      }
-
-      @Override
-      public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
-        final MyDivision d = getDivision(request.getRaftGroupId());
-        return d.getDataStreamMap()
-            .remove(ClientInvocationId.valueOf(request))
-            .thenApply(StateMachine.DataStream::getDataChannel)
-            .thenApply(channel -> buildRaftClientReply(request, channel));
-      }
-
-      RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
-        Assert.assertTrue(channel instanceof MyDataChannel);
-        final MyDataChannel dataChannel = (MyDataChannel) channel;
-        return RaftClientReply.newBuilder()
-            .setRequest(request)
-            .setSuccess()
-            .setMessage(() -> DataStreamTestUtils.bytesWritten2ByteString(dataChannel.getBytesWritten()))
-            .build();
-      }
-
-      @Override
-      public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
-        return null;
-      }
-
-      @Override
-      public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
-          throws IOException {
-        return null;
-      }
-
-      @Override
-      public GroupListReply getGroupList(GroupListRequest request) {
-        return null;
-      }
-
-      @Override
-      public GroupInfoReply getGroupInfo(GroupInfoRequest request) {
-        return null;
-      }
-
-      @Override
-      public RaftClientReply groupManagement(GroupManagementRequest request) {
-        return null;
-      }
-
-      @Override
-      public CompletableFuture<GroupListReply> getGroupListAsync(GroupListRequest request) {
-        return null;
-      }
-
-      @Override
-      public CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request) {
-        return null;
-      }
-
-      @Override
-      public CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request) {
-        return null;
-      }
-
-      @Override
-      public void close() {
-      }
-
-      @Override
-      public Iterable<RaftGroupId> getGroupIds() {
-        return null;
-      }
-
-      @Override
-      public Iterable<RaftGroup> getGroups() {
-        return null;
-      }
-
-      @Override
-      public ServerFactory getFactory() {
-        return null;
-      }
-
-      @Override
-      public void start() {
-      }
-
-      @Override
-      public LifeCycle.State getLifeCycleState() {
-        return null;
-      }
-  }
-
-
-  protected void setup(int numServers){
-    final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
-        .map(RaftPeerId::valueOf)
-        .map(id -> RaftPeer.newBuilder().setId(id).setDataStreamAddress(NetUtils.createLocalServerAddress()).build())
-        .collect(Collectors.toList());
-
-    List<RaftServer> raftServers = new ArrayList<>();
-    peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
-    setup(RaftGroupId.randomId(), peers, raftServers);
-  }
-
-
   void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
     raftGroup = RaftGroup.valueOf(groupId, peers);
     this.peers = peers;
@@ -410,14 +109,6 @@ abstract class DataStreamBaseTest extends BaseTest {
     return otherPeers;
   }
 
-  RaftClient newRaftClientForDataStream() {
-    return RaftClient.newBuilder()
-        .setRaftGroup(raftGroup)
-        .setPrimaryDataStreamServer(getPrimaryServer().getPeer())
-        .setProperties(properties)
-        .build();
-  }
-
   RaftClient newRaftClientForDataStream(ClientId clientId) {
     return RaftClient.newBuilder()
         .setClientId(clientId)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 7444c12..cc0c4d9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -17,38 +17,39 @@
  */
 package org.apache.ratis.datastream;
 
+import org.apache.ratis.BaseTest;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.DisabledDataStreamClientFactory;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
-import org.junit.Before;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
-public class TestDataStreamDisabled extends DataStreamBaseTest {
+public class TestDataStreamDisabled extends BaseTest {
   @Rule
   public final ExpectedException exception = ExpectedException.none();
 
-  @Before
-  public void setup() {
-    properties = new RaftProperties();
-    RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.DISABLED);
-  }
-
   @Test
-  public void testDataStreamDisabled() throws Exception {
-    try {
-      setup(1);
-      final RaftClient client = newRaftClientForDataStream();
+  public void testDataStreamDisabled() {
+      final RaftProperties properties = new RaftProperties();
+      Assert.assertEquals(SupportedDataStreamType.DISABLED, RaftConfigKeys.DataStream.type(properties, LOG::info));
+
+      final RaftPeer server = RaftPeer.newBuilder().setId("s0").build();
+      final RaftClient client = RaftClient.newBuilder()
+        .setRaftGroup(RaftGroup.valueOf(RaftGroupId.randomId(), server))
+        .setProperties(properties)
+        .build();
+
       exception.expect(UnsupportedOperationException.class);
       exception.expectMessage(DisabledDataStreamClientFactory.class.getName()
           + "$1 does not support streamAsync");
       // stream() will create a header request, thus it will hit UnsupportedOperationException due to
       // DisabledDataStreamFactory.
       client.getDataStreamApi().stream();
-    } finally {
-      shutdown();
-    }
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 9b4e36b..39dbf4b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.AsyncRpcApi;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
@@ -31,7 +32,9 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.util.NetUtils;
 import org.junit.Before;
 import org.junit.Test;
@@ -65,11 +68,19 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
     RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
   }
 
-  @Override
-  protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
-    final RaftProperties p = new RaftProperties(properties);
-    NettyConfigKeys.DataStream.setPort(p, NetUtils.createSocketAddr(peer.getDataStreamAddress()).getPort());
-    return super.newRaftServer(peer, p);
+  RaftServer.Division mockDivision(RaftServer server, RaftClient client) {
+    final RaftServer.Division division = mock(RaftServer.Division.class);
+    when(division.getRaftServer()).thenReturn(server);
+    when(division.getRaftClient()).thenReturn(client);
+    when(division.getRaftConf()).thenAnswer(i -> getRaftConf());
+
+    final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
+    when(division.getStateMachine()).thenReturn(stateMachine);
+
+    final DataStreamMap streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
+    when(division.getDataStreamMap()).thenReturn(streamMap);
+
+    return division;
   }
 
   private void testMockCluster(int numServers, RaftException leaderException,
@@ -94,15 +105,14 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
       when(raftServer.getId()).thenReturn(peerId);
       when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
       if (getStateMachineException == null) {
-        MyDivision myDivision = new MyDivision(raftServer);
-        when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
-
         RaftClient client = Mockito.mock(RaftClient.class);
         when(client.getId()).thenReturn(clientId);
-        myDivision.setRaftClient(client);
         AsyncRpcApi asyncRpcApi = Mockito.mock(AsyncRpcApi.class);
         when(client.async()).thenReturn(asyncRpcApi);
 
+        final RaftServer.Division myDivision = mockDivision(raftServer, client);
+        when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
+
         if (submitException != null) {
           when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class))).thenThrow(submitException);
         } else if (i == 0) {