You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/23 08:53:17 UTC

[GitHub] [iotdb] wangchao316 commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

wangchao316 commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r828107880



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;

Review comment:
       Please do not use wildcard imports (`import ...*`); import each required class explicitly.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;

Review comment:
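       Same issue here: please replace the wildcard import `java.util.*` with explicit imports of the classes that are used.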

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private Map<RaftGroupId, RaftClient> clientMap;
+
+  private Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private final Function<IConsensusRequest, ByteBuffer> reqSerializer;
+  private final Function<ByteBuffer, IConsensusRequest> reqDeserializer;
+  private final Function<DataSet, ByteBuffer> dataSetSerializer;
+  private final Function<ByteBuffer, DataSet> datasetDeserializer;
+
+  private RaftClient buildClient(RaftGroupId groupId) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(raftGroupMap.get(groupId))
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward
+    try {
+      server.start();
+    } catch (IOException exception) {
+      exception.printStackTrace();
+      try {
+        server.close();
+      } catch (IOException ioException) {
+        ioException.printStackTrace();

Review comment:
       Please log this exception with the logger (e.g. `logger.error(...)`) instead of calling `printStackTrace()`.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private Map<RaftGroupId, RaftClient> clientMap;
+
+  private Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private final Function<IConsensusRequest, ByteBuffer> reqSerializer;
+  private final Function<ByteBuffer, IConsensusRequest> reqDeserializer;
+  private final Function<DataSet, ByteBuffer> dataSetSerializer;
+  private final Function<ByteBuffer, DataSet> datasetDeserializer;
+
+  private RaftClient buildClient(RaftGroupId groupId) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(raftGroupMap.get(groupId))
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward
+    try {
+      server.start();
+    } catch (IOException exception) {
+      exception.printStackTrace();

Review comment:
       Please log this exception with the logger (e.g. `logger.error(...)`) instead of calling `printStackTrace()`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org