You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2022/03/11 13:45:55 UTC
[iotdb] 01/01: init the consensus framework
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch consensus
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9d52c2d57da5a0a762410f55b85e87be008aa596
Author: LebronAl <TX...@gmail.com>
AuthorDate: Fri Mar 11 21:44:03 2022 +0800
init the consensus framework
---
consensus/pom.xml | 30 +++
.../org/apache/iotdb/consensus/IConsensus.java | 56 +++++
.../iotdb/consensus/common/ConsensusGroup.java | 60 +++++
.../iotdb/consensus/common/ConsensusGroupId.java | 64 ++++++
.../org/apache/iotdb/consensus/common/DataSet.java | 22 ++
.../apache/iotdb/consensus/common/Endpoint.java | 59 +++++
.../apache/iotdb/consensus/common/GroupType.java | 27 +++
.../org/apache/iotdb/consensus/common/Peer.java | 59 +++++
.../common/request/IConsensusRequest.java | 29 +++
.../common/response/ConsensusGenericResponse.java | 64 ++++++
.../common/response/ConsensusReadResponse.java | 65 ++++++
.../common/response/ConsensusResponse.java | 39 ++++
.../common/response/ConsensusWriteResponse.java | 65 ++++++
.../consensus/exception/ConsensusException.java | 27 +++
.../ConsensusGroupAlreadyExistException.java | 36 +++
.../exception/ConsensusGroupNotExistException.java | 36 +++
.../exception/IllegalPeerNumException.java | 29 +++
.../iotdb/consensus/ratis/RatisConsensus.java | 85 +++++++
.../consensus/standalone/StandAloneConsensus.java | 160 ++++++++++++++
.../consensus/standalone/StandAloneServerImpl.java | 61 +++++
.../consensus/statemachine/EmptyStateMachine.java | 43 ++++
.../consensus/statemachine/IStateMachine.java | 40 ++++
.../standalone/StandAloneConsensusTest.java | 245 +++++++++++++++++++++
pom.xml | 1 +
server/pom.xml | 5 +
.../apache/iotdb/db/consensus/ConsensusMain.java | 95 ++++++++
.../ratis/RatisDataRegionStateMachine.java | 54 +++++
.../ratis/RatisSchemaRegionStateMachine.java | 54 +++++
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 18 +-
29 files changed, 1627 insertions(+), 1 deletion(-)
diff --git a/consensus/pom.xml b/consensus/pom.xml
new file mode 100644
index 0000000..1681cf8
--- /dev/null
+++ b/consensus/pom.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>iotdb-parent</artifactId>
+ <groupId>org.apache.iotdb</groupId>
+ <version>0.13.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>iotdb-consensus</artifactId>
+ <name>IoTDB Consensus</name>
+ <dependencies>
+ <!-- https://mvnrepository.com/artifact/org.apache.ratis/ratis-server -->
+ <dependency>
+ <groupId>org.apache.ratis</groupId>
+ <artifactId>ratis-server</artifactId>
+ <version>2.2.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-thrift</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ </properties>
+</project>
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
new file mode 100644
index 0000000..a27e58f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.consensus;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+
+import java.util.List;
+
+public interface IConsensus {
+ void start();
+
+ void stop();
+
+ // write API
+ ConsensusWriteResponse Write(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest);
+ // read API
+ ConsensusReadResponse Read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest);
+
+ // multi consensus group API
+ ConsensusGenericResponse AddConsensusGroup(ConsensusGroupId groupId, List<Peer> peers);
+
+ ConsensusGenericResponse RemoveConsensusGroup(ConsensusGroupId groupId);
+
+ // single consensus group API
+ ConsensusGenericResponse AddPeer(ConsensusGroupId groupId, Peer peer);
+
+ ConsensusGenericResponse RemovePeer(ConsensusGroupId groupId, Peer peer);
+
+ ConsensusGenericResponse ChangePeer(ConsensusGroupId groupId, List<Peer> peers);
+
+ // management API
+ ConsensusGenericResponse TransferLeader(ConsensusGroupId groupId, Peer newPeer);
+
+ ConsensusGenericResponse TriggerSnapshot(ConsensusGroupId groupId);
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/ConsensusGroup.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/ConsensusGroup.java
new file mode 100644
index 0000000..f22cc36
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/ConsensusGroup.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common;
+
+import java.util.List;
+import java.util.Objects;
+
+// TODO Use a mature IDL framework such as Protobuf to manage this structure
+public class ConsensusGroup {
+
+ private final ConsensusGroupId groupId;
+ private final List<Peer> peers;
+
+ public ConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
+ this.groupId = groupId;
+ this.peers = peers;
+ }
+
+ public ConsensusGroupId getGroupId() {
+ return groupId;
+ }
+
+ public List<Peer> getPeers() {
+ return peers;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConsensusGroup that = (ConsensusGroup) o;
+ return Objects.equals(groupId, that.groupId) && Objects.equals(peers, that.peers);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, peers);
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/ConsensusGroupId.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/ConsensusGroupId.java
new file mode 100644
index 0000000..5b10c6a
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/ConsensusGroupId.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common;
+
+import java.util.Objects;
+
+// TODO Use a mature IDL framework such as Protobuf to manage this structure
+public class ConsensusGroupId {
+
+ private final GroupType type;
+ private final long id;
+
+ public ConsensusGroupId(GroupType type, long id) {
+ this.type = type;
+ this.id = id;
+ }
+
+ public GroupType getType() {
+ return type;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ConsensusGroupId that = (ConsensusGroupId) o;
+ return id == that.id && Objects.equals(type, that.type);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type, id);
+ }
+
+ @Override
+ public String toString() {
+ return "ConsensusGroupId{" + "type=" + type + ", id=" + id + '}';
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/DataSet.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/DataSet.java
new file mode 100644
index 0000000..660cbee
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/DataSet.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common;
+
+public interface DataSet {}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
new file mode 100644
index 0000000..34201d1
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Endpoint.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common;
+
+import java.util.Objects;
+
+// TODO Use a mature IDL framework such as Protobuf to manage this structure
+public class Endpoint {
+
+ private final String ip;
+ private final int port;
+
+ public Endpoint(String ip, int port) {
+ this.ip = ip;
+ this.port = port;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Endpoint endpoint = (Endpoint) o;
+ return port == endpoint.port && Objects.equals(ip, endpoint.ip);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, port);
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/GroupType.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/GroupType.java
new file mode 100644
index 0000000..7fe047e
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/GroupType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common;
+
+// TODO Use a mature IDL framework such as Protobuf to manage this structure
+public enum GroupType {
+ Config,
+ DataRegion,
+ SchemaRegion
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
new file mode 100644
index 0000000..50988c1
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/Peer.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common;
+
+import java.util.Objects;
+
+// TODO Use a mature IDL framework such as Protobuf to manage this structure
+public class Peer {
+
+ private final ConsensusGroupId groupId;
+ private final Endpoint endpoint;
+
+ public Peer(ConsensusGroupId groupId, Endpoint endpoint) {
+ this.groupId = groupId;
+ this.endpoint = endpoint;
+ }
+
+ public ConsensusGroupId getGroupId() {
+ return groupId;
+ }
+
+ public Endpoint getEndpoint() {
+ return endpoint;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Peer peer = (Peer) o;
+ return Objects.equals(groupId, peer.groupId) && Objects.equals(endpoint, peer.endpoint);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(groupId, endpoint);
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
new file mode 100644
index 0000000..1bf8f47
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+
+public interface IConsensusRequest {
+
+ void serializeRequest(ByteBuffer buffer);
+
+ void deserializeRequest(ByteBuffer buffer) throws Exception;
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusGenericResponse.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusGenericResponse.java
new file mode 100644
index 0000000..acf67f2
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusGenericResponse.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.response;
+
+import org.apache.iotdb.consensus.exception.ConsensusException;
+
+public class ConsensusGenericResponse extends ConsensusResponse {
+
+ private final boolean success;
+
+ public ConsensusGenericResponse(ConsensusException exception, boolean success) {
+ super(exception);
+ this.success = success;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsensusGenericResponse{" + "success=" + success + "} " + super.toString();
+ }
+
+ public static ConsensusGenericResponse.Builder newBuilder() {
+ return new ConsensusGenericResponse.Builder();
+ }
+
+ public static class Builder {
+ private boolean success;
+ private ConsensusException exception;
+
+ public ConsensusGenericResponse build() {
+ return new ConsensusGenericResponse(exception, success);
+ }
+
+ public ConsensusGenericResponse.Builder setException(ConsensusException exception) {
+ this.exception = exception;
+ return this;
+ }
+
+ public ConsensusGenericResponse.Builder setSuccess(boolean success) {
+ this.success = success;
+ return this;
+ }
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusReadResponse.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusReadResponse.java
new file mode 100644
index 0000000..a8bec60
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusReadResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.response;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+
+public class ConsensusReadResponse extends ConsensusResponse {
+
+ private final DataSet dataset;
+
+ public ConsensusReadResponse(ConsensusException exception, DataSet dataset) {
+ super(exception);
+ this.dataset = dataset;
+ }
+
+ public DataSet getDataset() {
+ return dataset;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsensusReadResponse{" + "dataset=" + dataset + "} " + super.toString();
+ }
+
+ public static ConsensusReadResponse.Builder newBuilder() {
+ return new ConsensusReadResponse.Builder();
+ }
+
+ public static class Builder {
+ private ConsensusException exception;
+ private DataSet dataset;
+
+ public ConsensusReadResponse build() {
+ return new ConsensusReadResponse(exception, dataset);
+ }
+
+ public ConsensusReadResponse.Builder setException(ConsensusException exception) {
+ this.exception = exception;
+ return this;
+ }
+
+ public ConsensusReadResponse.Builder setDataSet(DataSet dataset) {
+ this.dataset = dataset;
+ return this;
+ }
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusResponse.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusResponse.java
new file mode 100644
index 0000000..355b0d0
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.response;
+
+import org.apache.iotdb.consensus.exception.ConsensusException;
+
+public abstract class ConsensusResponse {
+ private final ConsensusException exception;
+
+ public ConsensusResponse(ConsensusException exception) {
+ this.exception = exception;
+ }
+
+ public ConsensusException getException() {
+ return exception;
+ }
+
+ @Override
+ public String toString() {
+ return "exception=" + exception;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java
new file mode 100644
index 0000000..f3b38ad
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/response/ConsensusWriteResponse.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.response;
+
+import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class ConsensusWriteResponse extends ConsensusResponse {
+
+ private final TSStatus status;
+
+ public ConsensusWriteResponse(ConsensusException exception, TSStatus status) {
+ super(exception);
+ this.status = status;
+ }
+
+ public TSStatus getStatus() {
+ return status;
+ }
+
+ @Override
+ public String toString() {
+ return "ConsensusWriteResponse{" + "status=" + status + "} " + super.toString();
+ }
+
+ public static ConsensusWriteResponse.Builder newBuilder() {
+ return new ConsensusWriteResponse.Builder();
+ }
+
+ public static class Builder {
+ private org.apache.iotdb.service.rpc.thrift.TSStatus status;
+ private ConsensusException exception;
+
+ public ConsensusWriteResponse build() {
+ return new ConsensusWriteResponse(exception, status);
+ }
+
+ public Builder setException(ConsensusException exception) {
+ this.exception = exception;
+ return this;
+ }
+
+ public Builder setStatus(org.apache.iotdb.service.rpc.thrift.TSStatus status) {
+ this.status = status;
+ return this;
+ }
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusException.java
new file mode 100644
index 0000000..7332879
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusException.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+public class ConsensusException extends Exception {
+
+ public ConsensusException(String message) {
+ super(message);
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAlreadyExistException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAlreadyExistException.java
new file mode 100644
index 0000000..c0a4e0c
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupAlreadyExistException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusGroupAlreadyExistException extends ConsensusException {
+
+ private final ConsensusGroupId groupId;
+
+ public ConsensusGroupAlreadyExistException(ConsensusGroupId groupId) {
+ super(String.format("The consensus group %s already exists", groupId));
+ this.groupId = groupId;
+ }
+
+ public ConsensusGroupId getGroupId() {
+ return groupId;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
new file mode 100644
index 0000000..e2b2abd
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/ConsensusGroupNotExistException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+
+public class ConsensusGroupNotExistException extends ConsensusException {
+
+ private final ConsensusGroupId groupId;
+
+ public ConsensusGroupNotExistException(ConsensusGroupId groupId) {
+ super(String.format("The consensus group %s doesn't exist", groupId));
+ this.groupId = groupId;
+ }
+
+ public ConsensusGroupId getGroupId() {
+ return groupId;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerNumException.java b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerNumException.java
new file mode 100644
index 0000000..d2026a5
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/exception/IllegalPeerNumException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.exception;
+
+public class IllegalPeerNumException extends ConsensusException {
+
+ public IllegalPeerNumException(int size) {
+ super(
+ String.format(
+ "Illegal Peer num %d, only support one peer in StandAloneConsensus Mode", size));
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
new file mode 100644
index 0000000..22f5690
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.ratis;
+
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+
+import java.util.List;
+
+public class RatisConsensus implements IConsensus {
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public ConsensusWriteResponse Write(
+ ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+ return null;
+ }
+
+ @Override
+ public ConsensusReadResponse Read(ConsensusGroupId groupId, IConsensusRequest IConsensusRequest) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse AddConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse RemoveConsensusGroup(ConsensusGroupId groupId) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse AddPeer(ConsensusGroupId groupId, Peer peer) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse RemovePeer(ConsensusGroupId groupId, Peer peer) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse ChangePeer(ConsensusGroupId groupId, List<Peer> peers) {
+ return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse TransferLeader(ConsensusGroupId groupId, Peer newPeer) {
+ return null;
+ }
+
+ @Override
+ public ConsensusGenericResponse TriggerSnapshot(ConsensusGroupId groupId) {
+ return null;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
new file mode 100644
index 0000000..ef8e2a4
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneConsensus.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.standalone;
+
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class StandAloneConsensus implements IConsensus {
+
+ private final IStateMachine.Registry registry;
+ private final Map<ConsensusGroupId, StandAloneServerImpl> stateMachineMap;
+
+ public StandAloneConsensus(IStateMachine.Registry registry) {
+ this.registry = registry;
+ this.stateMachineMap = new ConcurrentHashMap<>();
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public ConsensusWriteResponse Write(ConsensusGroupId groupId, IConsensusRequest request) {
+ AtomicReference<TSStatus> result = new AtomicReference<>();
+ stateMachineMap.computeIfPresent(
+ groupId,
+ (k, v) -> {
+ result.set(v.Write(request));
+ return v;
+ });
+ if (result.get() == null) {
+ return ConsensusWriteResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return ConsensusWriteResponse.newBuilder().setStatus(result.get()).build();
+ }
+
+ @Override
+ public ConsensusReadResponse Read(ConsensusGroupId groupId, IConsensusRequest request) {
+ AtomicReference<DataSet> result = new AtomicReference<>();
+ stateMachineMap.computeIfPresent(
+ groupId,
+ (k, v) -> {
+ result.set(v.Read(request));
+ return v;
+ });
+ if (result.get() == null) {
+ return ConsensusReadResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return ConsensusReadResponse.newBuilder().setDataSet(result.get()).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse AddConsensusGroup(ConsensusGroupId groupId, List<Peer> peers) {
+ int consensusGroupSize = peers.size();
+ if (consensusGroupSize != 1) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new IllegalPeerNumException(consensusGroupSize))
+ .build();
+ }
+ AtomicBoolean exist = new AtomicBoolean(true);
+ stateMachineMap.computeIfAbsent(
+ groupId,
+ (k) -> {
+ exist.set(false);
+ StandAloneServerImpl impl =
+ new StandAloneServerImpl(peers.get(0), registry.apply(groupId));
+ impl.start();
+ return impl;
+ });
+ if (exist.get()) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupAlreadyExistException(groupId))
+ .build();
+ }
+ return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse RemoveConsensusGroup(ConsensusGroupId groupId) {
+ AtomicBoolean exist = new AtomicBoolean(false);
+ stateMachineMap.computeIfPresent(
+ groupId,
+ (k, v) -> {
+ exist.set(true);
+ v.stop();
+ return null;
+ });
+ if (!exist.get()) {
+ return ConsensusGenericResponse.newBuilder()
+ .setException(new ConsensusGroupNotExistException(groupId))
+ .build();
+ }
+ return ConsensusGenericResponse.newBuilder().setSuccess(true).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse AddPeer(ConsensusGroupId groupId, Peer peer) {
+ return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse RemovePeer(ConsensusGroupId groupId, Peer peer) {
+ return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse ChangePeer(ConsensusGroupId groupId, List<Peer> peers) {
+ return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse TransferLeader(ConsensusGroupId groupId, Peer newPeer) {
+ return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ }
+
+ @Override
+ public ConsensusGenericResponse TriggerSnapshot(ConsensusGroupId groupId) {
+ return ConsensusGenericResponse.newBuilder().setSuccess(false).build();
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
new file mode 100644
index 0000000..00ee181
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.standalone;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class StandAloneServerImpl implements IStateMachine {
+
+ private final Peer peer;
+ private final IStateMachine stateMachine;
+
+ public StandAloneServerImpl(Peer peer, IStateMachine stateMachine) {
+ this.peer = peer;
+ this.stateMachine = stateMachine;
+ }
+
+ public Peer getPeer() {
+ return peer;
+ }
+
+ public IStateMachine getStateMachine() {
+ return stateMachine;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public TSStatus Write(IConsensusRequest request) {
+ return stateMachine.Write(request);
+ }
+
+ @Override
+ public DataSet Read(IConsensusRequest request) {
+ return stateMachine.Read(request);
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
new file mode 100644
index 0000000..ab2206f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.statemachine;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+public class EmptyStateMachine implements IStateMachine {
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public TSStatus Write(IConsensusRequest IConsensusRequest) {
+ return new TSStatus();
+ }
+
+ @Override
+ public DataSet Read(IConsensusRequest IConsensusRequest) {
+ return null;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
new file mode 100644
index 0000000..f4259b0
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.statemachine;
+
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import java.util.function.Function;
+
+public interface IStateMachine {
+
+ interface Registry extends Function<ConsensusGroupId, IStateMachine> {}
+
+ void start();
+
+ void stop();
+
+ TSStatus Write(IConsensusRequest IConsensusRequest);
+
+ DataSet Read(IConsensusRequest IConsensusRequest);
+}
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
new file mode 100644
index 0000000..9497409
--- /dev/null
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.standalone;
+
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
+import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
+import org.apache.iotdb.consensus.exception.ConsensusGroupAlreadyExistException;
+import org.apache.iotdb.consensus.exception.ConsensusGroupNotExistException;
+import org.apache.iotdb.consensus.exception.IllegalPeerNumException;
+import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StandAloneConsensusTest {
+
+ private IConsensus consensusImpl;
+ private final TestEntry entry = new TestEntry(0);
+ private final ConsensusGroupId dataRegionId = new ConsensusGroupId(GroupType.DataRegion, 0);
+ private final ConsensusGroupId schemaRegionId = new ConsensusGroupId(GroupType.SchemaRegion, 1);
+ private final ConsensusGroupId configId = new ConsensusGroupId(GroupType.Config, 2);
+
+ private static class TestEntry implements IConsensusRequest {
+
+ private int num;
+
+ public TestEntry(int num) {
+ this.num = num;
+ }
+
+ @Override
+ public void serializeRequest(ByteBuffer buffer) {
+ buffer.putInt(num);
+ }
+
+ @Override
+ public void deserializeRequest(ByteBuffer buffer) throws Exception {
+ num = buffer.getInt();
+ }
+ }
+
+ private static class TestStateMachine implements IStateMachine {
+
+ private final boolean direction;
+
+ public TestStateMachine(boolean direction) {
+ this.direction = direction;
+ }
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public TSStatus Write(IConsensusRequest request) {
+ if (request instanceof TestEntry) {
+ return new TSStatus(
+ direction ? ((TestEntry) request).num + 1 : ((TestEntry) request).num - 1);
+ }
+ return new TSStatus();
+ }
+
+ @Override
+ public DataSet Read(IConsensusRequest request) {
+ return null;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ consensusImpl =
+ new StandAloneConsensus(
+ gid -> {
+ switch (gid.getType()) {
+ case SchemaRegion:
+ return new TestStateMachine(true);
+ case DataRegion:
+ return new TestStateMachine(false);
+ }
+ return new EmptyStateMachine();
+ });
+ consensusImpl.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ consensusImpl.stop();
+ }
+
+ @Test
+ public void addConsensusGroup() {
+ ConsensusGenericResponse response1 =
+ consensusImpl.AddConsensusGroup(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667))));
+ assertTrue(response1.isSuccess());
+ assertNull(response1.getException());
+
+ ConsensusGenericResponse response2 =
+ consensusImpl.AddConsensusGroup(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667))));
+ assertFalse(response2.isSuccess());
+ assertTrue(response2.getException() instanceof ConsensusGroupAlreadyExistException);
+
+ ConsensusGenericResponse response3 =
+ consensusImpl.AddConsensusGroup(
+ dataRegionId,
+ Arrays.asList(
+ new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667)),
+ new Peer(dataRegionId, new Endpoint("0.0.0.1", 6667))));
+ assertFalse(response3.isSuccess());
+ assertTrue(response3.getException() instanceof IllegalPeerNumException);
+
+ ConsensusGenericResponse response4 =
+ consensusImpl.AddConsensusGroup(
+ schemaRegionId,
+ Collections.singletonList(new Peer(schemaRegionId, new Endpoint("0.0.0.0", 6667))));
+ assertTrue(response4.isSuccess());
+ assertNull(response4.getException());
+ }
+
+ @Test
+ public void removeConsensusGroup() {
+ ConsensusGenericResponse response1 = consensusImpl.RemoveConsensusGroup(dataRegionId);
+ assertFalse(response1.isSuccess());
+ assertTrue(response1.getException() instanceof ConsensusGroupNotExistException);
+
+ ConsensusGenericResponse response2 =
+ consensusImpl.AddConsensusGroup(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667))));
+ assertTrue(response2.isSuccess());
+ assertNull(response2.getException());
+
+ ConsensusGenericResponse response3 = consensusImpl.RemoveConsensusGroup(dataRegionId);
+ assertTrue(response3.isSuccess());
+ assertNull(response3.getException());
+ }
+
+ @Test
+ public void addPeer() {
+ ConsensusGenericResponse response =
+ consensusImpl.AddPeer(dataRegionId, new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667)));
+ assertFalse(response.isSuccess());
+ }
+
+ @Test
+ public void removePeer() {
+ ConsensusGenericResponse response =
+ consensusImpl.RemovePeer(
+ dataRegionId, new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667)));
+ assertFalse(response.isSuccess());
+ }
+
+ @Test
+ public void transferLeader() {
+ ConsensusGenericResponse response =
+ consensusImpl.TransferLeader(
+ dataRegionId, new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667)));
+ assertFalse(response.isSuccess());
+ }
+
+ @Test
+ public void triggerSnapshot() {
+ ConsensusGenericResponse response = consensusImpl.TriggerSnapshot(dataRegionId);
+ assertFalse(response.isSuccess());
+ }
+
+ @Test
+ public void write() {
+ ConsensusGenericResponse response1 =
+ consensusImpl.AddConsensusGroup(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667))));
+ assertTrue(response1.isSuccess());
+ assertNull(response1.getException());
+
+ ConsensusGenericResponse response2 =
+ consensusImpl.AddConsensusGroup(
+ schemaRegionId,
+ Collections.singletonList(new Peer(schemaRegionId, new Endpoint("0.0.0.0", 6667))));
+ assertTrue(response2.isSuccess());
+ assertNull(response2.getException());
+
+ ConsensusGenericResponse response3 =
+ consensusImpl.AddConsensusGroup(
+ configId, Collections.singletonList(new Peer(configId, new Endpoint("0.0.0.0", 6667))));
+ assertTrue(response3.isSuccess());
+ assertNull(response3.getException());
+
+ ConsensusWriteResponse response4 = consensusImpl.Write(dataRegionId, entry);
+ assertNull(response4.getException());
+ assertNotNull(response4.getStatus());
+ assertEquals(-1, response4.getStatus().getCode());
+
+ ConsensusWriteResponse response5 = consensusImpl.Write(schemaRegionId, entry);
+ assertNull(response5.getException());
+ assertNotNull(response5.getStatus());
+ assertEquals(1, response5.getStatus().getCode());
+
+ ConsensusWriteResponse response6 = consensusImpl.Write(configId, entry);
+ assertNull(response6.getException());
+ assertEquals(0, response6.getStatus().getCode());
+ }
+}
diff --git a/pom.xml b/pom.xml
index 9ce80c2..87ea63f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -111,6 +111,7 @@
<module>client-cpp</module>
<module>metrics</module>
<module>integration</module>
+ <module>consensus</module>
<!-- <module>library-udf</module>-->
</modules>
<!-- Properties Management -->
diff --git a/server/pom.xml b/server/pom.xml
index 6a83ac9..af955ee 100644
--- a/server/pom.xml
+++ b/server/pom.xml
@@ -47,6 +47,11 @@
</dependency>
<dependency>
<groupId>org.apache.iotdb</groupId>
+ <artifactId>iotdb-consensus</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iotdb</groupId>
<artifactId>tsfile</artifactId>
<version>${project.version}</version>
<exclusions>
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java
new file mode 100644
index 0000000..c6d7ee2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus;
+
+import org.apache.iotdb.consensus.IConsensus;
+import org.apache.iotdb.consensus.common.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.Endpoint;
+import org.apache.iotdb.consensus.common.GroupType;
+import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
+import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
+import org.apache.iotdb.db.consensus.ratis.RatisDataRegionStateMachine;
+import org.apache.iotdb.db.consensus.ratis.RatisSchemaRegionStateMachine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+
+import java.util.Collections;
+
+public class ConsensusMain {
+
+ public static void main(String[] args) throws IllegalPathException {
+
+ IConsensus consensusImpl =
+ new StandAloneConsensus(
+ id -> {
+ switch (id.getType()) {
+ case SchemaRegion:
+ return new RatisSchemaRegionStateMachine();
+ case DataRegion:
+ return new RatisDataRegionStateMachine();
+ }
+ return new EmptyStateMachine();
+ });
+ consensusImpl.start();
+ InsertRowPlan plan = getInsertRowPlan();
+ ConsensusGroupId dataRegionId = new ConsensusGroupId(GroupType.DataRegion, 0);
+ ConsensusGroupId schemaRegionId = new ConsensusGroupId(GroupType.SchemaRegion, 1);
+ consensusImpl.AddConsensusGroup(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, new Endpoint("0.0.0.0", 6667))));
+ consensusImpl.AddConsensusGroup(
+ schemaRegionId,
+ Collections.singletonList(new Peer(schemaRegionId, new Endpoint("0.0.0.0", 6667))));
+ consensusImpl.Write(dataRegionId, plan);
+ consensusImpl.Write(schemaRegionId, plan);
+ consensusImpl.stop();
+ }
+
+ private static InsertRowPlan getInsertRowPlan() throws IllegalPathException {
+ long time = 110L;
+ TSDataType[] dataTypes =
+ new TSDataType[] {
+ TSDataType.DOUBLE,
+ TSDataType.FLOAT,
+ TSDataType.INT64,
+ TSDataType.INT32,
+ TSDataType.BOOLEAN,
+ TSDataType.TEXT
+ };
+
+ String[] columns = new String[6];
+ columns[0] = 1.0 + "";
+ columns[1] = 2 + "";
+ columns[2] = 10000 + "";
+ columns[3] = 100 + "";
+ columns[4] = false + "";
+ columns[5] = "hh" + 0;
+
+ return new InsertRowPlan(
+ new PartialPath("root.isp.d1"),
+ time,
+ new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+ dataTypes,
+ columns);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java
new file mode 100644
index 0000000..103ba04
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RatisDataRegionStateMachine implements IStateMachine {
+
+ private static final Logger logger = LoggerFactory.getLogger(RatisDataRegionStateMachine.class);
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public TSStatus Write(IConsensusRequest request) {
+ if (request instanceof InsertRowPlan) {
+ logger.info("Execute write plan : {}", request);
+ }
+ return new TSStatus(200);
+ }
+
+ @Override
+ public DataSet Read(IConsensusRequest request) {
+ logger.info("Execute read plan : {}", request);
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java
new file mode 100644
index 0000000..99c4810
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.ratis;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RatisSchemaRegionStateMachine implements IStateMachine {
+
+ private static final Logger logger = LoggerFactory.getLogger(RatisSchemaRegionStateMachine.class);
+
+ @Override
+ public void start() {}
+
+ @Override
+ public void stop() {}
+
+ @Override
+ public TSStatus Write(IConsensusRequest request) {
+ if (request instanceof InsertRowPlan) {
+ logger.info("Execute write plan : {}", request);
+ }
+ return new TSStatus(200);
+ }
+
+ @Override
+ public DataSet Read(IConsensusRequest request) {
+ logger.info("Execute read plan : {}", request);
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 0267b03..9115df1 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.qp.physical;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -87,12 +88,13 @@ import java.util.Collections;
import java.util.List;
/** This class is a abstract class for all type of PhysicalPlan. */
-public abstract class PhysicalPlan {
+public abstract class PhysicalPlan implements IConsensusRequest {
private static final Logger logger = LoggerFactory.getLogger(PhysicalPlan.class);
private static final String SERIALIZATION_UNIMPLEMENTED = "serialization unimplemented";
private boolean isQuery = false;
+
private Operator.OperatorType operatorType;
private static final int NULL_VALUE_LEN = -1;
@@ -190,6 +192,20 @@ public abstract class PhysicalPlan {
throw new UnsupportedOperationException(SERIALIZATION_UNIMPLEMENTED);
}
+ @Override
+ public void serializeRequest(ByteBuffer buffer) {
+ serialize(buffer);
+ }
+
+ @Override
+ public void deserializeRequest(ByteBuffer buffer) throws Exception {
+ try {
+ deserialize(buffer);
+ } catch (IllegalPathException | IOException e) {
+ throw new Exception(e);
+ }
+ }
+
/**
* Serialize the plan into the given buffer. This is provided for WAL, so fields that can be
* recovered will not be serialized. If error occurs when serializing this plan, the buffer will