You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@iotdb.apache.org by GitBox <gi...@apache.org> on 2022/03/20 14:22:25 UTC

[GitHub] [iotdb] OneSizeFitsQuorum commented on a change in pull request #5255: [IOTDB-2674] Multi-Raft Consensus Implementation based on Apache Ratis

OneSizeFitsQuorum commented on a change in pull request #5255:
URL: https://github.com/apache/iotdb/pull/5255#discussion_r830620194



##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusWrongGroupException.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusWrongGroupException extends ConsensusException {
+
+  public ConsensusWrongGroupException(ConsensusGroupId groupId) {
+    super(String.format("peer don't server group %d", groupId.getId()));

Review comment:
       Typo: `server` should be `serve`.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?

Review comment:
       I think `add IOException to method signature` is fine

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =
+        new ByteBufferConsensusRequest(
+            log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    TSStatus result = applicationStateMachine.write(request);
+
+    Message ret = null;
+    try {
+      ByteBuffer serializedStatus = Utils.serializeTSStatus(result);
+      ret = Message.valueOf(ByteString.copyFrom(serializedStatus));
+    } catch (TException ignored) {
+    }
+
+    return CompletableFuture.completedFuture(ret);
+  }
+
+  @Override
+  public CompletableFuture<Message> query(Message request) {
+    IConsensusRequest req =
+        new ByteBufferConsensusRequest(request.getContent().asReadOnlyByteBuffer());

Review comment:
       Maybe we can do some optimization to avoid deserialization in some scenarios

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerNotInGroupException.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class PeerNotInGroupException extends ConsensusException {

Review comment:
       How about using `PeerNotInConsensusGroupException` to unify current naming?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerAlreadyInGroupException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+
+public class PeerAlreadyInGroupException extends ConsensusException {

Review comment:
       How about using `PeerAlreadyInConsensusGroupException` to unify current naming?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/RatisRequestFailedException.java
##########
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+public class RatisRequestFailedException extends ConsensusException {
+
+  public RatisRequestFailedException() {
+    super("ratis client request failed (IOException)");

Review comment:
       It is better to use the `Exception(String message, Throwable cause)` constructor to wrap the Ratis exception so that the problem can be located; otherwise the original exception message is lost.
   
   

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =

Review comment:
       Maybe we can do some optimization to avoid deserialization in some scenarios

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {

Review comment:
       What is the current Snapshot strategy and is there a problem with long-term running?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/PeerAlreadyInGroupException.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+
+public class PeerAlreadyInGroupException extends ConsensusException {
+  public PeerAlreadyInGroupException(ConsensusGroupId groupId, Peer peer) {
+    super(
+        String.format(
+            "peer %s:%d already in group %d",

Review comment:
       The message lacks a verb, e.g. `peer %s:%d is already in group %d`.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusWrongGroupException.java
##########
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusWrongGroupException extends ConsensusException {

Review comment:
       The name `ConsensusWrongGroupException` sounds weird...

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ApplicationStateMachineProxy.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
+public class ApplicationStateMachineProxy extends BaseStateMachine {
+  private final IStateMachine applicationStateMachine;
+
+  public ApplicationStateMachineProxy(IStateMachine stateMachine) {
+    applicationStateMachine = stateMachine;
+    applicationStateMachine.start();
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    applicationStateMachine.stop();
+  }
+
+  @Override
+  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+    RaftProtos.LogEntryProto log = trx.getLogEntry();
+    updateLastAppliedTermIndex(log.getTerm(), log.getIndex());
+
+    IConsensusRequest request =
+        new ByteBufferConsensusRequest(
+            log.getStateMachineLogEntry().getLogData().asReadOnlyByteBuffer());
+    TSStatus result = applicationStateMachine.write(request);
+
+    Message ret = null;
+    try {
+      ByteBuffer serializedStatus = Utils.serializeTSStatus(result);
+      ret = Message.valueOf(ByteString.copyFrom(serializedStatus));
+    } catch (TException ignored) {
+    }

Review comment:
       Please log this exception with a logger instead of silently ignoring it.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC

Review comment:
       Remove TODO?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;

Review comment:
       need to do optimization

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/ReadLocalMessage.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+
+public class ReadLocalMessage implements Message {

Review comment:
       I think we also need to extend a class that contains `IClientRequest` for write requests to avoid deserialization every time

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }

Review comment:
       Please log this exception with a logger instead of silently ignoring it.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {

Review comment:
       Please log this exception with a logger instead of silently ignoring it.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/Utils.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.transport.TByteBuffer;
+
+import javax.crypto.Cipher;
+import javax.crypto.spec.SecretKeySpec;
+
+import java.nio.ByteBuffer;
+
+public class Utils {
+  private static final String IOTDB_RATIS_KEY = "iotdb@_ratis_key";
+  private static final SecretKeySpec key = new SecretKeySpec(IOTDB_RATIS_KEY.getBytes(), "AES");
+
+  public static String IP_PORT(Endpoint endpoint) {
+    return String.format("%s:%d", endpoint.getIp(), endpoint.getPort());
+  }
+
+  public static String groupFullName(ConsensusGroupId consensusGroupId) {
+    return String.format("%s-%d", consensusGroupId.getType().toString(), consensusGroupId.getId());
+  }
+
+  public static RaftPeer toRaftPeer(Endpoint endpoint) {
+    String Id = String.format("%s-%d", endpoint.getIp(), endpoint.getPort());
+    return RaftPeer.newBuilder().setId(Id).setAddress(IP_PORT(endpoint)).build();
+  }
+
+  public static RaftPeer toRaftPeer(Peer peer) {
+    return toRaftPeer(peer.getEndpoint());
+  }
+
+  /**
+   * Given ConsensusGroupId, generate a deterministic RaftGroupId current scheme:
+   * AES/ECB/PKCS5Padding of (GroupType-Id), key = iotdb@_ratis_key
+   */
+  public static RaftGroupId toRatisGroupId(ConsensusGroupId consensusGroupId) {
+    String groupFullName = groupFullName(consensusGroupId);
+
+    byte[] AESEncrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.ENCRYPT_MODE, key);
+      AESEncrypted = cipher.doFinal(groupFullName.getBytes());
+    } catch (Exception ignored) {
+    }
+
+    return RaftGroupId.valueOf(ByteString.copyFrom(AESEncrypted));
+  }
+
+  /** Given raftGroupId, decrypt ConsensusGroupId out of it */
+  public static ConsensusGroupId toConsensusGroupId(RaftGroupId raftGroupId) {
+    byte[] AESDecrypted = new byte[] {};
+    try {
+      Cipher cipher = Cipher.getInstance("AES/ECB/PKCS5Padding");
+      cipher.init(Cipher.DECRYPT_MODE, key);
+      AESDecrypted = cipher.doFinal(raftGroupId.toByteString().toByteArray());
+    } catch (Exception ignored) {
+    }
+    String consensusGroupString = new String(AESDecrypted);
+    String[] items = consensusGroupString.split("-");
+    return new ConsensusGroupId(GroupType.valueOf(items[0]), Long.parseLong(items[1]));
+  }
+
+  public static ByteBuffer serializeTSStatus(TSStatus status) throws TException {
+    // TODO Pooling ByteBuffer
+    TByteBuffer byteBuffer = new TByteBuffer(ByteBuffer.allocate(1000));

Review comment:
       Do not use a magic number; at the very least, make the buffer size a constant.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(
+      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;

Review comment:
       why assert this?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {

Review comment:
       All methods should be thread-safe. Please check all the functions carefully.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(

Review comment:
       Isn't `synchronized` too heavy here? We have to support simultaneous reads of different consensus groups.

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(
+      ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+
+    RaftClientReply reply = null;
+    try {
+      assert IConsensusRequest instanceof ByteBufferConsensusRequest;
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+
+      RaftClientRequest clientRequest =
+          RaftClientRequest.newBuilder()
+              .setServerId(server.getId())
+              .setClientId(localFakeId)
+              .setGroupId(Utils.toRatisGroupId(groupId))
+              .setCallId(localFakeCallId.incrementAndGet())
+              .setMessage(Message.valueOf(ByteString.copyFrom(request.getContent())))
+              .setType(RaftClientRequest.staleReadRequestType(0))
+              .build();
+
+      reply = server.submitClientRequest(clientRequest);
+    } catch (IOException e) {
+      return ConsensusReadResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+
+    Message ret = reply.getMessage();
+    assert ret instanceof ReadLocalMessage;
+    ReadLocalMessage readLocalMessage = (ReadLocalMessage) ret;
+
+    return ConsensusReadResponse.newBuilder().setDataSet(readLocalMessage.getDataSet()).build();
   }
 
+  /**
+   * Add this IConsensus Peer into ConsensusGroup(groupId, peers) Caller's responsibility to call
+   * addConsensusGroup to every peer of this group and ensure the group is all up
+   *
+   * <p>underlying Ratis will 1. initialize a RaftServer instance 2. call GroupManagementApi to
+   * register self to the RaftGroup
+   */
   @Override
   public ConsensusGenericResponse addConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroup group = buildRaftGroup(groupId, peers);
+    // pre-conditions: myself in this new group
+    if (!group.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    raftGroupMap.put(group.getGroupId(), group);
+
+    // build and store the corresponding client
+    RaftClient client = buildClientAndCache(group);
+
+    // add RaftPeer myself to this RaftGroup
+    RaftClientReply reply = null;
+    try {
+      reply = client.getGroupManagementApi(myself.getId()).add(group);
+    } catch (IOException e) {
+      return failed(new RatisRequestFailedException());
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
+  /**
+   * Remove this IConsensus Peer out of ConsensusGroup(groupId, peers) Caller's responsibility to
+   * call removeConsensusGroup to every peer of this group and ensure the group is fully removed
+   *
+   * <p>underlying Ratis will 1. call GroupManagementApi to unregister self off the RaftGroup 2.
+   * clean up
+   */
   @Override
   public ConsensusGenericResponse removeConsensusGroup(ConsensusGroupId groupId) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftGroup raftGroup = raftGroupMap.get(raftGroupId);
+
+    // pre-conditions: group exists and myself in this group
+    if (raftGroup == null || !raftGroup.getPeers().contains(myself)) {
+      return failed(new PeerNotInGroupException(groupId));
+    }
+
+    RaftClient client = clientMap.get(raftGroupId);
+    // send remove group to myself
+    RaftClientReply reply = null;
+    try {
+      reply = client.getGroupManagementApi(myself.getId()).remove(raftGroupId, false, false);
+    } catch (IOException e) {
+      return failed(new RatisRequestFailedException());
+    }
+
+    if (reply.isSuccess()) {
+      // delete Group information and its corresponding client
+      raftGroupMap.remove(raftGroupId);
+      clientMap.remove(raftGroupId);
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
+  /**
+   * Add a new IConsensus Peer into ConsensusGroup with groupId
+   *
+   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
+   * change
+   */
   @Override
   public ConsensusGenericResponse addPeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    try {
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+    RaftGroup group = raftGroupMap.get(raftGroupId);
+    RaftPeer peerToAdd = Utils.toRaftPeer(peer);
+
+    // pre-conditions: group exists and myself in this group
+    if (group == null || !group.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+
+    // pre-condition: peer not in this group
+    if (group.getPeers().contains(peerToAdd)) {
+      return failed(new PeerAlreadyInGroupException(groupId, peer));
+    }
+
+    List<RaftPeer> newConfig = new ArrayList<>(group.getPeers());
+    newConfig.add(peerToAdd);
+
+    RaftClientReply reply;
+    try {
+      reply = sendReconfiguration(raftGroupId, newConfig);
+
+      // sync again
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
+  /**
+   * Remove IConsensus Peer from ConsensusGroup with groupId
+   *
+   * <p>underlying Ratis will 1. call the AdminApi to notify group leader of this configuration
+   * change
+   */
   @Override
   public ConsensusGenericResponse removePeer(ConsensusGroupId groupId, Peer peer) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    try {
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+    RaftGroup group = raftGroupMap.get(raftGroupId);
+    RaftPeer peerToRemove = Utils.toRaftPeer(peer);
+
+    // pre-conditions: group exists and myself in this group
+    if (group == null || !group.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    // pre-condition: peer is a member of groupId
+    if (!group.getPeers().contains(peerToRemove)) {
+      return failed(new PeerNotInGroupException(groupId));
+    }
+
+    // update group peer information
+    List<RaftPeer> newConfig =
+        group.getPeers().stream()
+            .filter(raftPeer -> !raftPeer.equals(peerToRemove))
+            .collect(Collectors.toList());
+
+    RaftClientReply reply;
+    try {
+      reply = sendReconfiguration(raftGroupId, newConfig);
+      // sync again
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (RatisRequestFailedException | ConsensusGroupNotExistException e) {
+      return failed(e);
+    }
+
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   @Override
   public ConsensusGenericResponse changePeer(ConsensusGroupId groupId, List<Peer> newPeers) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroup raftGroup = buildRaftGroup(groupId, newPeers);
+
+    // pre-conditions: myself in this group
+    if (!raftGroup.getPeers().contains(myself)) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    raftGroupMap.put(raftGroup.getGroupId(), raftGroup);
+
+    // build the client and store it
+    buildClientAndCache(raftGroup);
+
+    // add RaftPeer myself to this RaftGroup
+    RaftClientReply reply = null;
+    try {
+      reply = sendReconfiguration(raftGroup.getGroupId(), new ArrayList<>(raftGroup.getPeers()));
+      // sync again
+      syncGroupInfoAndRebuildClient(groupId);
+    } catch (ConsensusGroupNotExistException | RatisRequestFailedException e) {
+      return failed(new RatisRequestFailedException());
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   @Override
   public ConsensusGenericResponse transferLeader(ConsensusGroupId groupId, Peer newLeader) {
-    return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient client = clientMap.getOrDefault(raftGroupId, null);
+    if (client == null) {
+      return failed(new ConsensusWrongGroupException(groupId));
+    }
+    RaftPeer newRaftLeader = Utils.toRaftPeer(newLeader);
+
+    RaftClientReply reply = null;
+    try {
+      // TODO tuning for timeoutMs
+      reply = client.admin().transferLeadership(newRaftLeader.getId(), 2000);
+    } catch (IOException e) {
+      return failed(new RatisRequestFailedException());
+    }
+    return ConsensusGenericResponse.newBuilder().setSuccess(reply.isSuccess()).build();
   }
 
   @Override
   public ConsensusGenericResponse triggerSnapshot(ConsensusGroupId groupId) {
     return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
   }
+
+  private ConsensusGenericResponse failed(ConsensusException e) {
+    return ConsensusGenericResponse.newBuilder().setSuccess(false).setException(e).build();
+  }
+
+  private RaftGroup buildRaftGroup(ConsensusGroupId groupId, List<Peer> peers) {
+    List<RaftPeer> raftPeers = peers.stream().map(Utils::toRaftPeer).collect(Collectors.toList());
+    return RaftGroup.valueOf(Utils.toRatisGroupId(groupId), raftPeers);
+  }
+
+  private RaftClient buildClientAndCache(RaftGroup group) {
+    RaftProperties raftProperties = new RaftProperties();
+    RaftClient.Builder builder =
+        RaftClient.newBuilder()
+            .setProperties(raftProperties)
+            .setRaftGroup(group)
+            .setClientRpc(
+                new GrpcFactory(new Parameters())
+                    .newRaftClientRpc(ClientId.randomId(), raftProperties));
+    RaftClient client = builder.build();
+    clientMap.put(group.getGroupId(), client);
+    return client;
+  }
+
+  private RatisConsensus(Endpoint endpoint, File ratisStorageDir, IStateMachine.Registry registry)
+      throws IOException {
+
+    this.clientMap = new ConcurrentHashMap<>();
+    this.raftGroupMap = new HashMap<>();

Review comment:
       Why use `HashMap`?

##########
File path: consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
##########
@@ -21,70 +21,452 @@
 
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.*;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
-import java.util.List;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.Parameters;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.grpc.GrpcFactory;
+import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.NetUtils;
+import org.apache.thrift.TException;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * A multi-raft consensus implementation based on Ratis, currently still under development.
  *
  * <p>See jira [IOTDB-2674](https://issues.apache.org/jira/browse/IOTDB-2674) for more details.
  */
 public class RatisConsensus implements IConsensus {
+  // the unique net communication endpoint
+  private final RaftPeer myself;
+
+  private final RaftServer server;
+
+  // TODO when comm with myself, use method call instead of RPC
+  private final Map<RaftGroupId, RaftClient> clientMap;
+  private final Map<RaftGroupId, RaftGroup> raftGroupMap;
+
+  private ClientId localFakeId;
+  private AtomicLong localFakeCallId;
+  /**
+   * This function will use the previous client for groupId to query the latest group info it will
+   * update the new group info into the groupMap and rebuild its client
+   *
+   * @throws ConsensusGroupNotExistException when cannot get the group info
+   */
+  private void syncGroupInfoAndRebuildClient(ConsensusGroupId groupId)
+      throws ConsensusGroupNotExistException {
+    RaftGroupId raftGroupId = Utils.toRatisGroupId(groupId);
+    RaftClient current = clientMap.get(raftGroupId);
+    try {
+      GroupInfoReply reply = current.getGroupManagementApi(myself.getId()).info(raftGroupId);
+
+      if (!reply.isSuccess()) {
+        throw new ConsensusGroupNotExistException(groupId);
+      }
+
+      raftGroupMap.put(raftGroupId, reply.getGroup());
+      buildClientAndCache(raftGroupMap.get(raftGroupId));
+    } catch (IOException e) {
+      throw new ConsensusGroupNotExistException(groupId);
+    }
+  }
+
+  private RaftClientReply sendReconfiguration(RaftGroupId raftGroupId, List<RaftPeer> peers)
+      throws RatisRequestFailedException {
+    RaftClient client = clientMap.get(raftGroupId);
+    // notify the group leader of configuration change
+    RaftClientReply reply = null;
+    try {
+      reply = client.admin().setConfiguration(peers);
+    } catch (IOException e) {
+      throw new RatisRequestFailedException();
+    }
+    return reply;
+  }
 
   @Override
-  public void start() {}
+  public void start() {
+    // TODO awkward, should we add IOException to method signature?
+    try {
+      server.start();
+    } catch (IOException ignored) {
+      try {
+        server.close();
+      } catch (IOException ignored1) {
+      }
+    }
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    try {
+      server.close();
+    } catch (IOException ignored) {
+    }
+  }
 
   @Override
   public ConsensusWriteResponse write(
       ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusWriteResponse.newBuilder().build();
+
+    RaftClient client = clientMap.get(Utils.toRatisGroupId(groupId));
+    TSStatus writeResult = null;
+    try {
+      ByteBufferConsensusRequest request = (ByteBufferConsensusRequest) IConsensusRequest;
+      Message message = Message.valueOf(ByteString.copyFrom(request.getContent()));
+      RaftClientReply reply = client.io().send(message);
+      writeResult = Utils.deserializeFrom(reply.getMessage().getContent().asReadOnlyByteBuffer());
+    } catch (IOException | TException e) {
+      return ConsensusWriteResponse.newBuilder()
+          .setException(new RatisRequestFailedException())
+          .build();
+    }
+    return ConsensusWriteResponse.newBuilder().setStatus(writeResult).build();
   }
 
+  /** Read directly from LOCAL COPY notice: May read stale data (not linearizable) */
   @Override
-  public ConsensusReadResponse read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
-    return ConsensusReadResponse.newBuilder().build();
+  public synchronized ConsensusReadResponse read(

Review comment:
       We may set `ConsensusGroupNotExistException` on the response in this function, but I do not see that done here.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@iotdb.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org