Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/16 07:34:34 UTC

[GitHub] [iotdb] SzyWilliam opened a new pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

SzyWilliam opened a new pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255


   
   # Description
   In this PR, I create a basic Ratis-based consensus implementation that supports only the following operations:
   * add new group
   * read
   * write
   
   It is currently only a prototype, and many parts remain to be refined (see the TODOs).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832320853



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest applicationRequest = null;
+
+    // if this server is leader
+    // it will first try to obtain applicationRequest from transaction context
+    if (trx.getClientRequest() != null
+        && trx.getClientRequest().getMessage() instanceof RequestMessage) {
+      RequestMessage requestMessage = (RequestMessage) trx.getClientRequest().getMessage();
+      applicationRequest = requestMessage.getActualRequest();
+    } else {
+      applicationRequest =
+          new ByteBufferConsensusRequest(
+              log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    }
+
+    assert applicationRequest != null;

Review comment:
       Done, it now returns an error here.





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832330261



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();

Review comment:
       Done, removed here





[GitHub] [iotdb] OneSizeFitsQuorum commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
OneSizeFitsQuorum commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832378091



##########
File path: consensus/pom.xml
##########
@@ -35,7 +35,16 @@
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-server</artifactId>
             <version>2.2.0</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>

Review comment:
       Got it





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830708934



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusWrongGroupException.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusWrongGroupException extends ConsensusException {

Review comment:
       I'll rename it to PeerNotServeGroupException, indicating that this peer does not serve the requested consensus group.
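For illustration, a minimal sketch of the renamed exception and of a lookup that throws it. ConsensusException here is a hypothetical stand-in for the project's base class, the int group id and String peer endpoint replace the real ConsensusGroupId and Endpoint types (not shown in this thread), and LocalGroupRegistry is likewise hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for org.apache.iotdb.consensus.exception.ConsensusException.
class ConsensusException extends Exception {
  ConsensusException(String message) {
    super(message);
  }
}

// Sketch: thrown when this peer does not serve the requested consensus group.
class PeerNotServeGroupException extends ConsensusException {
  PeerNotServeGroupException(String peer, int groupId) {
    super(String.format("Peer %s does not serve consensus group %d", peer, groupId));
  }
}

// Hypothetical registry of the groups served by the local peer.
class LocalGroupRegistry {
  private final String myself;
  private final Map<Integer, String> servedGroups = new ConcurrentHashMap<>();

  LocalGroupRegistry(String myself) {
    this.myself = myself;
  }

  void addGroup(int groupId, String description) {
    servedGroups.put(groupId, description);
  }

  // Looks up a group, failing with a descriptive exception instead of returning null.
  String lookup(int groupId) throws PeerNotServeGroupException {
    String group = servedGroups.get(groupId);
    if (group == null) {
      throw new PeerNotServeGroupException(myself, groupId);
    }
    return group;
  }
}
```

Keeping the "does this peer serve the group?" check in one helper avoids scattering null checks across the read and write paths.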





[GitHub] [iotdb] CRZbulabula commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
CRZbulabula commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830723592



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,457 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;

Review comment:
       Better not to use wildcard imports (import *). You can change this in the IntelliJ IDEA import settings.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,457 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;

Review comment:
       Better not to use wildcard imports (import *). You can change this in the IntelliJ IDEA import settings.
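One concrete reason for the rule: with both `import java.util.*;` and `import java.awt.*;`, a bare `List` no longer compiles because the name is ambiguous, and in general a reader cannot tell which package a type comes from. In IntelliJ IDEA the relevant threshold is under Settings > Editor > Code Style > Java > Imports ("Class count to use import with '*'"); raising it to a large value effectively turns wildcard imports off. The small class below is a hypothetical example (not code from this PR) of the explicit-import style applied to the java.util types used here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Every type used below is imported by name, so its origin is obvious at a glance.
class ExplicitImportsDemo {
  private final Map<String, List<String>> peersByGroup = new HashMap<>();

  void addPeer(String groupId, String peer) {
    peersByGroup.computeIfAbsent(groupId, k -> new ArrayList<>()).add(peer);
  }

  // Returns a read-only view; unknown groups yield an empty list.
  List<String> peersOf(String groupId) {
    return Collections.unmodifiableList(
        peersByGroup.getOrDefault(groupId, Collections.emptyList()));
  }
}
```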





[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47410546/badge)](https://coveralls.io/builds/47410546)
   
   Coverage increased (+0.01%) to 65.357% when pulling **8804ee06443b773e1fcda40cd471249874e8a64d on SzyWilliam:consensus_ratis** into **5a88650415bf786ead97679874d8eea442835cf7 on apache:master**.
   



[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r831070425



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,457 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException(e))
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+
+    RaftGroup group = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (group == null || !group.getPeers().contains(myself)) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+
+      RaftClientRequest clientRequest =
+          RaftClientRequest.newBuilder()
+              .setServerId(server.getId())
+              .setClientId(localFakeId)
+              .setGroupId(Utils.toRatisGroupId(groupId))
+              .setCallId(localFakeCallId.incrementAndGet())
+              .setMessage(Message.valueOf(ByteString.copyFrom(request.getContent())))
+              .setType(RaftClientRequest.staleReadRequestType(0))
+              .build();
+
+      reply = server.submitClientRequest(clientRequest);
+    } catch (IOException e) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new RatisRequestFailedException(e))
+          .build();
+    }
+
+    Message ret = reply.getMessage();
+    assert ret instanceof ReadLocalMessage;

Review comment:
       Thanks, I'll use an if-and-throw check in IStateMachine. Here, I'm sure _ret_ must be an instance of ReadLocalMessage, since _ret_ is only returned from ApplicationStateMachineProxy.query. I put the assert here to enforce this; if it fails, it MUST be a program bug.
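One caveat on relying on assert for this invariant: Java assert statements only run when the JVM is started with -ea (-enableassertions); by default they are skipped, so a violated invariant would surface later as a ClassCastException at the cast. A minimal sketch of an always-on check is shown below; Message and ReadLocalMessage are hypothetical stand-ins for the Ratis and PR types, and expectReadLocal is an illustrative helper name.

```java
// Hypothetical stand-ins for the Ratis Message and the PR's ReadLocalMessage.
interface Message {}

class ReadLocalMessage implements Message {
  private final String dataSet;

  ReadLocalMessage(String dataSet) {
    this.dataSet = dataSet;
  }

  String getDataSet() {
    return dataSet;
  }
}

class ReplyChecks {
  // Unlike assert, this check runs whether or not the JVM was started with -ea.
  static ReadLocalMessage expectReadLocal(Message ret) {
    if (!(ret instanceof ReadLocalMessage)) {
      String actual = ret == null ? "null" : ret.getClass().getName();
      throw new IllegalStateException(
          "Program bug: expected ReadLocalMessage from query() but got " + actual);
    }
    return (ReadLocalMessage) ret;
  }
}
```

The exception message still flags the situation as a program bug, but it fails fast with a clear cause in every deployment.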





[GitHub] [iotdb] OneSizeFitsQuorum commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
OneSizeFitsQuorum commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r831830416



##########
File path: consensus/pom.xml
##########
@@ -35,7 +35,16 @@
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-server</artifactId>
             <version>2.2.0</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>

Review comment:
       Do we use this dependency?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest applicationRequest = null;
+
+    // if this server is leader
+    // it will first try to obtain applicationRequest from transaction context
+    if (trx.getClientRequest() != null
+        && trx.getClientRequest().getMessage() instanceof RequestMessage) {
+      RequestMessage requestMessage = (RequestMessage) trx.getClientRequest().getMessage();
+      applicationRequest = requestMessage.getActualRequest();
+    } else {
+      applicationRequest =
+          new ByteBufferConsensusRequest(
+              log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    }
+
+    assert applicationRequest != null;
+
+    TSStatus result = applicationStateMachine.write(applicationRequest);
+    Message ret = new ResponseMessage(result);
+
+    return CompletableFuture.completedFuture(ret);
+  }
+
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    assert request instanceof RequestMessage;

Review comment:
       Same as above.
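Applied to query(), the same idea might look like the sketch below: an unexpected message type is reported through an exceptionally completed future instead of an assert. This assumes Ratis turns a failed query future into an error reply for the client, which is worth verifying against the Ratis version in use. Message, RequestMessage and ResultMessage are hypothetical stand-ins. completeExceptionally is used because CompletableFuture.failedFuture only exists from Java 9.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the Ratis Message type and the PR's message wrappers.
interface Message {}

class RequestMessage implements Message {
  final String actualRequest;

  RequestMessage(String actualRequest) {
    this.actualRequest = actualRequest;
  }
}

class ResultMessage implements Message {
  final String result;

  ResultMessage(String result) {
    this.result = result;
  }
}

class QuerySketch {
  // Rejects an unexpected message type through the returned future rather than with assert.
  static CompletableFuture<Message> query(Message request) {
    if (!(request instanceof RequestMessage)) {
      CompletableFuture<Message> failed = new CompletableFuture<>();
      String actual = request == null ? "null" : request.getClass().getName();
      failed.completeExceptionally(
          new IllegalArgumentException("Unexpected query message type: " + actual));
      return failed;
    }
    String actualRequest = ((RequestMessage) request).actualRequest;
    // Placeholder for the read against the application state machine.
    Message ret = new ResultMessage("result of " + actualRequest);
    return CompletableFuture.completedFuture(ret);
  }
}
```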

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisRequestFailedException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+public class RatisRequestFailedException extends ConsensusException {
+
+  public RatisRequestFailedException(Exception cause) {
+    super("Ratis request failed", cause);

Review comment:
       Please capitalize the first letter of every exception message so that they are uniform.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest applicationRequest = null;
+
+    // if this server is leader
+    // it will first try to obtain applicationRequest from transaction context
+    if (trx.getClientRequest() != null
+        && trx.getClientRequest().getMessage() instanceof RequestMessage) {
+      RequestMessage requestMessage = (RequestMessage) trx.getClientRequest().getMessage();
+      applicationRequest = requestMessage.getActualRequest();
+    } else {
+      applicationRequest =
+          new ByteBufferConsensusRequest(
+              log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    }
+
+    assert applicationRequest != null;

Review comment:
       Instead of using assert, we prefer to return an error TSStatus. This lets the cluster keep providing service even if a single log entry has a problem.
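A minimal sketch of the suggested pattern, under stated assumptions: Status is a hypothetical stand-in for TSStatus, the numeric codes are placeholders rather than IoTDB's real status codes, and "empty payload" stands in for whatever makes a log entry unusable. The entry is still recorded as applied (as updateLastAppliedTermIndex does in the PR), and the failure is reported as an error status for that entry only.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for TSStatus: a status code plus a human-readable message.
class Status {
  static final int SUCCESS = 200; // placeholder codes, not IoTDB's real ones
  static final int INTERNAL_ERROR = 500;

  final int code;
  final String message;

  Status(int code, String message) {
    this.code = code;
    this.message = message;
  }

  boolean isSuccess() {
    return code == SUCCESS;
  }
}

// Hypothetical, heavily simplified state machine used only to show the error-status pattern.
class ApplySketch {
  private long lastAppliedIndex = -1;

  CompletableFuture<Status> applyEntry(long index, ByteBuffer logData) {
    // Record the entry as applied first, so one bad entry does not block the ones after it.
    lastAppliedIndex = index;
    if (logData == null || !logData.hasRemaining()) {
      // Report the problem for this entry only, instead of failing an assert.
      return CompletableFuture.completedFuture(
          new Status(Status.INTERNAL_ERROR, "Cannot apply log entry " + index + ": empty payload"));
    }
    // Placeholder for the real application write.
    return CompletableFuture.completedFuture(
        new Status(Status.SUCCESS, "Applied log entry " + index));
  }

  long getLastAppliedIndex() {
    return lastAppliedIndex;
  }
}
```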

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerNotInConsensusGroupException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class PeerNotInConsensusGroupException extends ConsensusException {
+  public PeerNotInConsensusGroupException(ConsensusGroupId groupId) {
+    super(String.format("peer is not in group %d", groupId.getId()));

Review comment:
       Which peer? Maybe we should indicate that it is the current node.
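A minimal sketch of a message that names the current node and starts with a capital letter. ConsensusException is a hypothetical stand-in for the project's base class, and passing the node's endpoint as a String (for example "host:port") is an assumption; the real constructor takes a ConsensusGroupId.

```java
// Hypothetical stand-in for org.apache.iotdb.consensus.exception.ConsensusException.
class ConsensusException extends Exception {
  ConsensusException(String message) {
    super(message);
  }
}

class PeerNotInConsensusGroupException extends ConsensusException {
  // Assumption: the caller passes its own endpoint so the message says which node failed.
  PeerNotInConsensusGroupException(int groupId, String currentNode) {
    super(String.format("Current node %s is not in consensus group %d", currentNode, groupId));
  }
}
```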

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will

Review comment:
       The comment reads awkwardly; it is probably missing a `, ` after "the latest group info".

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+  private static final int tempBufferSize = 1024;
+  private static final String IOTDB_RATIS_KEY = "iotdb@_ratis_key";
+  private static final SecretKeySpec key = new SecretKeySpec(IOTDB_RATIS_KEY.getBytes(), "AES");
+
+  public static String IP_PORT(Endpoint endpoint) {
+    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
+  }
+
+  public static String groupFullName(ConsensusGroupId consensusGroupId) {
+    return String.format("%s-%d", consensusGroupId.getType().toString(), consensusGroupId.getId());
+  }
+
+  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+    String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+  }
+
+  public static RaftPeer toRaftPeer(Peer peer) {
+    return toRaftPeer(peer.getEndpoint());
+  }
+
+  public static Endpoint getEndPoint(RaftPeer raftPeer) {
+    String address = raftPeer.getAddress(); // ip:port
+    String[] split = address.split(":");
+    return new Endpoint(split[0], Integer.parseInt(split[1]));
+  }
+
+  /**
+   * Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme:
+   * AES/ECB/PKCS5Padding of (GroupType-Id), key = iotdb@_ratis_key
+   */
+  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+    String groupFullName = groupFullName(consensusGroupId);
+
+    byte[] AESEncrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.ENCRYPT_MODE, key);

Review comment:
       Is this too heavy? Does it affect performance? Maybe simply concatenating the group type and id is enough?
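A lighter alternative, sketched under the assumption that a group is identified by its type's enum ordinal plus an `int` id. A Ratis `RaftGroupId` is backed by a 16-byte UUID (one can be built with `RaftGroupId.valueOf(UUID)`), so a plain concatenated string would still have to fit into 16 bytes; packing the two numbers into the UUID's two 64-bit halves does that directly. The mapping is deterministic and reversible without creating a `Cipher` on every call. The class and method names are hypothetical.

```java
import java.util.UUID;

class GroupIdCodecSketch {
  // Hypothetical scheme: group type ordinal in the high 64 bits, numeric group id in the
  // low 64 bits. Deterministic and reversible, with no cipher setup per call.
  static UUID encode(int typeOrdinal, int groupId) {
    return new UUID(typeOrdinal, groupId);
  }

  static int decodeTypeOrdinal(UUID uuid) {
    return (int) uuid.getMostSignificantBits();
  }

  static int decodeGroupId(UUID uuid) {
    return (int) uuid.getLeastSignificantBits();
  }

  public static void main(String[] args) {
    UUID uuid = encode(1, 7);
    System.out.println(uuid);
    System.out.println(decodeTypeOrdinal(uuid) + "-" + decodeGroupId(uuid));
  }
}
```

The resulting `UUID` could then be passed to `RaftGroupId.valueOf(...)`, and the reverse direction replaces the decryption in `toConsensusGroupId`.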

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
+  /**
+   * write will first send request to local server use method call if local server is not leader, it
+   * will use RaftClient to send RPC to read leader
+   */
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    // pre-condition: group exists and myself server serves this group
+    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failedWrite(new ConsensusGroupNotExistException(groupId));
+    }
+
+    // serialize request into Message
+    Message message = new RequestMessage(IConsensusRequest);
+
+    // 1. first try the local server
+    RaftClientRequest clientRequest = buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+    RaftClientReply localServerReply = null;
+    RaftPeer suggestedLeader = null;
+    try {
+      localServerReply = server.submitClientRequest(clientRequest);
+      if (localServerReply.isSuccess()) {
+        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+      }
+
+      NotLeaderException ex = localServerReply.getNotLeaderException();
+      if (ex != null) { // local server is not leader
+        suggestedLeader = ex.getSuggestedLeader();
+      }
+    } catch (IOException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    // 2. try raft client
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    if (suggestedLeader != null) {
+      TSStatus sub = new TSStatus();
+      sub.setMessage(suggestedLeader.getAddress());
+      if (writeResult.subStatus == null) {
+        writeResult.subStatus = new ArrayList<>();
+      }
+      writeResult.subStatus.add(sub);
+    }
+
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+
+    RaftGroup group = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (group == null || !group.getPeers().contains(myself)) {
+      return failedRead(new ConsensusGroupNotExistException(groupId));
+    }
+
+    RaftClientReply reply = null;
+    try {
+      RequestMessage message = new RequestMessage(IConsensusRequest);
+
+      RaftClientRequest clientRequest =
+          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0));
+
+      reply = server.submitClientRequest(clientRequest);
+    } catch (IOException e) {
+      return failedRead(new RatisRequestFailedException(e));
+    }
+
+    Message ret = reply.getMessage();
+    assert ret instanceof ResponseMessage;

Review comment:
       Same as above: return a failed response instead of using `assert`.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+  private static final int tempBufferSize = 1024;
+  private static final String IOTDB_RATIS_KEY = "iotdb@_ratis_key";
+  private static final SecretKeySpec key = new SecretKeySpec(IOTDB_RATIS_KEY.getBytes(), "AES");
+
+  public static String IP_PORT(Endpoint endpoint) {
+    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
+  }
+
+  public static String groupFullName(ConsensusGroupId consensusGroupId) {
+    return String.format("%s-%d", consensusGroupId.getType().toString(), consensusGroupId.getId());
+  }
+
+  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+    String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+  }
+
+  public static RaftPeer toRaftPeer(Peer peer) {
+    return toRaftPeer(peer.getEndpoint());
+  }
+
+  public static Endpoint getEndPoint(RaftPeer raftPeer) {
+    String address = raftPeer.getAddress(); // ip:port
+    String[] split = address.split(":");
+    return new Endpoint(split[0], Integer.parseInt(split[1]));
+  }
+
+  /**
+   * Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme:
+   * AES/ECB/PKCS5Padding of (GroupType-Id), key = iotdb@_ratis_key
+   */
+  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+    String groupFullName = groupFullName(consensusGroupId);
+
+    byte[] AESEncrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.ENCRYPT_MODE, key);
+      AESEncrypted = cipher.doFinal(groupFullName.getBytes());
+    } catch (Exception ignored) {
+    }
+
+    return RaftGroupId.valueOf(ByteString.copyFrom(AESEncrypted));
+  }
+
+  /** Given raftGroupId, decrypt ConsensusGroupId out of it */
+  public static ConsensusGroupId toConsensusGroupId(RaftGroupId raftGroupId) {
+    byte[] AESDecrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");

Review comment:
       Same as above: AES decryption here may be too heavy as well.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }

Review comment:
       Log the exception with a logger instead of silently ignoring it.
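A minimal sketch of the suggested change, with `stop()` logging the close failure. IoTDB code normally logs through SLF4J; `java.util.logging` is used here only to keep the example dependency-free, and `StopSketch` plus its `boolean` return value are hypothetical.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

class StopSketch {
  private static final Logger LOGGER = Logger.getLogger(StopSketch.class.getName());

  // Closes the server and logs a failure instead of silently swallowing it.
  // The boolean result exists only so the sketch can be checked.
  static boolean stop(Closeable server) {
    try {
      server.close();
      return true;
    } catch (IOException e) {
      LOGGER.log(Level.WARNING, "Failed to close Ratis server", e);
      return false;
    }
  }

  public static void main(String[] args) {
    stop(() -> {});
    stop(() -> {
      throw new IOException("simulated close failure");
    });
  }
}
```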

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?

Review comment:
       Just do it: add `IOException` to the method signature.
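A sketch of what this could look like, assuming `IConsensus.start()` is changed to declare `throws IOException` (an implementing method cannot throw a checked exception that its interface method does not declare). The `Server` interface is a hypothetical stand-in for Ratis's `RaftServer`. If closing after a failed start also fails, that error is attached as a suppressed exception rather than being lost.

```java
import java.io.Closeable;
import java.io.IOException;

class StartSketch {
  // Hypothetical minimal view of the server: something that can be started and closed.
  interface Server extends Closeable {
    void start() throws IOException;
  }

  // On a failed start, close the server, keep any close failure as a suppressed
  // exception, and rethrow so the caller sees the original error.
  static void start(Server server) throws IOException {
    try {
      server.start();
    } catch (IOException startFailure) {
      try {
        server.close();
      } catch (IOException closeFailure) {
        startFailure.addSuppressed(closeFailure);
      }
      throw startFailure;
    }
  }

  public static void main(String[] args) {
    Server failing =
        new Server() {
          @Override
          public void start() throws IOException {
            throw new IOException("port in use");
          }

          @Override
          public void close() throws IOException {
            throw new IOException("close failed");
          }
        };
    try {
      start(failing);
    } catch (IOException e) {
      System.out.println(e.getMessage() + ", suppressed: " + e.getSuppressed().length);
    }
  }
}
```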

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {

Review comment:
       Log the exception with a logger here instead of ignoring it.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
+  /**
+   * write will first send request to local server use method call if local server is not leader, it
+   * will use RaftClient to send RPC to read leader
+   */
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    // pre-condition: group exists and myself server serves this group
+    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failedWrite(new ConsensusGroupNotExistException(groupId));
+    }
+
+    // serialize request into Message
+    Message message = new RequestMessage(IConsensusRequest);
+
+    // 1. first try the local server
+    RaftClientRequest clientRequest = buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+    RaftClientReply localServerReply = null;
+    RaftPeer suggestedLeader = null;
+    try {
+      localServerReply = server.submitClientRequest(clientRequest);
+      if (localServerReply.isSuccess()) {
+        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+      }
+
+      NotLeaderException ex = localServerReply.getNotLeaderException();
+      if (ex != null) { // local server is not leader
+        suggestedLeader = ex.getSuggestedLeader();
+      }
+    } catch (IOException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    // 2. try raft client
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    if (suggestedLeader != null) {
+      TSStatus sub = new TSStatus();
+      sub.setMessage(suggestedLeader.getAddress());

Review comment:
       Use `SetRedirectNode` to report the suggested leader here.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
+  /**
+   * write will first send request to local server use method call if local server is not leader, it
+   * will use RaftClient to send RPC to read leader
+   */
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    // pre-condition: group exists and myself server serves this group
+    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failedWrite(new ConsensusGroupNotExistException(groupId));
+    }
+
+    // serialize request into Message
+    Message message = new RequestMessage(IConsensusRequest);
+
+    // 1. first try the local server
+    RaftClientRequest clientRequest = buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+    RaftClientReply localServerReply = null;
+    RaftPeer suggestedLeader = null;
+    try {
+      localServerReply = server.submitClientRequest(clientRequest);
+      if (localServerReply.isSuccess()) {
+        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+      }
+
+      NotLeaderException ex = localServerReply.getNotLeaderException();
+      if (ex != null) { // local server is not leader
+        suggestedLeader = ex.getSuggestedLeader();
+      }
+    } catch (IOException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    // 2. try raft client
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    if (suggestedLeader != null) {
+      TSStatus sub = new TSStatus();
+      sub.setMessage(suggestedLeader.getAddress());
+      if (writeResult.subStatus == null) {
+        writeResult.subStatus = new ArrayList<>();
+      }
+      writeResult.subStatus.add(sub);

Review comment:
       There is no need to create a new sub-status; setting the redirect node on `writeResult` directly (`SetRedirectNode`) is enough.
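A sketch of this suggestion under stated assumptions: `Status` and `HostPort` are hypothetical, trimmed-down stand-ins for the Thrift-generated `TSStatus` and its redirect-node field, and `withRedirect` is a hypothetical helper. The leader address is taken as the `host:port` string returned by `RaftPeer.getAddress()` in the diff above; bracketed IPv6 literals are not handled.

```java
/** Hypothetical stand-in for a host/port endpoint. */
final class HostPort {
  final String host;
  final int port;

  HostPort(String host, int port) {
    this.host = host;
    this.port = port;
  }

  /** Parses "host:port"; bracketed IPv6 literals are not handled in this sketch. */
  static HostPort parse(String address) {
    int colon = address.lastIndexOf(':');
    if (colon <= 0 || colon == address.length() - 1) {
      throw new IllegalArgumentException("expected host:port, got " + address);
    }
    return new HostPort(address.substring(0, colon), Integer.parseInt(address.substring(colon + 1)));
  }
}

/** Hypothetical, trimmed-down stand-in for the Thrift-generated TSStatus. */
final class Status {
  int code;
  String message;
  HostPort redirectNode; // assumed optional field carrying the leader hint

  void setRedirectNode(HostPort node) {
    this.redirectNode = node;
  }
}

final class RedirectHint {
  private RedirectHint() {}

  /**
   * Records the suggested leader on the status returned by the leader itself,
   * instead of appending a separate sub-status that only carries an address.
   */
  static Status withRedirect(Status writeResult, String suggestedLeaderAddress) {
    if (suggestedLeaderAddress != null) {
      writeResult.setRedirectNode(HostPort.parse(suggestedLeaderAddress));
    }
    return writeResult;
  }
}
```

Keeping the hint on the top-level status means clients can look in one typed field for the leader instead of scanning `subStatus` for a message that happens to hold an address.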

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();

Review comment:
       Remove this?
   [screenshot: https://user-images.githubusercontent.com/32640567/159490226-8a68bb8c-9797-4077-8d42-d5547db14c2f.png]
   







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r833009908



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private Map<RaftGroupId, RaftClient> clientMap;
+
+  private Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private final Function<IConsensusRequest, ByteBuffer> reqSerializer;
+  private final Function<ByteBuffer, IConsensusRequest> reqDeserializer;
+  private final Function<DataSet, ByteBuffer> dataSetSerializer;
+  private final Function<ByteBuffer, DataSet> datasetDeserializer;
+
+  private RaftClient buildClient(RaftGroupId groupId) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(raftGroupMap.get(groupId))
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward
+    try {
+      server.start();
+    } catch (IOException exception) {
+      exception.printStackTrace();

Review comment:
       Done

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private Map<RaftGroupId, RaftClient> clientMap;
+
+  private Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private final Function<IConsensusRequest, ByteBuffer> reqSerializer;
+  private final Function<ByteBuffer, IConsensusRequest> reqDeserializer;
+  private final Function<DataSet, ByteBuffer> dataSetSerializer;
+  private final Function<ByteBuffer, DataSet> datasetDeserializer;
+
+  private RaftClient buildClient(RaftGroupId groupId) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(raftGroupMap.get(groupId))
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward
+    try {
+      server.start();
+    } catch (IOException exception) {
+      exception.printStackTrace();
+      try {
+        server.close();
+      } catch (IOException ioException) {
+        ioException.printStackTrace();

Review comment:
       Done

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;

Review comment:
       Done

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;

Review comment:
       Done







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830712126



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(
+      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;

Review comment:
       RatisConsensus requires the user's request to implement `ByteBufferConsensusRequest`; otherwise RatisConsensus has no way to serialize a generic `IConsensusRequest` into a `ByteString` and deserialize it back.
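Because Java `assert` statements are skipped unless the JVM runs with `-ea`, this requirement may be safer to enforce with an explicit check. A minimal sketch, assuming hypothetical stand-ins `ConsensusRequest` and `ByteBufferRequest` for `IConsensusRequest` and `ByteBufferConsensusRequest`; in the PR the resulting bytes would then be wrapped in a Ratis `ByteString`.

```java
import java.nio.ByteBuffer;

/** Hypothetical stand-in for IConsensusRequest. */
interface ConsensusRequest {}

/** Hypothetical stand-in for ByteBufferConsensusRequest. */
final class ByteBufferRequest implements ConsensusRequest {
  private final ByteBuffer content;

  ByteBufferRequest(ByteBuffer content) {
    this.content = content;
  }

  /** Returns a duplicate so reading it does not move the original buffer's position. */
  ByteBuffer getContent() {
    return content.duplicate();
  }
}

final class RequestCodec {
  private RequestCodec() {}

  /**
   * Returns the serialized payload, or throws if the request cannot be serialized.
   * Unlike `assert`, this check runs regardless of JVM flags.
   */
  static byte[] toBytes(ConsensusRequest request) {
    if (!(request instanceof ByteBufferRequest)) {
      throw new IllegalArgumentException(
          "only ByteBuffer-backed requests can be serialized, got "
              + (request == null ? "null" : request.getClass().getName()));
    }
    ByteBuffer buf = ((ByteBufferRequest) request).getContent();
    byte[] bytes = new byte[buf.remaining()];
    buf.get(bytes);
    return bytes;
  }
}
```

In `read` and `write`, the exception could then be caught and turned into a failed `ConsensusReadResponse` or `ConsensusWriteResponse` instead of relying on an assertion that may never run.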







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r831076618



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,457 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;

Review comment:
       done







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832321111



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest applicationRequest = null;
+
+    // if this server is leader
+    // it will first try to obtain applicationRequest from transaction context
+    if (trx.getClientRequest() != null
+        && trx.getClientRequest().getMessage() instanceof RequestMessage) {
+      RequestMessage requestMessage = (RequestMessage) trx.getClientRequest().getMessage();
+      applicationRequest = requestMessage.getActualRequest();
+    } else {
+      applicationRequest =
+          new ByteBufferConsensusRequest(
+              log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    }
+
+    assert applicationRequest != null;
+
+    TSStatus result = applicationStateMachine.write(applicationRequest);
+    Message ret = new ResponseMessage(result);
+
+    return CompletableFuture.completedFuture(ret);
+  }
+
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    assert request instanceof RequestMessage;

Review comment:
       Done, removed the assert







[GitHub] [iotdb] coveralls commented on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls commented on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47409390/badge)](https://coveralls.io/builds/47409390)
   
   Coverage increased (+0.009%) to 65.356% when pulling **c67b5d4401d49e6864b8aaacf17980aca7333fb7 on SzyWilliam:consensus_ratis** into **5a88650415bf786ead97679874d8eea442835cf7 on apache:master**.
   





[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47410427/badge)](https://coveralls.io/builds/47410427)
   
   Coverage increased (+0.02%) to 65.367% when pulling **8804ee06443b773e1fcda40cd471249874e8a64d on SzyWilliam:consensus_ratis** into **5a88650415bf786ead97679874d8eea442835cf7 on apache:master**.
   





[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47449714/badge)](https://coveralls.io/builds/47449714)
   
   Coverage increased (+0.08%) to 65.409% when pulling **095844230b8bd6a590c6d19f9c34fa40e0f96079 on SzyWilliam:consensus_ratis** into **5edf9e966693f0f1be4f0d8e11ee0c6dde5eb02d on apache:master**.
   





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832301795



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest applicationRequest = null;
+
+    // if this server is leader
+    // it will first try to obtain applicationRequest from transaction context
+    if (trx.getClientRequest() != null
+        && trx.getClientRequest().getMessage() instanceof RequestMessage) {
+      RequestMessage requestMessage = (RequestMessage) trx.getClientRequest().getMessage();
+      applicationRequest = requestMessage.getActualRequest();
+    } else {
+      applicationRequest =
+          new ByteBufferConsensusRequest(
+              log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    }
+
+    assert applicationRequest != null;

Review comment:
      OK, I'll return an error TSStatus here instead of asserting.
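A minimal sketch of the agreed change, assuming the same control flow as `applyTransaction` above: the leader may still hold the original request in memory, a follower only has the log bytes, and if neither yields a request the method reports an error status instead of asserting. `Status`, `StateMachine` and the two status codes are hypothetical stand-ins for `TSStatus`, `IStateMachine` and IoTDB's own codes.

```java
import java.nio.ByteBuffer;
import java.util.Optional;

public class ApplyWithErrorStatus {

  /** Hypothetical stand-in for the Thrift TSStatus: a code plus a message. */
  static final class Status {
    final int code;
    final String message;

    Status(int code, String message) {
      this.code = code;
      this.message = message;
    }
  }

  // Hypothetical codes for the sketch; the real code would use IoTDB's own status codes.
  static final int SUCCESS = 200;
  static final int INTERNAL_ERROR = 500;

  /** Hypothetical application state machine: applies a request and reports a status. */
  interface StateMachine {
    Status write(ByteBuffer request);
  }

  /**
   * Prefers the in-memory request (leader path) and falls back to the log bytes (follower path).
   * A plain assert would be skipped unless the JVM runs with -ea, so the null case is handled
   * explicitly and surfaced to the caller as an error status.
   */
  static Status apply(StateMachine sm, Optional<ByteBuffer> inMemoryRequest, ByteBuffer logData) {
    ByteBuffer request = inMemoryRequest.orElse(logData);
    if (request == null) {
      return new Status(INTERNAL_ERROR, "cannot recover request from log entry");
    }
    return sm.write(request);
  }

  public static void main(String[] args) {
    StateMachine sm = req -> new Status(SUCCESS, "applied " + req.remaining() + " bytes");
    System.out.println(apply(sm, Optional.empty(), ByteBuffer.wrap(new byte[4])).message);
    System.out.println(apply(sm, Optional.empty(), null).code);
  }
}
```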







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832297023



##########
File path: consensus/pom.xml
##########
@@ -35,7 +35,16 @@
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-server</artifactId>
             <version>2.2.0</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>

Review comment:
      Ratis requires this dependency; otherwise it won't work.







[GitHub] [iotdb] CRZbulabula commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
CRZbulabula commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830725398



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function uses the previous client for groupId to query the latest group info, then
+   * updates the group info in raftGroupMap and rebuilds the client.
+   *
+   * @throws ConsensusGroupNotExistException if the group info cannot be obtained
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from the local copy. Note: may read stale data (not linearizable). */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(
+      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;

Review comment:
      Then it would be better to use an if statement and throw an exception.
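The reason an explicit check is preferable: Java `assert` statements are disabled by default and only run when the JVM is started with `-ea`, so in production `assert request instanceof ...` protects nothing and a wrong type surfaces later as a `ClassCastException`. A minimal sketch, using hypothetical `Request`/`ByteRequest`/`ReadResponse` types, that wraps the exception in a failed response, in the same spirit as the `failedRead(...)` helper used in a later revision of `read()`:

```java
public class ExplicitTypeCheck {

  /** Hypothetical stand-ins for IConsensusRequest and ByteBufferConsensusRequest. */
  interface Request {}

  static final class ByteRequest implements Request {}

  /** Hypothetical response: either a result or the exception explaining the failure. */
  static final class ReadResponse {
    final String result;
    final Exception exception;

    private ReadResponse(String result, Exception exception) {
      this.result = result;
      this.exception = exception;
    }

    static ReadResponse ok(String result) {
      return new ReadResponse(result, null);
    }

    static ReadResponse failed(Exception e) {
      return new ReadResponse(null, e);
    }
  }

  static ReadResponse read(Request request) {
    // Unlike an assert, this branch runs regardless of JVM flags.
    if (!(request instanceof ByteRequest)) {
      String type = request == null ? "null" : request.getClass().getName();
      return ReadResponse.failed(new IllegalArgumentException("unsupported request type: " + type));
    }
    ByteRequest byteRequest = (ByteRequest) request; // safe: checked above
    return ReadResponse.ok("read via " + byteRequest.getClass().getSimpleName());
  }

  public static void main(String[] args) {
    System.out.println(read(new ByteRequest()).result);
    System.out.println(read(new Request() {}).exception.getMessage());
  }
}
```

Returning a failed response keeps the `IConsensus` method signatures unchanged; throwing a checked exception instead would require changing them.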







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832319384



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerNotInConsensusGroupException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class PeerNotInConsensusGroupException extends ConsensusException {
+  public PeerNotInConsensusGroupException(ConsensusGroupId groupId) {
+    super(String.format("peer is not in group %d", groupId.getId()));

Review comment:
      Done. I added the peer's Endpoint to the exception message.
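A sketch of what such a message could look like. The revised constructor is not shown in this thread, so the `ip:port` rendering and the `(groupId, endpoint)` constructor below are assumptions; `Endpoint` here is a hypothetical holder, not the real `org.apache.iotdb.consensus.common.Endpoint`.

```java
public class PeerNotInGroupMessage {

  /** Hypothetical endpoint holder; its toString() decides how the peer appears in the message. */
  static final class Endpoint {
    final String ip;
    final int port;

    Endpoint(String ip, int port) {
      this.ip = ip;
      this.port = port;
    }

    @Override
    public String toString() {
      return ip + ":" + port;
    }
  }

  /** Hypothetical variant of PeerNotInConsensusGroupException that names the offending peer. */
  static final class PeerNotInGroupException extends Exception {
    PeerNotInGroupException(int groupId, Endpoint peer) {
      // %s calls Endpoint.toString(); %d formats the numeric group id as in the original code
      super(String.format("peer %s is not in group %d", peer, groupId));
    }
  }

  public static void main(String[] args) {
    Exception e = new PeerNotInGroupException(3, new Endpoint("127.0.0.1", 6667));
    System.out.println(e.getMessage());
  }
}
```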

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisRequestFailedException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+public class RatisRequestFailedException extends ConsensusException {
+
+  public RatisRequestFailedException(Exception cause) {
+    super("Ratis request failed", cause);

Review comment:
       Done
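The revised `RatisRequestFailedException(Exception cause)` passes the underlying failure to `super(..., cause)`, so the original `IOException` stays reachable through `getCause()` and shows up in stack traces. A minimal sketch, assuming `ConsensusException` exposes a `(String, Throwable)` constructor; the classes below are simplified stand-ins, and the failing transport call is simulated.

```java
import java.io.IOException;

public class CauseChaining {

  /** Stand-in for ConsensusException, assumed to accept a message and a cause. */
  static class ConsensusException extends Exception {
    ConsensusException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** Mirrors the revised RatisRequestFailedException: fixed message, original cause preserved. */
  static final class RequestFailedException extends ConsensusException {
    RequestFailedException(Exception cause) {
      super("Ratis request failed", cause);
    }
  }

  /** Simulated RPC that fails at the transport level. */
  static void simulateTransport() throws IOException {
    throw new IOException("connection refused");
  }

  /** Wraps the transport failure without discarding it. */
  static void send() throws RequestFailedException {
    try {
      simulateTransport();
    } catch (IOException e) {
      throw new RequestFailedException(e);
    }
  }

  public static void main(String[] args) {
    try {
      send();
    } catch (RequestFailedException e) {
      System.out.println(e.getMessage() + " <- " + e.getCause().getMessage());
    }
  }
}
```

With the earlier no-argument constructor, the `IOException` would have been dropped at the `catch` site and only "Ratis request failed" would reach the logs.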







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832322697



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function uses the previous client for groupId to query the latest group info, then
+   * updates the group info in raftGroupMap and rebuilds the client.
+   *
+   * @throws ConsensusGroupNotExistException if the group info cannot be obtained
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
+  /**
+   * write first submits the request to the local server via a method call; if the local server is
+   * not the leader, it uses RaftClient to send an RPC to the real leader.
+   */
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    // pre-condition: group exists and myself server serves this group
+    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failedWrite(new ConsensusGroupNotExistException(groupId));
+    }
+
+    // serialize request into Message
+    Message message = new RequestMessage(IConsensusRequest);
+
+    // 1. first try the local server
+    RaftClientRequest clientRequest = buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+    RaftClientReply localServerReply = null;
+    RaftPeer suggestedLeader = null;
+    try {
+      localServerReply = server.submitClientRequest(clientRequest);
+      if (localServerReply.isSuccess()) {
+        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+      }
+
+      NotLeaderException ex = localServerReply.getNotLeaderException();
+      if (ex != null) { // local server is not leader
+        suggestedLeader = ex.getSuggestedLeader();
+      }
+    } catch (IOException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    // 2. try raft client
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    if (suggestedLeader != null) {
+      TSStatus sub = new TSStatus();
+      sub.setMessage(suggestedLeader.getAddress());
+      if (writeResult.subStatus == null) {
+        writeResult.subStatus = new ArrayList<>();
+      }
+      writeResult.subStatus.add(sub);
+    }
+
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from the local copy. Note: may read stale data (not linearizable). */
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+
+    RaftGroup group = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (group == null || !group.getPeers().contains(myself)) {
+      return failedRead(new ConsensusGroupNotExistException(groupId));
+    }
+
+    RaftClientReply reply = null;
+    try {
+      RequestMessage message = new RequestMessage(IConsensusRequest);
+
+      RaftClientRequest clientRequest =
+          buildRawRequest(groupId, message, RaftClientRequest.staleReadRequestType(0));
+
+      reply = server.submitClientRequest(clientRequest);
+    } catch (IOException e) {
+      return failedRead(new RatisRequestFailedException(e));
+    }
+
+    Message ret = reply.getMessage();
+    assert ret instanceof ResponseMessage;

Review comment:
       Done, removed here
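The revised `write()` quoted above follows a local-first pattern: submit to the co-located `RaftServer` through `submitClientRequest` (a method call, no RPC), and only if that does not succeed send the request through `RaftClient`, attaching the suggested leader's address as a sub-status. A sketch of that control flow, with hypothetical `LocalServer`/`RemoteClient` interfaces and a simplified `Status` standing in for the Ratis server, the Ratis client, and `TSStatus`; it is not the Ratis or IoTDB API.

```java
import java.util.ArrayList;
import java.util.List;

public class LocalFirstWrite {

  /** Hypothetical status: a message plus optional sub-statuses, loosely modelled on TSStatus. */
  static final class Status {
    final String message;
    final List<Status> subStatus = new ArrayList<>();

    Status(String message) {
      this.message = message;
    }
  }

  /** Hypothetical reply from the co-located server. */
  static final class LocalReply {
    final Status status; // non-null when the local server applied the write
    final String suggestedLeader; // non-null when the local server is not the leader

    LocalReply(Status status, String suggestedLeader) {
      this.status = status;
      this.suggestedLeader = suggestedLeader;
    }
  }

  interface LocalServer {
    LocalReply submit(byte[] request);
  }

  interface RemoteClient {
    Status send(byte[] request);
  }

  static Status write(LocalServer local, RemoteClient client, byte[] request) {
    // 1. Try the in-process server first: a plain method call, no network hop.
    LocalReply reply = local.submit(request);
    if (reply.status != null) {
      return reply.status;
    }
    // 2. Fall back to the client, which sends the request to the leader over RPC.
    Status result = client.send(request);
    // 3. Pass the leader hint back so the caller can route future writes directly.
    if (reply.suggestedLeader != null) {
      result.subStatus.add(new Status(reply.suggestedLeader));
    }
    return result;
  }

  public static void main(String[] args) {
    LocalServer follower = req -> new LocalReply(null, "10.0.0.2:40010");
    RemoteClient client = req -> new Status("SUCCESS via leader");
    Status s = write(follower, client, new byte[] {1});
    System.out.println(s.message + ", leader hint: " + s.subStatus.get(0).message);
  }
}
```

As in the diff, a local failure that is not a not-leader reply still falls through to the client path, just without a leader hint.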







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832317930



##########
File path: consensus/pom.xml
##########
@@ -35,7 +35,16 @@
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-server</artifactId>
             <version>2.2.0</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>

Review comment:
      Ratis requires a metrics dependency to track its internal statistics. If this dependency is removed, the tests fail.







[GitHub] [iotdb] OneSizeFitsQuorum merged pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
OneSizeFitsQuorum merged pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255


   



[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832322466



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
+  /**
+   * write will first send request to local server use method call if local server is not leader, it
+   * will use RaftClient to send RPC to read leader
+   */
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    // pre-condition: group exists and myself server serves this group
+    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failedWrite(new ConsensusGroupNotExistException(groupId));
+    }
+
+    // serialize request into Message
+    Message message = new RequestMessage(IConsensusRequest);
+
+    // 1. first try the local server
+    RaftClientRequest clientRequest = buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+    RaftClientReply localServerReply = null;
+    RaftPeer suggestedLeader = null;
+    try {
+      localServerReply = server.submitClientRequest(clientRequest);
+      if (localServerReply.isSuccess()) {
+        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+      }
+
+      NotLeaderException ex = localServerReply.getNotLeaderException();
+      if (ex != null) { // local server is not leader
+        suggestedLeader = ex.getSuggestedLeader();
+      }
+    } catch (IOException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    // 2. try raft client
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    if (suggestedLeader != null) {
+      TSStatus sub = new TSStatus();
+      sub.setMessage(suggestedLeader.getAddress());

Review comment:
       Done
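The quoted `write` path has three steps. It first tries the local server through an in-process `submitClientRequest` call. If that does not succeed, it records any leader suggested by a `NotLeaderException`. It then sends the request through the group's `RaftClient`. The sketch below models only that control flow in plain Java. `LocalServer`, `RemoteClient`, `LocalReply` and the string statuses are hypothetical stand-ins, not Ratis or IoTDB APIs:

```java
import java.io.IOException;

public class LocalFirstWriteSketch {
  /** Hypothetical result of submitting a request to the in-process server. */
  static final class LocalReply {
    final boolean success;
    final String status; // meaningful only when success is true
    final String suggestedLeader; // may be set when the local server is not the leader

    LocalReply(boolean success, String status, String suggestedLeader) {
      this.success = success;
      this.status = status;
      this.suggestedLeader = suggestedLeader;
    }
  }

  /** Hypothetical in-process entry point (plays the role of server.submitClientRequest). */
  interface LocalServer {
    LocalReply submit(String request) throws IOException;
  }

  /** Hypothetical RPC client (plays the role of client.io().send). */
  interface RemoteClient {
    String send(String request) throws IOException;
  }

  static String write(LocalServer local, RemoteClient remote, String request) {
    String suggestedLeader = null;
    // 1. Try the local server with a plain method call; no RPC is involved.
    try {
      LocalReply reply = local.submit(request);
      if (reply.success) {
        return reply.status;
      }
      suggestedLeader = reply.suggestedLeader;
    } catch (IOException e) {
      return "FAILED: " + e.getMessage();
    }
    // 2. Fall back to the RPC client.
    String status;
    try {
      status = remote.send(request);
    } catch (IOException e) {
      return "FAILED: " + e.getMessage();
    }
    // Mirrors the PR attaching the suggested leader's address to the returned status.
    return suggestedLeader == null ? status : status + " (leader: " + suggestedLeader + ")";
  }

  public static void main(String[] args) {
    LocalServer follower = req -> new LocalReply(false, null, "10.0.0.2:9000");
    System.out.println(write(follower, req -> "OK", "insert")); // OK (leader: 10.0.0.2:9000)

    LocalServer leader = req -> new LocalReply(true, "OK", null);
    RemoteClient unused = req -> {
      throw new IOException("not expected to be called");
    };
    System.out.println(write(leader, unused, "insert")); // OK
  }
}
```

When this node already leads the group, the local method call avoids an RPC entirely. In the PR, the fallback role is played by `RaftClient`. The addresses above are made up for illustration.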





[GitHub] [iotdb] wangchao316 commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
wangchao316 commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r828107880



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;

Review comment:
       Please do not use wildcard imports (import *).

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;

Review comment:
       Same as above.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private Map<RaftGroupId, RaftClient> clientMap;
+
+  private Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private final Function<IConsensusRequest, ByteBuffer> reqSerializer;
+  private final Function<ByteBuffer, IConsensusRequest> reqDeserializer;
+  private final Function<DataSet, ByteBuffer> dataSetSerializer;
+  private final Function<ByteBuffer, DataSet> datasetDeserializer;
+
+  private RaftClient buildClient(RaftGroupId groupId) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(raftGroupMap.get(groupId))
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward
+    try {
+      server.start();
+    } catch (IOException exception) {
+      exception.printStackTrace();
+      try {
+        server.close();
+      } catch (IOException ioException) {
+        ioException.printStackTrace();

Review comment:
       Please use Log.error() instead of printStackTrace().

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private Map<RaftGroupId, RaftClient> clientMap;
+
+  private Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private final Function<IConsensusRequest, ByteBuffer> reqSerializer;
+  private final Function<ByteBuffer, IConsensusRequest> reqDeserializer;
+  private final Function<DataSet, ByteBuffer> dataSetSerializer;
+  private final Function<ByteBuffer, DataSet> datasetDeserializer;
+
+  private RaftClient buildClient(RaftGroupId groupId) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(raftGroupMap.get(groupId))
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    return builder.build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward
+    try {
+      server.start();
+    } catch (IOException exception) {
+      exception.printStackTrace();

Review comment:
       Please use Log.error() instead of printStackTrace().
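Below is a minimal sketch of the logging suggestion. It uses `java.util.logging` so that it is self-contained. With SLF4J, the equivalent call is `LOGGER.error(message, throwable)`. `Server` is a hypothetical stand-in for `RaftServer`:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggedStartSketch {
  private static final Logger LOGGER = Logger.getLogger(LoggedStartSketch.class.getName());

  /** Hypothetical stand-in for RaftServer: something that can be started and closed. */
  interface Server extends Closeable {
    void start() throws IOException;
  }

  /** Starts the server; on failure logs the cause, closes the server and returns false. */
  static boolean startOrClose(Server server) {
    try {
      server.start();
      return true;
    } catch (IOException startFailure) {
      // Logged together with its stack trace, instead of printStackTrace() on stderr
      LOGGER.log(Level.SEVERE, "Failed to start the Ratis server", startFailure);
      try {
        server.close();
      } catch (IOException closeFailure) {
        LOGGER.log(Level.SEVERE, "Failed to close the Ratis server after a failed start", closeFailure);
      }
      return false;
    }
  }

  public static void main(String[] args) {
    Server failing =
        new Server() {
          @Override
          public void start() throws IOException {
            throw new IOException("port already in use");
          }

          @Override
          public void close() {}
        };
    System.out.println(startOrClose(failing)); // false, after a SEVERE log record
  }
}
```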





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832361259



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+  private static final int tempBufferSize = 1024;
+  private static final String IOTDB_RATIS_KEY = "iotdb@_ratis_key";
+  private static final SecretKeySpec key = new SecretKeySpec(IOTDB_RATIS_KEY.getBytes(), "AES");
+
+  public static String IP_PORT(Endpoint endpoint) {
+    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
+  }
+
+  public static String groupFullName(ConsensusGroupId consensusGroupId) {
+    return String.format("%s-%d", consensusGroupId.getType().toString(), consensusGroupId.getId());
+  }
+
+  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+    String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+  }
+
+  public static RaftPeer toRaftPeer(Peer peer) {
+    return toRaftPeer(peer.getEndpoint());
+  }
+
+  public static Endpoint getEndPoint(RaftPeer raftPeer) {
+    String address = raftPeer.getAddress(); // ip:port
+    String[] split = address.split(":");
+    return new Endpoint(split[0], Integer.parseInt(split[1]));
+  }
+
+  /**
+   * Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme:
+   * AES/ECB/PKCS5Padding of (GroupType-Id), key = iotdb@_ratis_key
+   */
+  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+    String groupFullName = groupFullName(consensusGroupId);
+
+    byte[] AESEncrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.ENCRYPT_MODE, key);
+      AESEncrypted = cipher.doFinal(groupFullName.getBytes());
+    } catch (Exception ignored) {
+    }
+
+    return RaftGroupId.valueOf(ByteString.copyFrom(AESEncrypted));
+  }
+
+  /** Given raftGroupId, decrypt ConsensusGroupId out of it */
+  public static ConsensusGroupId toConsensusGroupId(RaftGroupId raftGroupId) {
+    byte[] AESDecrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");

Review comment:
       Done
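The `toRatisGroupId` Javadoc describes the id scheme as AES/ECB/PKCS5Padding of the `GroupType-Id` string under a fixed key. The sketch below reproduces only that round trip with `javax.crypto`. The class name and the sample group name are hypothetical. Unlike the quoted `catch (Exception ignored)`, it surfaces cipher failures instead of returning an empty byte array:

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import javax.crypto.Cipher;
import javax.crypto.spec.SecretKeySpec;

public class GroupIdCodecSketch {
  // 16 ASCII bytes -> AES-128 key; same literal as IOTDB_RATIS_KEY in Utils
  private static final SecretKeySpec KEY =
      new SecretKeySpec("iotdb@_ratis_key".getBytes(StandardCharsets.UTF_8), "AES");

  private static byte[] run(int mode, byte[] input) {
    try {
      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
      cipher.init(mode, KEY);
      return cipher.doFinal(input);
    } catch (GeneralSecurityException e) {
      // Fail loudly rather than silently producing an empty id
      throw new IllegalStateException("AES transform failed", e);
    }
  }

  static byte[] encode(String groupFullName) {
    return run(Cipher.ENCRYPT_MODE, groupFullName.getBytes(StandardCharsets.UTF_8));
  }

  static String decode(byte[] encrypted) {
    return new String(run(Cipher.DECRYPT_MODE, encrypted), StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String name = "DataRegion-1"; // hypothetical "GroupType-Id" string (12 bytes)
    byte[] first = encode(name);
    System.out.println(Arrays.equals(first, encode(name))); // true: fixed key + ECB is deterministic
    System.out.println(first.length); // 16: one AES block after PKCS5 padding
    System.out.println(decode(first)); // DataRegion-1
  }
}
```

Determinism comes from ECB mode combined with a constant key, which lets every node derive the same id for the same group. With PKCS5 padding, names of up to 15 bytes encrypt to a single 16-byte block. Longer names produce 32 bytes or more.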





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832333281



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?

Review comment:
       Done. I've changed the method signature.
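Below is a minimal sketch of propagating the failure from `start()` once the signature declares `IOException`. `Lifecycle`, `Server` and `ConsensusSketch` are hypothetical, trimmed-down stand-ins for `IConsensus`, `RaftServer` and `RatisConsensus`:

```java
import java.io.Closeable;
import java.io.IOException;

public class StartSignatureSketch {
  /** Hypothetical trimmed-down lifecycle interface; the real IConsensus has more methods. */
  interface Lifecycle {
    void start() throws IOException;

    void stop() throws IOException;
  }

  /** Hypothetical stand-in for RaftServer. */
  interface Server extends Closeable {
    void start() throws IOException;
  }

  static final class ConsensusSketch implements Lifecycle {
    private final Server server;

    ConsensusSketch(Server server) {
      this.server = server;
    }

    @Override
    public void start() throws IOException {
      try {
        server.start();
      } catch (IOException startFailure) {
        // Release resources, but let the caller see the original failure
        try {
          server.close();
        } catch (IOException closeFailure) {
          startFailure.addSuppressed(closeFailure);
        }
        throw startFailure;
      }
    }

    @Override
    public void stop() throws IOException {
      server.close();
    }
  }

  public static void main(String[] args) {
    ConsensusSketch consensus =
        new ConsensusSketch(
            new Server() {
              @Override
              public void start() throws IOException {
                throw new IOException("bind failed");
              }

              @Override
              public void close() {}
            });
    try {
      consensus.start();
    } catch (IOException e) {
      System.out.println("caller sees: " + e.getMessage()); // caller sees: bind failed
    }
  }
}
```

Rethrowing the start failure leaves the choice between retrying and aborting to the caller, instead of silently ignoring the error. Any close failure is attached to it via `addSuppressed`.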





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830711550



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC

Review comment:
       That's true. It can be implemented together with the method-call optimization.





[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795







[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47481202/badge)](https://coveralls.io/builds/47481202)
   
   Coverage increased (+0.1%) to 65.432% when pulling **d679b5dc99a8ff5f9db9e97306b1466ac08907f7 on SzyWilliam:consensus_ratis** into **5edf9e966693f0f1be4f0d8e11ee0c6dde5eb02d on apache:master**.
   



[GitHub] [iotdb] CRZbulabula commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
CRZbulabula commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830726570



##########
File path: consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.*;

Review comment:
       It's better not to use wildcard imports (`*`). You can change this in the IntelliJ IDEA code style settings.







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832306599



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
+  /**
+   * write will first send request to local server use method call if local server is not leader, it
+   * will use RaftClient to send RPC to read leader
+   */
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    // pre-condition: group exists and myself server serves this group
+    RaftGroup raftGroup = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failedWrite(new ConsensusGroupNotExistException(groupId));
+    }
+
+    // serialize request into Message
+    Message message = new RequestMessage(IConsensusRequest);
+
+    // 1. first try the local server
+    RaftClientRequest clientRequest = buildRawRequest(groupId, message, RaftClientRequest.writeRequestType());
+    RaftClientReply localServerReply = null;
+    RaftPeer suggestedLeader = null;
+    try {
+      localServerReply = server.submitClientRequest(clientRequest);
+      if (localServerReply.isSuccess()) {
+        ResponseMessage responseMessage = (ResponseMessage) localServerReply.getMessage();
+        TSStatus writeStatus = (TSStatus) responseMessage.getContentHolder();
+        return ConsensusWriteResponse.newBuilder().setStatus(writeStatus).build();
+      }
+
+      NotLeaderException ex = localServerReply.getNotLeaderException();
+      if (ex != null) { // local server is not leader
+        suggestedLeader = ex.getSuggestedLeader();
+      }
+    } catch (IOException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    // 2. try raft client
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return failedWrite(new RatisRequestFailedException(e));
+    }
+
+    if (suggestedLeader != null) {
+      TSStatus sub = new TSStatus();
+      sub.setMessage(suggestedLeader.getAddress());
+      if (writeResult.subStatus == null) {
+        writeResult.subStatus = new ArrayList<>();
+      }
+      writeResult.subStatus.add(sub);

Review comment:
       OK
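The `write` Javadoc in the diff above describes a two-step route: submit the request to the local server by method call first and, if that does not succeed (for example because the local server is not the leader), send it through `RaftClient` and report the suggested leader in `subStatus`. The following is a minimal sketch of that control flow, not the PR's implementation. `LocalServer`, `LeaderClient`, `Reply` and `WriteResult` are hypothetical stand-ins for `RaftServer`, `RaftClient`, `RaftClientReply` and `TSStatus`, so it runs without Ratis.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified reply from the local server (stand-in for RaftClientReply). */
final class Reply {
  final boolean success;
  final String status;          // stand-in for TSStatus
  final String suggestedLeader; // address hint from a NotLeaderException, or null

  Reply(boolean success, String status, String suggestedLeader) {
    this.success = success;
    this.status = status;
    this.suggestedLeader = suggestedLeader;
  }
}

/** Hypothetical in-process server: a direct method call, no RPC. */
interface LocalServer {
  Reply submit(String request);
}

/** Hypothetical client that routes the request to the current leader. */
interface LeaderClient {
  String send(String request);
}

/** Result carrying a status plus optional sub-statuses (mirrors TSStatus.subStatus). */
final class WriteResult {
  final String status;
  final List<String> subStatus = new ArrayList<>();

  WriteResult(String status) {
    this.status = status;
  }
}

final class LocalFirstWriter {
  private final LocalServer local;
  private final LeaderClient client;

  LocalFirstWriter(LocalServer local, LeaderClient client) {
    this.local = local;
    this.client = client;
  }

  WriteResult write(String request) {
    // 1. Try the local server first; if it is the leader, no network hop is needed.
    Reply localReply = local.submit(request);
    if (localReply.success) {
      return new WriteResult(localReply.status);
    }
    // 2. Otherwise forward through the client, which reaches the leader over RPC.
    WriteResult result = new WriteResult(client.send(request));
    // 3. Report the leader hint so the caller can contact the leader directly next time.
    if (localReply.suggestedLeader != null) {
      result.subStatus.add(localReply.suggestedLeader);
    }
    return result;
  }
}
```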







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832333749



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {

Review comment:
       I'll rethrow the exception now instead of ignoring it.







[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47449136/badge)](https://coveralls.io/builds/47449136)
   
   Coverage increased (+0.1%) to 65.435% when pulling **095844230b8bd6a590c6d19f9c34fa40e0f96079 on SzyWilliam:consensus_ratis** into **5edf9e966693f0f1be4f0d8e11ee0c6dde5eb02d on apache:master**.
   





[GitHub] [iotdb] OneSizeFitsQuorum commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
OneSizeFitsQuorum commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830620194



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusWrongGroupException.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusWrongGroupException extends ConsensusException {
+
+  public ConsensusWrongGroupException(ConsensusGroupId groupId) {
+    super(String.format("peer don't server group %d", groupId.getId()));

Review comment:
       Typo: `server` should be `serve`.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?

Review comment:
       I think adding `IOException` to the method signature is fine.
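A minimal sketch of what declaring `IOException` on `start()` could look like, assuming the server should still be closed when startup fails. `StartableServer` and `ConsensusService` are hypothetical stand-ins for Ratis' `RaftServer` and `RatisConsensus`. Instead of swallowing both failures as the `ignored`/`ignored1` branches do, the startup failure is rethrown and any close failure is attached to it with `addSuppressed`.

```java
import java.io.Closeable;
import java.io.IOException;

/** Hypothetical stand-in for RaftServer: something that can be started and closed. */
interface StartableServer extends Closeable {
  void start() throws IOException;
}

final class ConsensusService {
  private final StartableServer server;

  ConsensusService(StartableServer server) {
    this.server = server;
  }

  /** Starts the server; if startup fails, closes it and rethrows the startup failure. */
  void start() throws IOException {
    try {
      server.start();
    } catch (IOException startFailure) {
      try {
        server.close();
      } catch (IOException closeFailure) {
        // Keep the secondary failure visible instead of silently ignoring it.
        startFailure.addSuppressed(closeFailure);
      }
      throw startFailure;
    }
  }

  void stop() throws IOException {
    server.close();
  }
}
```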

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =
+        new ByteBufferConsensusRequest(
+            log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    TSStatus result = applicationStateMachine.write(request);
+
+    Message ret = null;
+    try {
+      ByteBuffer serializedStatus = Utils.serializeTSStatus(result);
+      ret = Message.valueOf(ByteString.copyFrom(serializedStatus));
+    } catch (TException ignored) {
+    }
+
+    return CompletableFuture.completedFuture(ret);
+  }
+
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    IConsensusRequest req =
+        new ByteBufferConsensusRequest(request.getContent().asReadOnlyByteBuffer());

Review comment:
       Maybe we can optimize this to avoid deserialization in some scenarios.
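One way the deserialization could be skipped, sketched under assumptions: when a request is created in-process (for example, a read served by the local peer without an RPC), the decoded object can travel with it, and the state machine can check for it before parsing bytes. `Request`, `ByteBufferRequest`, `QueryRequest` and `StateMachineSketch` are hypothetical names. In the diff, `query` receives a Ratis `Message`, so this fast path only applies where the original object is still available.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical request abstraction: anything that can be serialized for transport. */
interface Request {
  ByteBuffer serialize();
}

/** Request received off the wire: only raw bytes are available. */
final class ByteBufferRequest implements Request {
  private final ByteBuffer buffer;

  ByteBufferRequest(ByteBuffer buffer) {
    this.buffer = buffer;
  }

  public ByteBuffer serialize() {
    return buffer.duplicate(); // independent position, shared content
  }
}

/** Request created in-process: keeps the decoded form around. */
final class QueryRequest implements Request {
  final String path; // hypothetical payload

  QueryRequest(String path) {
    this.path = path;
  }

  public ByteBuffer serialize() {
    return ByteBuffer.wrap(path.getBytes(StandardCharsets.UTF_8));
  }

  static QueryRequest deserialize(ByteBuffer buf) {
    byte[] bytes = new byte[buf.remaining()];
    buf.duplicate().get(bytes);
    return new QueryRequest(new String(bytes, StandardCharsets.UTF_8));
  }
}

final class StateMachineSketch {
  int decodeCount = 0; // how many requests actually had to be parsed

  String query(Request request) {
    QueryRequest q;
    if (request instanceof QueryRequest) {
      q = (QueryRequest) request; // fast path: already decoded, nothing to parse
    } else {
      q = QueryRequest.deserialize(request.serialize());
      decodeCount++;
    }
    return "result of " + q.path;
  }
}
```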

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerNotInGroupException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class PeerNotInGroupException extends ConsensusException {

Review comment:
       How about renaming it to `PeerNotInConsensusGroupException` for consistent naming?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerAlreadyInGroupException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+
+public class PeerAlreadyInGroupException extends ConsensusException {

Review comment:
       How about renaming it to `PeerAlreadyInConsensusGroupException` for consistent naming?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisRequestFailedException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+public class RatisRequestFailedException extends ConsensusException {
+
+  public RatisRequestFailedException() {
+    super("ratis client request failed (IOException)");

Review comment:
       It is better to use the `Exception(String message, Throwable cause)` constructor to wrap the Ratis exception; otherwise the original exception message is lost, which makes the problem hard to locate.
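A minimal sketch of the suggested fix, assuming `ConsensusException` can expose a `(String, Throwable)` constructor (the PR's actual base class is not shown here). The wrapper keeps the Ratis `IOException` as its cause, so both its message and its stack trace survive. `ReconfigExample` and `AdminCall` are hypothetical stand-ins for the `sendReconfiguration` call site in `RatisConsensus`.

```java
import java.io.IOException;

/** Simplified base class; assumes a (message, cause) constructor is available. */
class ConsensusException extends Exception {
  ConsensusException(String message) {
    super(message);
  }

  ConsensusException(String message, Throwable cause) {
    super(message, cause);
  }
}

class RatisRequestFailedException extends ConsensusException {
  RatisRequestFailedException(Throwable cause) {
    // Put the cause in the message and keep it as the cause, so logs and stack traces show it.
    super("ratis client request failed: " + cause, cause);
  }
}

final class ReconfigExample {
  /** Hypothetical stand-in for client.admin().setConfiguration(peers). */
  interface AdminCall {
    void run() throws IOException;
  }

  static void sendReconfiguration(AdminCall call) throws RatisRequestFailedException {
    try {
      call.run();
    } catch (IOException e) {
      throw new RatisRequestFailedException(e); // wrap the Ratis exception instead of dropping it
    }
  }
}
```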
   
   

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =

Review comment:
       Maybe we can optimize this to avoid deserialization in some scenarios.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {

Review comment:
       What is the current snapshot strategy, and could it cause problems when a node runs for a long time?
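For context on the question: in Raft, log entries can only be purged up to the index covered by a snapshot, so a node that never snapshots keeps a growing log. Below is a minimal sketch of one common policy, a fixed number of applied entries between snapshots. `SnapshotPolicy` and its threshold are hypothetical and not taken from the PR. Ratis also has snapshot auto-trigger settings under `RaftServerConfigKeys.Snapshot`; check which ones the Ratis version in use provides.

```java
/**
 * Hypothetical policy: take a snapshot once enough entries have been applied since the
 * last one, so the log can be purged up to the snapshot index instead of growing forever.
 */
final class SnapshotPolicy {
  private final long threshold;
  private long lastSnapshotIndex = -1; // -1 means no snapshot has been taken yet

  SnapshotPolicy(long threshold) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    this.threshold = threshold;
  }

  /** Returns true if a snapshot should be taken after applying the entry at appliedIndex. */
  boolean shouldSnapshot(long appliedIndex) {
    return appliedIndex - lastSnapshotIndex >= threshold;
  }

  /** Records a completed snapshot; entries up to this index become eligible for purging. */
  void onSnapshotTaken(long snapshotIndex) {
    lastSnapshotIndex = snapshotIndex;
  }
}
```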

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerAlreadyInGroupException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+
+public class PeerAlreadyInGroupException extends ConsensusException {
+  public PeerAlreadyInGroupException(ConsensusGroupId groupId, Peer peer) {
+    super(
+        String.format(
+            "peer %s:%d already in group %d",

Review comment:
       The message lacks a verb; it should read "peer %s:%d is already in group %d".
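       A sketch of the corrected message with the missing "is" restored. Plain `String`/`int`/`long` parameters stand in for `Peer` and `ConsensusGroupId`, and `PeerMessages` is a hypothetical helper name.

```java
/** Hypothetical helper; plain parameters stand in for Peer and ConsensusGroupId. */
final class PeerMessages {
  static String peerAlreadyInGroup(String ip, int port, long groupId) {
    // "is" restores the verb missing from "peer %s:%d already in group %d"
    return String.format("peer %s:%d is already in group %d", ip, port, groupId);
  }
}
```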

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusWrongGroupException.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusWrongGroupException extends ConsensusException {

Review comment:
       The name `ConsensusWrongGroupException` sounds a bit odd...

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =
+        new ByteBufferConsensusRequest(
+            log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    TSStatus result = applicationStateMachine.write(request);
+
+    Message ret = null;
+    try {
+      ByteBuffer serializedStatus = Utils.serializeTSStatus(result);
+      ret = Message.valueOf(ByteString.copyFrom(serializedStatus));
+    } catch (TException ignored) {
+    }

Review comment:
       Please log this exception with a logger instead of silently ignoring it.
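       A minimal sketch of logging the failure instead of swallowing it. It uses `java.util.logging` only to stay self-contained (the project would more likely use SLF4J). The `Serializer` interface is a hypothetical stand-in for `Utils.serializeTSStatus`, which throws `TException`. Putting the entry's term and index in the message shows which log entry produced the unserializable result.

```java
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ApplyResultSerializer {
  private static final Logger LOGGER = Logger.getLogger(ApplyResultSerializer.class.getName());

  /** Hypothetical stand-in for Utils.serializeTSStatus, which throws a checked exception. */
  interface Serializer<T> {
    ByteBuffer serialize(T value) throws Exception;
  }

  /** Returns the serialized result, or null after logging why serialization failed. */
  static <T> ByteBuffer serializeOrLog(T result, Serializer<T> serializer, long term, long index) {
    try {
      return serializer.serialize(result);
    } catch (Exception e) {
      // Keep the failure visible and identify the log entry it belongs to
      LOGGER.log(
          Level.WARNING,
          String.format("failed to serialize apply result of entry (term=%d, index=%d)", term, index),
          e);
      return null;
    }
  }
}
```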

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC

Review comment:
       Remove TODO?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;

Review comment:
       This needs to be optimized.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ReadLocalMessage.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+public class ReadLocalMessage implements Message {

Review comment:
       I think we also need to extend a class that holds the `IClientRequest` for write requests, to avoid deserializing it every time.
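       One way to read this suggestion, sketched with hypothetical names: a message wrapper that keeps a reference to the original request object and serializes it only when the bytes are first needed. In the real code the wrapper would implement Ratis `Message`, whose `getContent()` returns a `ByteString`; `ByteBuffer` is used here to keep the example self-contained. This only pays off on a path where the same object reaches the state machine (for example, if local calls bypass RPC as the TODO about using a method call for local communication suggests). Peers on other nodes still receive bytes and must deserialize them.

```java
import java.nio.ByteBuffer;
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical wrapper keeping the original request next to its lazily computed bytes. */
final class RequestMessage<R> {
  private final R request;
  private final Function<R, ByteBuffer> serializer;
  private ByteBuffer serialized; // guarded by this; null until first requested

  RequestMessage(R request, Function<R, ByteBuffer> serializer) {
    this.request = Objects.requireNonNull(request);
    this.serializer = Objects.requireNonNull(serializer);
  }

  /** The original object, usable without any deserialization. */
  R getRequest() {
    return request;
  }

  /** Serializes on first use only; each caller gets an independent read-only view. */
  synchronized ByteBuffer getContent() {
    if (serialized == null) {
      serialized = serializer.apply(request);
    }
    return serialized.asReadOnlyBuffer();
  }
}
```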

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }

Review comment:
       Please log this exception with a logger instead of silently ignoring it.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {

Review comment:
       Please log these exceptions with a logger instead of silently ignoring them.
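       A minimal sketch of what logging could look like in `start()`: log the cause, try to close the server, and log a close failure separately rather than swallowing both. `Startable` is a hypothetical stand-in for `RaftServer`, and `java.util.logging` is used only to stay self-contained. Returning a `boolean` is just one option; the TODO's alternative of declaring `IOException` (or rethrowing it wrapped in an `UncheckedIOException`) would also let callers react to the failure.

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class ServerStarter {
  private static final Logger LOGGER = Logger.getLogger(ServerStarter.class.getName());

  /** Hypothetical stand-in for the parts of RaftServer used here. */
  interface Startable extends Closeable {
    void start() throws IOException;
  }

  /** Returns true if started; on failure logs the cause, closes the server, and returns false. */
  static boolean startOrClose(Startable server) {
    try {
      server.start();
      return true;
    } catch (IOException startError) {
      LOGGER.log(Level.SEVERE, "failed to start Raft server", startError);
      try {
        server.close();
      } catch (IOException closeError) {
        // Log separately so the original start failure is not hidden
        LOGGER.log(Level.WARNING, "failed to close Raft server after start failure", closeError);
      }
      return false;
    }
  }
}
```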

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+  private static final String IOTDB_RATIS_KEY = "iotdb@_ratis_key";
+  private static final SecretKeySpec key = new SecretKeySpec(IOTDB_RATIS_KEY.getBytes(), "AES");
+
+  public static String IP_PORT(Endpoint endpoint) {
+    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
+  }
+
+  public static String groupFullName(ConsensusGroupId consensusGroupId) {
+    return String.format("%s-%d", consensusGroupId.getType().toString(), consensusGroupId.getId());
+  }
+
+  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+    String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+  }
+
+  public static RaftPeer toRaftPeer(Peer peer) {
+    return toRaftPeer(peer.getEndpoint());
+  }
+
+  /**
+   * Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme:
+   * AES/ECB/PKCS5Padding of (GroupType-Id), key = iotdb@_ratis_key
+   */
+  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+    String groupFullName = groupFullName(consensusGroupId);
+
+    byte[] AESEncrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.ENCRYPT_MODE, key);
+      AESEncrypted = cipher.doFinal(groupFullName.getBytes());
+    } catch (Exception ignored) {
+    }
+
+    return RaftGroupId.valueOf(ByteString.copyFrom(AESEncrypted));
+  }
+
+  /** Given raftGroupId, decrypt ConsensusGroupId out of it */
+  public static ConsensusGroupId toConsensusGroupId(RaftGroupId raftGroupId) {
+    byte[] AESDecrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.DECRYPT_MODE, key);
+      AESDecrypted = cipher.doFinal(raftGroupId.toByteString().toByteArray());
+    } catch (Exception ignored) {
+    }
+    String consensusGroupString = new String(AESDecrypted);
+    String[] items = consensusGroupString.split("-");
+    return new ConsensusGroupId(GroupType.valueOf(items[0]), Long.parseLong(items[1]));
+  }
+
+  public static ByteBuffer serializeTSStatus(TSStatus status) throws TException {
+    // TODO Pooling ByteBuffer
+    TByteBuffer byteBuffer = new TByteBuffer(ByteBuffer.allocate(1000));

Review comment:
       Do not use a magic number; at least make the buffer size a constant.
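       A sketch of one way to address this: name the size as a constant and, because a fixed 1000-byte buffer may overflow if a serialized status exceeds that size, double the capacity and retry on overflow. `BufferSizing`, `BufferWriter`, and `INITIAL_STATUS_BUFFER_SIZE` are hypothetical names. With Thrift's `TByteBuffer`, an overflow may surface as a `TTransportException` rather than a `BufferOverflowException`, so the catch clause would need to be adapted.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

final class BufferSizing {
  /** Hypothetical named constant replacing the literal 1000. */
  static final int INITIAL_STATUS_BUFFER_SIZE = 1000;

  /** Hypothetical stand-in for "serialize the status into this buffer". */
  interface BufferWriter {
    void writeTo(ByteBuffer buffer);
  }

  /** Writes into a growing buffer and returns it flipped, ready for reading. */
  static ByteBuffer writeGrowing(BufferWriter writer) {
    int capacity = INITIAL_STATUS_BUFFER_SIZE;
    while (true) {
      ByteBuffer buffer = ByteBuffer.allocate(capacity);
      try {
        writer.writeTo(buffer);
        buffer.flip();
        return buffer;
      } catch (BufferOverflowException e) {
        // Retry with twice the space; multiplyExact stops the loop on int overflow
        capacity = Math.multiplyExact(capacity, 2);
      }
    }
  }
}
```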

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(
+      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;

Review comment:
       Why assert this?
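       Context for the question: Java `assert` statements are disabled unless the JVM runs with `-ea`, so in production this check usually does nothing, and a wrong request type would likely fail later with a `ClassCastException`. An explicit check always runs. The sketch uses hypothetical stand-in types; in `read()` it would probably be better to return a `ConsensusReadResponse` carrying an exception than to throw.

```java
/** Hypothetical stand-ins for IConsensusRequest and ByteBufferConsensusRequest. */
final class RequestChecks {
  interface Request {}

  static final class ByteBufferRequest implements Request {}

  /** Explicit type check that runs regardless of -ea, unlike an assert statement. */
  static ByteBufferRequest requireByteBufferRequest(Request request) {
    if (!(request instanceof ByteBufferRequest)) {
      String type = request == null ? "null" : request.getClass().getName();
      throw new IllegalArgumentException("unsupported request type: " + type);
    }
    return (ByteBufferRequest) request;
  }
}
```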

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {

Review comment:
       All methods should be thread-safe. Please check every function carefully.
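One way to read this for `RatisConsensus`: `syncGroupInfoAndRebuildClient` updates `raftGroupMap` and `clientMap` in two separate steps, so concurrent callers can interleave between them.

The sketch below assumes the group info and its client are kept together in one hypothetical `GroupState` object and replaced atomically with `ConcurrentHashMap.compute`. Strings stand in for `RaftGroup`/`RaftClient`. The `ConcurrentHashMap` documentation asks for the remapping function to be short, so building a real client (or doing RPC) inside `compute` would need care.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class AtomicGroupStateSketch {
  // Hypothetical holder: a group's membership version and the client built for it
  // are published together, so readers never see one without the other.
  static final class GroupState {
    final int version;
    final String client; // stands in for a RaftClient built from this version

    GroupState(int version) {
      this.version = version;
      this.client = "client-for-v" + version;
    }
  }

  private final ConcurrentHashMap<String, GroupState> groups = new ConcurrentHashMap<>();

  // compute() runs atomically per key, so this read-modify-write cannot
  // interleave with another update of the same group.
  void refresh(String groupId) {
    groups.compute(groupId, (id, old) -> new GroupState(old == null ? 1 : old.version + 1));
  }

  GroupState get(String groupId) {
    return groups.get(groupId);
  }

  // Runs `threads` workers that each call refresh() `perThread` times on one group.
  static AtomicGroupStateSketch hammer(int threads, int perThread) {
    AtomicGroupStateSketch s = new AtomicGroupStateSketch();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int t = 0; t < threads; t++) {
      pool.submit(
          () -> {
            for (int i = 0; i < perThread; i++) {
              s.refresh("DataRegion-1");
            }
          });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
        throw new IllegalStateException("workers did not finish in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return s;
  }

  public static void main(String[] args) {
    GroupState st = hammer(8, 1000).get("DataRegion-1");
    System.out.println(st.version + " " + st.client);
  }
}
```

With a separate `get` followed by `put`, some of the 8000 updates could be lost. With `compute`, every update is applied exactly once.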

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(

Review comment:
       Isn't `synchronized` too heavy here? We have to support simultaneous reads from different consensus groups.
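Here is a possible alternative to a method-level `synchronized`, assuming some per-group exclusion is still wanted at all. Ratis' `RaftServer` may already handle concurrent requests on its own.

The idea is to keep one `ReentrantReadWriteLock` per group, so that reads of different groups, and even of the same group, do not block each other. Group ids are plain strings, and `read`/`reconfigure` are hypothetical helpers, not part of `IConsensus`.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

public class PerGroupLockSketch {
  // One lock per consensus group instead of one monitor for the whole object.
  private final ConcurrentHashMap<String, ReentrantReadWriteLock> locks = new ConcurrentHashMap<>();

  private ReentrantReadWriteLock lockFor(String groupId) {
    return locks.computeIfAbsent(groupId, id -> new ReentrantReadWriteLock());
  }

  // Reads share the group's read lock; they only wait for reconfigurations of that group.
  <T> T read(String groupId, Supplier<T> action) {
    Lock lock = lockFor(groupId).readLock();
    lock.lock();
    try {
      return action.get();
    } finally {
      lock.unlock();
    }
  }

  // Membership changes take the group's write lock and exclude reads of that group only.
  <T> T reconfigure(String groupId, Supplier<T> action) {
    Lock lock = lockFor(groupId).writeLock();
    lock.lock();
    try {
      return action.get();
    } finally {
      lock.unlock();
    }
  }

  // True if reads on two different groups were inside their critical sections at the same time.
  static boolean readsOnDifferentGroupsOverlap() {
    PerGroupLockSketch s = new PerGroupLockSketch();
    CountDownLatch bothInside = new CountDownLatch(2);
    Supplier<Boolean> body =
        () -> {
          bothInside.countDown();
          try {
            // Behind one object-wide monitor the second reader could not enter
            // until this one returned, so this wait would time out.
            return bothInside.await(5, TimeUnit.SECONDS);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
          }
        };
    boolean[] results = new boolean[2];
    Thread a = new Thread(() -> results[0] = s.read("DataRegion-1", body));
    Thread b = new Thread(() -> results[1] = s.read("DataRegion-2", body));
    a.start();
    b.start();
    try {
      a.join();
      b.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
    return results[0] && results[1];
  }

  public static void main(String[] args) {
    System.out.println(readsOnDifferentGroupsOverlap());
  }
}
```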

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(
+      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+
+      RaftClientRequest clientRequest =
+          RaftClientRequest.newBuilder()
+              .setServerId(server.getId())
+              .setClientId(localFakeId)
+              .setGroupId(Utils.toRatisGroupId(groupId))
+              .setCallId(localFakeCallId.incrementAndGet())
+              .setMessage(Message.valueOf(ByteString.copyFrom(request.getContent())))
+              .setType(RaftClientRequest.staleReadRequestType(0))
+              .build();
+
+      reply = server.submitClientRequest(clientRequest);
+    } catch (IOException e) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+
+    Message ret = reply.getMessage();
+    assert ret instanceof ReadLocalMessage;
+    ReadLocalMessage readLocalMessage = (ReadLocalMessage) ret;
+
+    return ConsensusReadResponse.newBuilder().setDataSet(readLocalMessage.getDataSet()).build();
   }
 
+  /**
+   * Add this IConsensus Peer into ConsensusGroup(groupId, peers) Caller's responsibility to call
+   * addConsensusGroup to every peer of this group and ensure the group is all up
+   *
+   * <p>underlying Ratis will 1. initialize a RaftServer instance 2. call GroupManagementApi to
+   * register self to the RaftGroup
+   */
   @Override
   public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroup group = buildRaftGroup(groupId, peers);
+    // pre-conditions: myself in this new group
+    if (!group.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    raftGroupMap.put(group.getGroupId(), group);
+
+    // build and store the corresponding client
+    RaftClient client = buildClientAndCache(group);
+
+    // add RaftPeer myself to this RaftGroup
+    RaftClientReply reply = null;
+    try {
+      reply = client.getGroupManagementApi(myself.getId()).add(group);
+    } catch (IOException e) {
+      return failed(new RatisRequestFailedException());
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
+  /**
+   * Remove this IConsensus Peer out of ConsensusGroup(groupId, peers) Caller's responsibility to
+   * call removeConsensusGroup to every peer of this group and ensure the group is fully removed
+   *
+   * <p>underlying Ratis will 1. call GroupManagementApi to unregister self off the RaftGroup 2.
+   * clean up
+   */
   @Override
   public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
+
+    // pre-conditions: group exists and myself in this group
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failed(new PeerNotInGroupException(groupId));
+    }
+
+    RaftClient client = clientMap.get(raftGroupId);
+    // send remove group to myself
+    RaftClientReply reply = null;
+    try {
+      reply = client.getGroupManagementApi(myself.getId()).remove(raftGroupId, false, false);
+    } catch (IOException e) {
+      return failed(new RatisRequestFailedException());
+    }
+
+    if (reply.isSuccess()) {
+      // delete Group information and its corresponding client
+      raftGroupMap.remove(raftGroupId);
+      clientMap.remove(raftGroupId);
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
+  /**
+   * Add a new IConsensus Peer into ConsensusGroup with groupId
+   *
+   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
+   * change
+   */
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    try {
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+    RaftGroup group = raftGroupMap.get(raftGroupId);
+    RaftPeer peerToAdd = Utils.toRaftPeer(peer);
+
+    // pre-conditions: group exists and myself in this group
+    if (group == null || !group.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+
+    // pre-condition: peer not in this group
+    if (group.getPeers().contains(peerToAdd)) {
+      return failed(new PeerAlreadyInGroupException(groupId, peer));
+    }
+
+    List<RaftPeer> newConfig = new ArrayList<>(group.getPeers());
+    newConfig.add(peerToAdd);
+
+    RaftClientReply reply;
+    try {
+      reply = sendReconfiguration(raftGroupId, newConfig);
+
+      // sync again
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
+  /**
+   * Remove IConsensus Peer from ConsensusGroup with groupId
+   *
+   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
+   * change
+   */
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    try {
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+    RaftGroup group = raftGroupMap.get(raftGroupId);
+    RaftPeer peerToRemove = Utils.toRaftPeer(peer);
+
+    // pre-conditions: group exists and myself in this group
+    if (group == null || !group.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    // pre-condition: peer is a member of groupId
+    if (!group.getPeers().contains(peerToRemove)) {
+      return failed(new PeerNotInGroupException(groupId));
+    }
+
+    // update group peer information
+    List<RaftPeer> newConfig =
+        group.getPeers().stream()
+            .filter(raftPeer -> !raftPeer.equals(peerToRemove))
+            .collect(Collectors.toList());
+
+    RaftClientReply reply;
+    try {
+      reply = sendReconfiguration(raftGroupId, newConfig);
+      // sync again
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   @Override
   public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroup raftGroup = buildRaftGroup(groupId, newPeers);
+
+    // pre-conditions: myself in this group
+    if (!raftGroup.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    raftGroupMap.put(raftGroup.getGroupId(), raftGroup);
+
+    // build the client and store it
+    buildClientAndCache(raftGroup);
+
+    // add RaftPeer myself to this RaftGroup
+    RaftClientReply reply = null;
+    try {
+      reply = sendReconfiguration(raftGroup.getGroupId(), new ArrayList<>(raftGroup.getPeers()));
+      // sync again
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException | RatisRequestFailedException e) {
+      return failed(new RatisRequestFailedException());
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   @Override
   public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient client = clientMap.getOrDefault(raftGroupId, null);
+    if (client == null) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader);
+
+    RaftClientReply reply = null;
+    try {
+      // TODO tuning for timeoutMs
+      reply = client.admin().transferLeadership(newRaftLeader.getId(), 2000);
+    } catch (IOException e) {
+      return failed(new RatisRequestFailedException());
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   @Override
   public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
     return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
+
+  private ConsensusGenericResponse failed(ConsensusException e) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).setException(e).build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
+
+  private RaftClient buildClientAndCache(RaftGroup group) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(group)
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    RaftClient client = builder.build();
+    clientMap.put(group.getGroupId(), client);
+    return client;
+  }
+
+  private RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
+      throws IOException {
+
+    this.clientMap = new ConcurrentHashMap<>();
+    this.raftGroupMap = new HashMap<>();

Review comment:
       Why use `HashMap`?
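Background for this question: `HashMap` makes no guarantees when several threads modify it concurrently; updates can be lost and the internal table can be corrupted. `ConcurrentHashMap` is safe under concurrent modification and also offers atomic check-then-act operations.

The sketch below shows how `putIfAbsent` lets exactly one concurrent `addConsensusGroup`-style call win. It assumes `raftGroupMap` is keyed by a string group id and stores peer addresses as strings; both are hypothetical simplifications, and `127.0.0.1:6667` is an illustrative address.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

public class GroupRegistrySketch {
  // Safe for concurrent access; a plain HashMap here could lose entries.
  private final ConcurrentMap<String, List<String>> raftGroupMap = new ConcurrentHashMap<>();

  // Returns true only for the caller that actually registered the group.
  boolean register(String groupId, List<String> peers) {
    List<String> snapshot = Collections.unmodifiableList(new ArrayList<>(peers));
    // putIfAbsent checks and inserts atomically; a separate containsKey + put could not.
    return raftGroupMap.putIfAbsent(groupId, snapshot) == null;
  }

  // Starts `threads` threads that all register the same group; returns how many succeeded.
  static int concurrentRegistrations(int threads) {
    GroupRegistrySketch registry = new GroupRegistrySketch();
    AtomicInteger winners = new AtomicInteger();
    Thread[] workers = new Thread[threads];
    for (int i = 0; i < threads; i++) {
      workers[i] =
          new Thread(
              () -> {
                List<String> peers = Collections.singletonList("127.0.0.1:6667");
                if (registry.register("DataRegion-1", peers)) {
                  winners.incrementAndGet();
                }
              });
      workers[i].start();
    }
    for (Thread w : workers) {
      try {
        w.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return -1;
      }
    }
    return winners.get();
  }

  public static void main(String[] args) {
    System.out.println(concurrentRegistrations(16));
  }
}
```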

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(

Review comment:
       We may need to set a `ConsensusGroupNotExistException` in the response of this function, but I do not see it.
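Here is a sketch of what reporting a missing group could look like, assuming the lookup happens against a local group map before any request is built. `GroupNotExistException` and `ReadResponse` are hypothetical stand-ins for `ConsensusGroupNotExistException` and `ConsensusReadResponse`. The stored string stands in for the local copy being read.

```java
import java.util.concurrent.ConcurrentHashMap;

public class MissingGroupSketch {
  // Hypothetical stand-in for ConsensusGroupNotExistException.
  static final class GroupNotExistException extends Exception {
    GroupNotExistException(String groupId) {
      super("consensus group " + groupId + " does not exist");
    }
  }

  // Hypothetical stand-in for ConsensusReadResponse: carries either a data set or an exception.
  static final class ReadResponse {
    final String dataSet;
    final Exception exception;

    ReadResponse(String dataSet, Exception exception) {
      this.dataSet = dataSet;
      this.exception = exception;
    }
  }

  private final ConcurrentHashMap<String, String> groups = new ConcurrentHashMap<>();

  void addGroup(String groupId, String localCopy) {
    groups.put(groupId, localCopy);
  }

  ReadResponse read(String groupId) {
    String localCopy = groups.get(groupId);
    if (localCopy == null) {
      // Report the missing group explicitly instead of failing later with a
      // NullPointerException or a generic "request failed" error.
      return new ReadResponse(null, new GroupNotExistException(groupId));
    }
    return new ReadResponse(localCopy, null);
  }

  public static void main(String[] args) {
    MissingGroupSketch consensus = new MissingGroupSketch();
    System.out.println(consensus.read("DataRegion-9").exception.getMessage());
  }
}
```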







[GitHub] [iotdb] SzyWilliam commented on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1073417244


   > PTAL.
   > There is still a lot of work to be done
   
   Thanks for the detailed reviews. I made changes according to your advice and left TODOs for the future optimizations.
   





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832361144



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+  private static final int tempBufferSize = 1024;
+  private static final String IOTDB_RATIS_KEY = "iotdb@_ratis_key";
+  private static final SecretKeySpec key = new SecretKeySpec(IOTDB_RATIS_KEY.getBytes(), "AES");
+
+  public static String IP_PORT(Endpoint endpoint) {
+    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
+  }
+
+  public static String groupFullName(ConsensusGroupId consensusGroupId) {
+    return String.format("%s-%d", consensusGroupId.getType().toString(), consensusGroupId.getId());
+  }
+
+  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+    String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+  }
+
+  public static RaftPeer toRaftPeer(Peer peer) {
+    return toRaftPeer(peer.getEndpoint());
+  }
+
+  public static Endpoint getEndPoint(RaftPeer raftPeer) {
+    String address = raftPeer.getAddress(); // ip:port
+    String[] split = address.split(":");
+    return new Endpoint(split[0], Integer.parseInt(split[1]));
+  }
+
+  /**
+   * Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme:
+   * AES/ECB/PKCS5Padding of (GroupType-Id), key = iotdb@_ratis_key
+   */
+  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+    String groupFullName = groupFullName(consensusGroupId);
+
+    byte[] AESEncrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.ENCRYPT_MODE, key);

Review comment:
       Changed to a simple padding scheme.
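The reply does not show the new scheme, so the following is only a guess at what a simple padding scheme might look like. It encodes `Type-Id` as UTF-8, zero-pads it to 16 bytes, and reinterprets those bytes as a UUID (Ratis group ids are UUID-based). The method names and the `DataRegion` type name are hypothetical.

Group names contain no NUL bytes, so distinct names of at most 16 bytes map to distinct ids and the mapping can be reversed. Longer names are rejected. Converting the UUID into an actual `RaftGroupId` is left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;

public class GroupIdPaddingSketch {
  private static final int ID_LENGTH = 16; // a UUID is 128 bits

  // Deterministic: the same (type, id) always yields the same UUID, with no key or cipher.
  static UUID toGroupUuid(String type, int id) {
    byte[] name = String.format("%s-%d", type, id).getBytes(StandardCharsets.UTF_8);
    if (name.length > ID_LENGTH) {
      throw new IllegalArgumentException("group name longer than " + ID_LENGTH + " bytes");
    }
    byte[] padded = Arrays.copyOf(name, ID_LENGTH); // right-pads with zero bytes
    ByteBuffer buf = ByteBuffer.wrap(padded);
    return new UUID(buf.getLong(), buf.getLong());
  }

  // Reverses the mapping by dropping the zero padding.
  static String fromGroupUuid(UUID uuid) {
    ByteBuffer buf = ByteBuffer.allocate(ID_LENGTH);
    buf.putLong(uuid.getMostSignificantBits()).putLong(uuid.getLeastSignificantBits());
    byte[] bytes = buf.array();
    int end = bytes.length;
    while (end > 0 && bytes[end - 1] == 0) {
      end--;
    }
    return new String(bytes, 0, end, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    UUID id = toGroupUuid("DataRegion", 1);
    System.out.println(id + " -> " + fromGroupUuid(id));
  }
}
```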







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832322085



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,521 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
+import org.apache.iotdb.consensus.exception.RatisRequestFailedException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will

Review comment:
       Spotless ate my newline :(







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832324012



##########
File path: consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.*;

Review comment:
       Done







[GitHub] [iotdb] CRZbulabula commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
CRZbulabula commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830725517



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,457 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException(e);
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException(e))
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
   public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+
+    RaftGroup group = raftGroupMap.get(Utils.toRatisGroupId(groupId));
+    if (group == null || !group.getPeers().contains(myself)) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new ConsensusGroupNotExistException(groupId))
+          .build();
+    }
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+
+      RaftClientRequest clientRequest =
+          RaftClientRequest.newBuilder()
+              .setServerId(server.getId())
+              .setClientId(localFakeId)
+              .setGroupId(Utils.toRatisGroupId(groupId))
+              .setCallId(localFakeCallId.incrementAndGet())
+              .setMessage(Message.valueOf(ByteString.copyFrom(request.getContent())))
+              .setType(RaftClientRequest.staleReadRequestType(0))
+              .build();
+
+      reply = server.submitClientRequest(clientRequest);
+    } catch (IOException e) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new RatisRequestFailedException(e))
+          .build();
+    }
+
+    Message ret = reply.getMessage();
+    assert ret instanceof ReadLocalMessage;

Review comment:
       Use an if statement and throw an exception instead of the assert~
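Java assert statements are skipped unless the JVM runs with -ea. In production, the check above would therefore silently disappear. If the code then casts the reply, a wrong reply type would only surface later as a ClassCastException.

The sketch below shows the suggested if-and-throw version. Message and ReadLocalMessage here are simplified stand-ins (assumptions), not the Ratis interface or the PR's class. IllegalStateException is an arbitrary choice of exception.

```java
/** Simplified stand-in for org.apache.ratis.protocol.Message (assumption). */
interface Message {}

/** Simplified stand-in for the PR's ReadLocalMessage, carrying a query result. */
final class ReadLocalMessage implements Message {
  private final String dataSet;

  ReadLocalMessage(String dataSet) {
    this.dataSet = dataSet;
  }

  String getDataSet() {
    return dataSet;
  }
}

final class ReplyChecks {
  /** Explicit runtime check that still runs when assertions are disabled (the JVM default). */
  static String extractDataSet(Message ret) {
    if (!(ret instanceof ReadLocalMessage)) {
      String actual = (ret == null) ? "null" : ret.getClass().getName();
      throw new IllegalStateException("unexpected read reply type: " + actual);
    }
    return ((ReadLocalMessage) ret).getDataSet();
  }
}
```

In RatisConsensus.read, the failure would presumably be wrapped into the ConsensusReadResponse via setException, as the method already does for IOException.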







[GitHub] [iotdb] CRZbulabula commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
CRZbulabula commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830723645



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,457 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;

Review comment:
       Better not to use wildcard imports (import *). You can change this in the IDEA import settings.







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832317930



##########
File path: consensus/pom.xml
##########
@@ -35,7 +35,16 @@
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-server</artifactId>
             <version>2.2.0</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>

Review comment:
       Ratis requires a metrics dependency to monitor its internal metrics.







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830710898



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {

Review comment:
       According to previous research (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=199534813#Codebase为Java的Raft库集成方案调研-Snapshot管理, the survey of Raft library integration options for a Java codebase, Snapshot management section), a snapshot will be triggered when the number of log entries reaches 400000. Since we haven't implemented snapshot management yet, Ratis will simply take a snapshot of the RaftLog without any StateMachine data.
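The sketch below models the count-based trigger described above. It is hypothetical and not Ratis source code. It assumes the threshold is compared against the number of entries applied since the last snapshot, and it uses the 400000 figure quoted in the comment. The real settings live in Ratis's RaftServerConfigKeys.Snapshot, and their names and defaults (including whether auto-triggering is enabled at all) should be checked there.

```java
/**
 * Hypothetical model of a count-based snapshot auto-trigger (not Ratis source code).
 * Assumes a snapshot is due once appliedIndex - lastSnapshotIndex reaches the threshold.
 */
final class SnapshotTrigger {
  static final long DEFAULT_THRESHOLD = 400_000L; // figure quoted in the review comment

  private final long threshold;
  private long lastSnapshotIndex;

  SnapshotTrigger(long threshold, long lastSnapshotIndex) {
    if (threshold <= 0) {
      throw new IllegalArgumentException("threshold must be positive");
    }
    this.threshold = threshold;
    this.lastSnapshotIndex = lastSnapshotIndex;
  }

  /** True when enough entries have been applied since the last snapshot. */
  boolean shouldTakeSnapshot(long appliedIndex) {
    return appliedIndex - lastSnapshotIndex >= threshold;
  }

  /** Records the log index covered by the most recent snapshot. */
  void onSnapshotTaken(long snapshotIndex) {
    lastSnapshotIndex = snapshotIndex;
  }
}
```

Under this model, with a threshold of 400000 and no earlier snapshot, the first trigger fires at applied index 400000 and the next one at 800000.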







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830711595



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ReadLocalMessage.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+public class ReadLocalMessage implements Message {

Review comment:
       That's true. It can be implemented together with the method call optimization.







[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47458338/badge)](https://coveralls.io/builds/47458338)
   
   Coverage increased (+0.1%) to 65.449% when pulling **877bdcbcf1d05fb8db0506cc7cd4d015e20cbe3e on SzyWilliam:consensus_ratis** into **5edf9e966693f0f1be4f0d8e11ee0c6dde5eb02d on apache:master**.
   





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832297023



##########
File path: consensus/pom.xml
##########
@@ -35,7 +35,16 @@
             <groupId>org.apache.ratis</groupId>
             <artifactId>ratis-server</artifactId>
             <version>2.2.0</version>
-            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.ratis</groupId>
+            <artifactId>ratis-grpc</artifactId>
+            <version>2.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>

Review comment:
       Ratis requires this dependency; otherwise it won't work.







[GitHub] [iotdb] coveralls edited a comment on pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
coveralls edited a comment on pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#issuecomment-1068885795


   
   [![Coverage Status](https://coveralls.io/builds/47413585/badge)](https://coveralls.io/builds/47413585)
   
   Coverage increased (+0.04%) to 65.362% when pulling **8b166ca3a92992fb75eb506fe693aaaeb04142a3 on SzyWilliam:consensus_ratis** into **5edf9e966693f0f1be4f0d8e11ee0c6dde5eb02d on apache:master**.
   





[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832301795



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+  private final Logger logger = LoggerFactory.getLogger(ApplicationStateMachineProxy.class);
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest applicationRequest = null;
+
+    // if this server is leader
+    // it will first try to obtain applicationRequest from transaction context
+    if (trx.getClientRequest() != null
+        && trx.getClientRequest().getMessage() instanceof RequestMessage) {
+      RequestMessage requestMessage = (RequestMessage) trx.getClientRequest().getMessage();
+      applicationRequest = requestMessage.getActualRequest();
+    } else {
+      applicationRequest =
+          new ByteBufferConsensusRequest(
+              log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    }
+
+    assert applicationRequest != null;

Review comment:
       OK, I'll return an error TSStatus here
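Returning an error status instead of asserting keeps the failure visible to the caller even when assertions are disabled. The sketch below assumes a simplified stand-in for the Thrift-generated TSStatus (a code and a message only) and uses hypothetical status codes. The real code would use IoTDB's own status-code constants and wrap the result into the Ratis Message that applyTransaction returns.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Simplified stand-in for the Thrift-generated TSStatus (assumption: code + message only). */
final class Status {
  final int code;
  final String message;

  Status(int code, String message) {
    this.code = code;
    this.message = message;
  }
}

final class ApplyGuard {
  // Hypothetical codes for illustration only; not IoTDB's TSStatusCode values.
  static final int SUCCESS = 200;
  static final int INTERNAL_ERROR = 500;

  /**
   * Applies a decoded request. If the request could not be decoded from the log entry,
   * completes with an error status instead of relying on assert.
   */
  static <R> CompletableFuture<Status> apply(R request, Function<R, Status> stateMachineWrite) {
    if (request == null) {
      return CompletableFuture.completedFuture(
          new Status(INTERNAL_ERROR, "cannot decode request from log entry"));
    }
    return CompletableFuture.completedFuture(stateMachineWrite.apply(request));
  }
}
```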







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r832298575



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerNotInConsensusGroupException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class PeerNotInConsensusGroupException extends ConsensusException {
+  public PeerNotInConsensusGroupException(ConsensusGroupId groupId) {
+    super(String.format("peer is not in group %d", groupId.getId()));

Review comment:
       OK, I'll add it as a parameter.
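Passing the peer in lets the message identify which member is missing. The sketch below shows one way the revised constructor might look. It uses plain int/String stand-ins for ConsensusGroupId and Peer (assumptions), and the base ConsensusException is a simplified placeholder for the project's class.

```java
/** Simplified placeholder for org.apache.iotdb.consensus.exception.ConsensusException. */
class ConsensusException extends Exception {
  ConsensusException(String message) {
    super(message);
  }
}

/** Hypothetical revision: the peer is a constructor parameter and appears in the message. */
class PeerNotInConsensusGroupException extends ConsensusException {
  PeerNotInConsensusGroupException(int groupId, String peerEndpoint) {
    super(String.format("peer %s is not in group %d", peerEndpoint, groupId));
  }
}
```

In the real class, the endpoint string could come from the PR's Utils.IP_PORT(Endpoint) helper.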







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830710241



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =

Review comment:
       Yeah, I'll add a TODO here.
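For context, `applyTransaction` above records the entry's term and index as the last applied position and then, judging by the imports, wraps the entry payload in a request for the application state machine. The sketch below reproduces that flow with hypothetical stand-ins (`LogEntry` for `RaftProtos.LogEntryProto`, `AppStateMachine` for `IStateMachine`) so it runs without Ratis; it does not show what the TODO will cover.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for RaftProtos.LogEntryProto: only term, index and payload.
class LogEntry {
  final long term;
  final long index;
  final byte[] payload;

  LogEntry(long term, long index, byte[] payload) {
    this.term = term;
    this.index = index;
    this.payload = payload;
  }
}

// Hypothetical stand-in for IStateMachine: consumes a serialized request.
interface AppStateMachine {
  String write(ByteBuffer request);
}

// Sketch of the proxy pattern in the diff: record progress, wrap payload, delegate.
class StateMachineProxy {
  private final AppStateMachine app;
  private long lastAppliedTerm = -1;
  private long lastAppliedIndex = -1;

  StateMachineProxy(AppStateMachine app) {
    this.app = app;
  }

  CompletableFuture<String> applyTransaction(LogEntry entry) {
    // Mirrors updateLastAppliedTermIndex(log.getTerm(), log.getIndex()) in the diff.
    lastAppliedTerm = entry.term;
    lastAppliedIndex = entry.index;
    // Wrap the payload without copying; read-only so the application cannot alter log bytes.
    ByteBuffer request = ByteBuffer.wrap(entry.payload).asReadOnlyBuffer();
    return CompletableFuture.completedFuture(app.write(request));
  }

  long getLastAppliedTerm() {
    return lastAppliedTerm;
  }

  long getLastAppliedIndex() {
    return lastAppliedIndex;
  }
}
```

Returning an already completed future keeps the sketch synchronous; a real state machine could complete it asynchronously instead.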







[GitHub] [iotdb] SzyWilliam commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

Posted by GitBox <gi...@apache.org>.
SzyWilliam commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r833008364



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;

Review comment:
       OK, I'll fix it in the next PR.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,41 +21,147 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;

Review comment:
       OK, I'll fix it in the next PR.
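Both hunks above end on wildcard imports (`org.apache.ratis.protocol.*` and `java.util.*`), which is presumably what will be fixed. A sketch of the explicit form for the `java.util` part follows; the listed types are illustrative, since the diff does not show which `java.util` classes `RatisConsensus` actually uses, and `GroupPeers` is a hypothetical helper that exists only to use them.

```java
// Explicit single-type imports in place of `import java.util.*;`.
// Illustrative only: the real set depends on what RatisConsensus references.
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: maps a group id to its peer addresses.
class GroupPeers {
  private final Map<Integer, List<String>> peersByGroup = new HashMap<>();

  void addGroup(int groupId, List<String> peers) {
    // Defensive copy so later changes to the caller's list do not leak in.
    peersByGroup.put(groupId, Collections.unmodifiableList(new ArrayList<>(peers)));
  }

  List<String> peersOf(int groupId) {
    // Unknown groups yield an empty list rather than null.
    return peersByGroup.getOrDefault(groupId, Collections.emptyList());
  }
}
```

Explicit imports make each dependency visible in the diff and avoid name clashes when two wildcard-imported packages export the same simple name.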



