You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/27 23:32:07 UTC
[incubator-ratis] branch master updated: RATIS-1270. Set default
primary DataStreamServer in RaftClient.Builder. (#380)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ead7504 RATIS-1270. Set default primary DataStreamServer in RaftClient.Builder. (#380)
ead7504 is described below
commit ead7504ee1f499cd70cf2a0adc9c6bf5deeacb17
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 28 07:32:01 2020 +0800
RATIS-1270. Set default primary DataStreamServer in RaftClient.Builder. (#380)
---
.../java/org/apache/ratis/client/RaftClient.java | 12 +-
.../ratis/datastream/DataStreamBaseTest.java | 315 +--------------------
.../ratis/datastream/TestDataStreamDisabled.java | 31 +-
.../datastream/TestNettyDataStreamWithMock.java | 28 +-
4 files changed, 47 insertions(+), 339 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 62d62ac..e7bd3bf 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
+import java.util.Collection;
import java.util.Objects;
/** A client who sends requests to a raft service. */
@@ -98,9 +99,14 @@ public interface RaftClient extends Closeable {
clientRpc = factory.newRaftClientRpc(clientId, properties);
}
}
- return ClientImplUtils.newRaftClient(clientId,
- Objects.requireNonNull(group, "The 'group' field is not initialized."),
- leaderId, primaryDataStreamServer,
+ Objects.requireNonNull(group, "The 'group' field is not initialized.");
+ if (primaryDataStreamServer == null) {
+ final Collection<RaftPeer> peers = group.getPeers();
+ if (!peers.isEmpty()) {
+ primaryDataStreamServer = peers.iterator().next();
+ }
+ }
+ return ClientImplUtils.newRaftClient(clientId, group, leaderId, primaryDataStreamServer,
Objects.requireNonNull(clientRpc, "The 'clientRpc' field is not initialized."),
properties, retryPolicy);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index b554bf7..382b31e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,149 +18,34 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.protocol.TransferLeadershipRequest;
-import org.apache.ratis.server.DataStreamServer;
-import org.apache.ratis.server.DataStreamServerRpc;
-import org.apache.ratis.server.DivisionInfo;
-import org.apache.ratis.server.DivisionProperties;
-import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.RetryCache;
-import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.DataStreamTestUtils.MyDataChannel;
-import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
-import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.protocol.GroupInfoReply;
-import org.apache.ratis.protocol.GroupInfoRequest;
-import org.apache.ratis.protocol.GroupListReply;
-import org.apache.ratis.protocol.GroupListRequest;
-import org.apache.ratis.protocol.GroupManagementRequest;
import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.server.DataStreamMap;
-import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.RaftConfiguration;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.ServerFactory;
-import org.apache.ratis.server.metrics.RaftServerMetrics;
-import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.server.storage.RaftStorage;
-import org.apache.ratis.statemachine.StateMachine;
-import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.LifeCycle;
-import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
abstract class DataStreamBaseTest extends BaseTest {
- class MyDivision implements RaftServer.Division {
- private final RaftServer server;
- private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
- private final DataStreamMap streamMap;
- private RaftClient client;
-
- MyDivision(RaftServer server) {
- this.server = server;
- this.streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
- }
-
- @Override
- public DivisionProperties properties() {
- return null;
- }
-
- @Override
- public RaftGroupMemberId getMemberId() {
- return null;
- }
-
- @Override
- public DivisionInfo getInfo() {
- return null;
- }
-
- @Override
- public RaftConfiguration getRaftConf() {
+ RaftConfiguration getRaftConf() {
final List<RaftPeer> peers = servers.stream().map(Server::getPeer).collect(Collectors.toList());
return RaftServerTestUtil.newRaftConfiguration(peers);
- }
-
- @Override
- public RaftServer getRaftServer() {
- return server;
- }
-
- @Override
- public RaftServerMetrics getRaftServerMetrics() {
- return null;
- }
-
- @Override
- public MultiDataStreamStateMachine getStateMachine() {
- return stateMachine;
- }
-
- @Override
- public RaftLog getRaftLog() {
- return null;
- }
-
- @Override
- public RaftStorage getRaftStorage() {
- return null;
- }
-
- @Override
- public RetryCache getRetryCache() {
- return null;
- }
-
- @Override
- public DataStreamMap getDataStreamMap() {
- return streamMap;
- }
-
- public void setRaftClient(RaftClient client) {
- this.client = client;
- }
-
- @Override
- public RaftClient getRaftClient() {
- return this.client;
- }
-
- @Override
- public void close() {}
}
static class Server {
@@ -205,192 +90,6 @@ abstract class DataStreamBaseTest extends BaseTest {
return servers.get(0);
}
- protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
- return new MyRaftServer(peer, properties);
- }
-
- class MyRaftServer implements RaftServer {
- private final RaftPeer peer;
- private final RaftProperties properties;
- private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
-
- MyRaftServer(RaftPeer peer, RaftProperties properties) {
- this.peer = peer;
- this.properties = properties;
- }
-
- @Override
- public RaftPeerId getId() {
- return peer.getId();
- }
-
- @Override
- public RaftPeer getPeer() {
- return peer;
- }
-
- @Override
- public MyDivision getDivision(RaftGroupId groupId) {
- return divisions.computeIfAbsent(groupId, key -> new MyDivision(this));
- }
-
- @Override
- public RaftProperties getProperties() {
- return properties;
- }
-
- @Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto request) {
- return null;
- }
-
- @Override
- public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
- return null;
- }
-
- @Override
- public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) {
- return null;
- }
-
- @Override
- public StartLeaderElectionReplyProto startLeaderElection(StartLeaderElectionRequestProto request) throws IOException {
- return null;
- }
-
- @Override
- public CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(AppendEntriesRequestProto request) {
- return null;
- }
-
- @Override
- public RaftServerRpc getServerRpc() {
- return null;
- }
-
- @Override
- public DataStreamServerRpc getDataStreamServerRpc() {
- return null;
- }
-
- @Override
- public RaftClientReply submitClientRequest(RaftClientRequest request) {
- return submitClientRequestAsync(request).join();
- }
-
- @Override
- public RaftClientReply setConfiguration(SetConfigurationRequest request) {
- return null;
- }
-
- @Override
- public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
- return null;
- }
-
- @Override
- public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
- final MyDivision d = getDivision(request.getRaftGroupId());
- return d.getDataStreamMap()
- .remove(ClientInvocationId.valueOf(request))
- .thenApply(StateMachine.DataStream::getDataChannel)
- .thenApply(channel -> buildRaftClientReply(request, channel));
- }
-
- RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
- Assert.assertTrue(channel instanceof MyDataChannel);
- final MyDataChannel dataChannel = (MyDataChannel) channel;
- return RaftClientReply.newBuilder()
- .setRequest(request)
- .setSuccess()
- .setMessage(() -> DataStreamTestUtils.bytesWritten2ByteString(dataChannel.getBytesWritten()))
- .build();
- }
-
- @Override
- public CompletableFuture<RaftClientReply> setConfigurationAsync(SetConfigurationRequest request) {
- return null;
- }
-
- @Override
- public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
- throws IOException {
- return null;
- }
-
- @Override
- public GroupListReply getGroupList(GroupListRequest request) {
- return null;
- }
-
- @Override
- public GroupInfoReply getGroupInfo(GroupInfoRequest request) {
- return null;
- }
-
- @Override
- public RaftClientReply groupManagement(GroupManagementRequest request) {
- return null;
- }
-
- @Override
- public CompletableFuture<GroupListReply> getGroupListAsync(GroupListRequest request) {
- return null;
- }
-
- @Override
- public CompletableFuture<GroupInfoReply> getGroupInfoAsync(GroupInfoRequest request) {
- return null;
- }
-
- @Override
- public CompletableFuture<RaftClientReply> groupManagementAsync(GroupManagementRequest request) {
- return null;
- }
-
- @Override
- public void close() {
- }
-
- @Override
- public Iterable<RaftGroupId> getGroupIds() {
- return null;
- }
-
- @Override
- public Iterable<RaftGroup> getGroups() {
- return null;
- }
-
- @Override
- public ServerFactory getFactory() {
- return null;
- }
-
- @Override
- public void start() {
- }
-
- @Override
- public LifeCycle.State getLifeCycleState() {
- return null;
- }
- }
-
-
- protected void setup(int numServers){
- final List<RaftPeer> peers = Arrays.stream(MiniRaftCluster.generateIds(numServers, 0))
- .map(RaftPeerId::valueOf)
- .map(id -> RaftPeer.newBuilder().setId(id).setDataStreamAddress(NetUtils.createLocalServerAddress()).build())
- .collect(Collectors.toList());
-
- List<RaftServer> raftServers = new ArrayList<>();
- peers.forEach(peer -> raftServers.add(newRaftServer(peer, properties)));
- setup(RaftGroupId.randomId(), peers, raftServers);
- }
-
-
void setup(RaftGroupId groupId, List<RaftPeer> peers, List<RaftServer> raftServers) {
raftGroup = RaftGroup.valueOf(groupId, peers);
this.peers = peers;
@@ -410,14 +109,6 @@ abstract class DataStreamBaseTest extends BaseTest {
return otherPeers;
}
- RaftClient newRaftClientForDataStream() {
- return RaftClient.newBuilder()
- .setRaftGroup(raftGroup)
- .setPrimaryDataStreamServer(getPrimaryServer().getPeer())
- .setProperties(properties)
- .build();
- }
-
RaftClient newRaftClientForDataStream(ClientId clientId) {
return RaftClient.newBuilder()
.setClientId(clientId)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
index 7444c12..cc0c4d9 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamDisabled.java
@@ -17,38 +17,39 @@
*/
package org.apache.ratis.datastream;
+import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.DisabledDataStreamClientFactory;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
-import org.junit.Before;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-public class TestDataStreamDisabled extends DataStreamBaseTest {
+public class TestDataStreamDisabled extends BaseTest {
@Rule
public final ExpectedException exception = ExpectedException.none();
- @Before
- public void setup() {
- properties = new RaftProperties();
- RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.DISABLED);
- }
-
@Test
- public void testDataStreamDisabled() throws Exception {
- try {
- setup(1);
- final RaftClient client = newRaftClientForDataStream();
+ public void testDataStreamDisabled() {
+ final RaftProperties properties = new RaftProperties();
+ Assert.assertEquals(SupportedDataStreamType.DISABLED, RaftConfigKeys.DataStream.type(properties, LOG::info));
+
+ final RaftPeer server = RaftPeer.newBuilder().setId("s0").build();
+ final RaftClient client = RaftClient.newBuilder()
+ .setRaftGroup(RaftGroup.valueOf(RaftGroupId.randomId(), server))
+ .setProperties(properties)
+ .build();
+
exception.expect(UnsupportedOperationException.class);
exception.expectMessage(DisabledDataStreamClientFactory.class.getName()
+ "$1 does not support streamAsync");
// stream() will create a header request, thus it will hit UnsupportedOperationException due to
// DisabledDataStreamFactory.
client.getDataStreamApi().stream();
- } finally {
- shutdown();
- }
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index 9b4e36b..39dbf4b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -22,6 +22,7 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.AsyncRpcApi;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
@@ -31,7 +32,9 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.util.NetUtils;
import org.junit.Before;
import org.junit.Test;
@@ -65,11 +68,19 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
RaftConfigKeys.DataStream.setType(properties, SupportedDataStreamType.NETTY);
}
- @Override
- protected MyRaftServer newRaftServer(RaftPeer peer, RaftProperties properties) {
- final RaftProperties p = new RaftProperties(properties);
- NettyConfigKeys.DataStream.setPort(p, NetUtils.createSocketAddr(peer.getDataStreamAddress()).getPort());
- return super.newRaftServer(peer, p);
+ RaftServer.Division mockDivision(RaftServer server, RaftClient client) {
+ final RaftServer.Division division = mock(RaftServer.Division.class);
+ when(division.getRaftServer()).thenReturn(server);
+ when(division.getRaftClient()).thenReturn(client);
+ when(division.getRaftConf()).thenAnswer(i -> getRaftConf());
+
+ final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
+ when(division.getStateMachine()).thenReturn(stateMachine);
+
+ final DataStreamMap streamMap = RaftServerTestUtil.newDataStreamMap(server.getId());
+ when(division.getDataStreamMap()).thenReturn(streamMap);
+
+ return division;
}
private void testMockCluster(int numServers, RaftException leaderException,
@@ -94,15 +105,14 @@ public class TestNettyDataStreamWithMock extends DataStreamBaseTest {
when(raftServer.getId()).thenReturn(peerId);
when(raftServer.getPeer()).thenReturn(RaftPeer.newBuilder().setId(peerId).build());
if (getStateMachineException == null) {
- MyDivision myDivision = new MyDivision(raftServer);
- when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
-
RaftClient client = Mockito.mock(RaftClient.class);
when(client.getId()).thenReturn(clientId);
- myDivision.setRaftClient(client);
AsyncRpcApi asyncRpcApi = Mockito.mock(AsyncRpcApi.class);
when(client.async()).thenReturn(asyncRpcApi);
+ final RaftServer.Division myDivision = mockDivision(raftServer, client);
+ when(raftServer.getDivision(Mockito.any(RaftGroupId.class))).thenReturn(myDivision);
+
if (submitException != null) {
when(asyncRpcApi.sendForward(Mockito.any(RaftClientRequest.class))).thenThrow(submitException);
} else if (i == 0) {