You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/18 10:41:21 UTC
[iotdb] branch master updated: [IOTDB-2764] Refine the consensus layer framework and add examples (#5277)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b3fff9f [IOTDB-2764] Refine the consensus layer framework and add examples (#5277)
b3fff9f is described below
commit b3fff9f604ef7077d3b52655e9eadfde7f95e92e
Author: Potato <TX...@gmail.com>
AuthorDate: Fri Mar 18 05:40:09 2022 -0500
[IOTDB-2764] Refine the consensus layer framework and add examples (#5277)
---
.../org/apache/iotdb/consensus/IConsensus.java | 1 +
...equest.java => ByteBufferConsensusRequest.java} | 21 +++++-
.../common/request/IConsensusRequest.java | 2 -
.../consensus/standalone/StandAloneServerImpl.java | 8 ++-
.../consensus/statemachine/EmptyStateMachine.java | 2 +-
.../standalone/StandAloneConsensusTest.java | 30 +++++----
.../{ConsensusMain.java => ConsensusExample.java} | 32 ++++++++--
.../consensus/statemachine/BaseStateMachine.java | 74 ++++++++++++++++++++++
.../DataRegionStateMachine.java} | 23 +++----
.../SchemaRegionStateMachine.java} | 23 +++----
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 9 ---
11 files changed, 164 insertions(+), 61 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 6e29654..baef53d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -29,6 +29,7 @@ import java.util.List;
/** Consensus module base class. Each method should be thread-safe */
public interface IConsensus {
+
void start();
void stop();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
similarity index 57%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
index 1bf8f47..19ec789 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
@@ -21,9 +21,24 @@ package org.apache.iotdb.consensus.common.request;
import java.nio.ByteBuffer;
-public interface IConsensusRequest {
+/*
+In general, a request that originates on the leader can be cast directly to its concrete class,
+which avoids the cost of deserialization while the leader's state machine executes it. For
+requests received by followers, the responsibility for deserialization can generally be handed
+over to the state machine layer.
+*/
+/** An {@link IConsensusRequest} that wraps a {@link ByteBuffer} and exposes it via getContent(). */
+public class ByteBufferConsensusRequest implements IConsensusRequest {
+
+ // The wrapped buffer; the constructor stores the reference it is given, it does not copy it.
+ private final ByteBuffer byteBuffer;
+
+ /**
+ * Wraps the given buffer.
+ *
+ * @param byteBuffer the buffer to carry; kept by reference
+ */
+ public ByteBufferConsensusRequest(ByteBuffer byteBuffer) {
+ this.byteBuffer = byteBuffer;
+ }
- void serializeRequest(ByteBuffer buffer);
+ // No-op: nothing is written into the given buffer.
+ // NOTE(review): confirm that no caller relies on this method to emit the wrapped bytes.
+ @Override
+ public void serializeRequest(ByteBuffer buffer) {}
- void deserializeRequest(ByteBuffer buffer) throws Exception;
+ /**
+ * Returns the wrapped buffer itself (not a copy or duplicate), so relative reads performed by
+ * the caller advance the position of the buffer held by this request.
+ */
+ public ByteBuffer getContent() {
+ return byteBuffer;
+ }
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index 1bf8f47..0a5aacf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -24,6 +24,4 @@ import java.nio.ByteBuffer;
public interface IConsensusRequest {
void serializeRequest(ByteBuffer buffer);
-
- void deserializeRequest(ByteBuffer buffer) throws Exception;
}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index 8b50cd9..ee66afc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -44,10 +44,14 @@ public class StandAloneServerImpl implements IStateMachine {
}
@Override
- public void start() {}
+ public void start() {
+ stateMachine.start();
+ }
@Override
- public void stop() {}
+ public void stop() {
+ stateMachine.stop();
+ }
@Override
public TSStatus write(IConsensusRequest request) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
index 0505508..95767c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -33,7 +33,7 @@ public class EmptyStateMachine implements IStateMachine {
@Override
public TSStatus write(IConsensusRequest IConsensusRequest) {
- return new TSStatus();
+ return new TSStatus(0);
}
@Override
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5a8ca14..84e0050 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.GroupType;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -52,14 +53,16 @@ import static org.junit.Assert.assertTrue;
public class StandAloneConsensusTest {
private IConsensus consensusImpl;
- private final TestEntry entry = new TestEntry(0);
+ private final TestEntry entry1 = new TestEntry(0);
+ private final ByteBufferConsensusRequest entry2 =
+ new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4]));
private final ConsensusGroupId dataRegionId = new ConsensusGroupId(GroupType.DataRegion, 0);
private final ConsensusGroupId schemaRegionId = new ConsensusGroupId(GroupType.SchemaRegion, 1);
private final ConsensusGroupId configId = new ConsensusGroupId(GroupType.Config, 2);
private static class TestEntry implements IConsensusRequest {
- private int num;
+ private final int num;
public TestEntry(int num) {
this.num = num;
@@ -69,11 +72,6 @@ public class StandAloneConsensusTest {
public void serializeRequest(ByteBuffer buffer) {
buffer.putInt(num);
}
-
- @Override
- public void deserializeRequest(ByteBuffer buffer) throws Exception {
- num = buffer.getInt();
- }
}
private static class TestStateMachine implements IStateMachine {
@@ -92,7 +90,9 @@ public class StandAloneConsensusTest {
@Override
public TSStatus write(IConsensusRequest request) {
- if (request instanceof TestEntry) {
+ if (request instanceof ByteBufferConsensusRequest) {
+ return new TSStatus(((ByteBufferConsensusRequest) request).getContent().getInt());
+ } else if (request instanceof TestEntry) {
return new TSStatus(
direction ? ((TestEntry) request).num + 1 : ((TestEntry) request).num - 1);
}
@@ -237,18 +237,26 @@ public class StandAloneConsensusTest {
assertTrue(response3.isSuccess());
assertNull(response3.getException());
- ConsensusWriteResponse response4 = consensusImpl.write(dataRegionId, entry);
+ // write TestEntry(0) to the data region; the expected status code is -1 (num - 1)
+ ConsensusWriteResponse response4 = consensusImpl.write(dataRegionId, entry1);
assertNull(response4.getException());
assertNotNull(response4.getStatus());
assertEquals(-1, response4.getStatus().getCode());
- ConsensusWriteResponse response5 = consensusImpl.write(schemaRegionId, entry);
+ // write TestEntry(0) to the schema region; the expected status code is 1 (num + 1)
+ ConsensusWriteResponse response5 = consensusImpl.write(schemaRegionId, entry1);
assertNull(response5.getException());
assertNotNull(response5.getStatus());
assertEquals(1, response5.getStatus().getCode());
- ConsensusWriteResponse response6 = consensusImpl.write(configId, entry);
+ // test new EmptyStateMachine(), should return 0;
+ ConsensusWriteResponse response6 = consensusImpl.write(configId, entry1);
assertNull(response6.getException());
assertEquals(0, response6.getStatus().getCode());
+
+ // test ByteBufferConsensusRequest, should return 0;
+ ConsensusWriteResponse response7 = consensusImpl.write(dataRegionId, entry2);
+ assertNull(response7.getException());
+ assertEquals(0, response7.getStatus().getCode());
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
index 35cac1a..4350163 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
@@ -24,34 +24,38 @@ import org.apache.iotdb.consensus.common.ConsensusGroupId;
import org.apache.iotdb.consensus.common.Endpoint;
import org.apache.iotdb.consensus.common.GroupType;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
-import org.apache.iotdb.db.consensus.ratis.RatisDataRegionStateMachine;
-import org.apache.iotdb.db.consensus.ratis.RatisSchemaRegionStateMachine;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Collections;
-public class ConsensusMain {
-
- public static void main(String[] args) throws IllegalPathException {
+public class ConsensusExample {
+ public static void main(String[] args) throws IllegalPathException, IOException {
IConsensus consensusImpl =
new StandAloneConsensus(
id -> {
switch (id.getType()) {
case SchemaRegion:
- return new RatisSchemaRegionStateMachine();
+ return new SchemaRegionStateMachine();
case DataRegion:
- return new RatisDataRegionStateMachine();
+ return new DataRegionStateMachine();
}
return new EmptyStateMachine();
});
consensusImpl.start();
InsertRowPlan plan = getInsertRowPlan();
+
ConsensusGroupId dataRegionId = new ConsensusGroupId(GroupType.DataRegion, 0);
ConsensusGroupId schemaRegionId = new ConsensusGroupId(GroupType.SchemaRegion, 1);
consensusImpl.addConsensusGroup(
@@ -60,8 +64,22 @@ public class ConsensusMain {
consensusImpl.addConsensusGroup(
schemaRegionId,
Collections.singletonList(new Peer(schemaRegionId, new Endpoint("0.0.0.0", 6667))));
+
+ // The leader node can pass memory structures directly to the consensus layer
consensusImpl.write(dataRegionId, plan);
consensusImpl.write(schemaRegionId, plan);
+
+ // TODO pooling to reduce GC overhead
+ ByteBuffer buffer =
+ ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+ plan.serialize(buffer);
+ buffer.flip();
+
+ // the follower node can pass ByteBuffer into the consensus layer without deserializing it
+ consensusImpl.write(dataRegionId, new ByteBufferConsensusRequest(buffer));
+ buffer.flip();
+ consensusImpl.write(schemaRegionId, new ByteBufferConsensusRequest(buffer));
+
consensusImpl.stop();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
new file mode 100644
index 0000000..af40bb8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Common base for state machines that operate on {@link PhysicalPlan}s. It converts an incoming
+ * {@link IConsensusRequest} into a {@link PhysicalPlan} and delegates to the plan-typed hooks
+ * {@link #write(PhysicalPlan)} and {@link #read(PhysicalPlan)} that subclasses implement.
+ */
+public abstract class BaseStateMachine implements IStateMachine {
+
+  private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
+
+  /**
+   * Resolves the request to a {@link PhysicalPlan} and executes it through {@link
+   * #write(PhysicalPlan)}.
+   *
+   * @param request either a {@link ByteBufferConsensusRequest}, whose content is deserialized
+   *     into a plan, or a {@link PhysicalPlan}, which is used as is
+   * @return the status produced by {@link #write(PhysicalPlan)}, or a status carrying {@link
+   *     TSStatusCode#INTERNAL_SERVER_ERROR} when the request cannot be turned into a plan
+   */
+  @Override
+  public TSStatus write(IConsensusRequest request) {
+    PhysicalPlan plan;
+    if (request instanceof ByteBufferConsensusRequest) {
+      try {
+        plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
+      } catch (IOException | IllegalPathException e) {
+        // Pass the exception as the trailing argument so SLF4J logs its stack trace; without it
+        // the cause of the deserialization failure is lost.
+        logger.error("Deserialization error for write plan : {}", request, e);
+        return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      }
+    } else if (request instanceof PhysicalPlan) {
+      plan = (PhysicalPlan) request;
+    } else {
+      logger.error("Unexpected write plan : {}", request);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+    return write(plan);
+  }
+
+  /**
+   * Executes a write plan that has already been resolved from the consensus request.
+   *
+   * @param plan the plan to execute
+   * @return the status of the execution
+   */
+  protected abstract TSStatus write(PhysicalPlan plan);
+
+  /**
+   * Executes a read request through {@link #read(PhysicalPlan)}.
+   *
+   * @param request the request to read with; only {@link PhysicalPlan} instances are accepted
+   * @return the result of {@link #read(PhysicalPlan)}, or {@code null} when the request is not a
+   *     {@link PhysicalPlan}
+   */
+  @Override
+  public DataSet read(IConsensusRequest request) {
+    PhysicalPlan plan;
+    if (request instanceof PhysicalPlan) {
+      plan = (PhysicalPlan) request;
+    } else {
+      logger.error("Unexpected read plan : {}", request);
+      return null;
+    }
+    return read(plan);
+  }
+
+  /**
+   * Executes a read plan that has already been resolved from the consensus request.
+   *
+   * @param plan the plan to execute
+   * @return the data set produced by the plan
+   */
+  protected abstract DataSet read(PhysicalPlan plan);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index ac4a133..0372635 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -17,20 +17,19 @@
* under the License.
*/
-package org.apache.iotdb.db.consensus.ratis;
+package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RatisDataRegionStateMachine implements IStateMachine {
+public class DataRegionStateMachine extends BaseStateMachine {
- private static final Logger logger = LoggerFactory.getLogger(RatisDataRegionStateMachine.class);
+ private static final Logger logger = LoggerFactory.getLogger(DataRegionStateMachine.class);
@Override
public void start() {}
@@ -39,16 +38,14 @@ public class RatisDataRegionStateMachine implements IStateMachine {
public void stop() {}
@Override
- public TSStatus write(IConsensusRequest request) {
- if (request instanceof InsertRowPlan) {
- logger.info("Execute write plan : {}", request);
- }
- return new TSStatus(200);
+ protected TSStatus write(PhysicalPlan plan) {
+ logger.info("Execute write plan in DataRegionStateMachine : {}", plan);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
- public DataSet read(IConsensusRequest request) {
- logger.info("Execute read plan : {}", request);
+ protected DataSet read(PhysicalPlan plan) {
+ logger.info("Execute read plan in DataRegionStateMachine: {}", plan);
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 41e4e72..b6bc2e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -17,20 +17,19 @@
* under the License.
*/
-package org.apache.iotdb.db.consensus.ratis;
+package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class RatisSchemaRegionStateMachine implements IStateMachine {
+public class SchemaRegionStateMachine extends BaseStateMachine {
- private static final Logger logger = LoggerFactory.getLogger(RatisSchemaRegionStateMachine.class);
+ private static final Logger logger = LoggerFactory.getLogger(SchemaRegionStateMachine.class);
@Override
public void start() {}
@@ -39,16 +38,14 @@ public class RatisSchemaRegionStateMachine implements IStateMachine {
public void stop() {}
@Override
- public TSStatus write(IConsensusRequest request) {
- if (request instanceof InsertRowPlan) {
- logger.info("Execute write plan : {}", request);
- }
- return new TSStatus(200);
+ protected TSStatus write(PhysicalPlan plan) {
+ logger.info("Execute write plan in SchemaRegionStateMachine : {}", plan);
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
@Override
- public DataSet read(IConsensusRequest request) {
- logger.info("Execute read plan : {}", request);
+ protected DataSet read(PhysicalPlan plan) {
+ logger.info("Execute read plan in SchemaRegionStateMachine: {}", plan);
return null;
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 63d56cb..2183348 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -196,15 +196,6 @@ public abstract class PhysicalPlan implements IConsensusRequest {
serialize(buffer);
}
- @Override
- public void deserializeRequest(ByteBuffer buffer) throws Exception {
- try {
- deserialize(buffer);
- } catch (IllegalPathException | IOException e) {
- throw new Exception(e);
- }
- }
-
/**
* Serialize the plan into the given buffer. This is provided for WAL, so fields that can be
* recovered will not be serialized. If error occurs when serializing this plan, the buffer will