You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2022/03/18 10:41:21 UTC

[iotdb] branch master updated: [IOTDB-2764] Refine the consensus layer framework and add examples (#5277)

This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b3fff9f  [IOTDB-2764] Refine the consensus layer framework and add examples (#5277)
b3fff9f is described below

commit b3fff9f604ef7077d3b52655e9eadfde7f95e92e
Author: Potato <TX...@gmail.com>
AuthorDate: Fri Mar 18 05:40:09 2022 -0500

    [IOTDB-2764] Refine the consensus layer framework and add examples (#5277)
---
 .../org/apache/iotdb/consensus/IConsensus.java     |  1 +
 ...equest.java => ByteBufferConsensusRequest.java} | 21 +++++-
 .../common/request/IConsensusRequest.java          |  2 -
 .../consensus/standalone/StandAloneServerImpl.java |  8 ++-
 .../consensus/statemachine/EmptyStateMachine.java  |  2 +-
 .../standalone/StandAloneConsensusTest.java        | 30 +++++----
 .../{ConsensusMain.java => ConsensusExample.java}  | 32 ++++++++--
 .../consensus/statemachine/BaseStateMachine.java   | 74 ++++++++++++++++++++++
 .../DataRegionStateMachine.java}                   | 23 +++----
 .../SchemaRegionStateMachine.java}                 | 23 +++----
 .../apache/iotdb/db/qp/physical/PhysicalPlan.java  |  9 ---
 11 files changed, 164 insertions(+), 61 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
index 6e29654..baef53d 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/IConsensus.java
@@ -29,6 +29,7 @@ import java.util.List;
 
 /** Consensus module base class. Each method should be thread-safe */
 public interface IConsensus {
+
   void start();
 
   void stop();
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
similarity index 57%
copy from consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
copy to consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
index 1bf8f47..19ec789 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/ByteBufferConsensusRequest.java
@@ -21,9 +21,24 @@ package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
 
-public interface IConsensusRequest {
+/*
+In general, a request that originates on the leader can be cast directly to its concrete
+class, which avoids the cost of deserialization when the leader's state machine executes it.
+For a request received by a follower, the responsibility for deserialization can generally
+be handed over to the state machine layer.
+*/
+public class ByteBufferConsensusRequest implements IConsensusRequest {
+
+  private final ByteBuffer byteBuffer;
+
+  public ByteBufferConsensusRequest(ByteBuffer byteBuffer) {
+    this.byteBuffer = byteBuffer;
+  }
 
-  void serializeRequest(ByteBuffer buffer);
+  @Override
+  public void serializeRequest(ByteBuffer buffer) {}
 
-  void deserializeRequest(ByteBuffer buffer) throws Exception;
+  public ByteBuffer getContent() {
+    return byteBuffer;
+  }
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
index 1bf8f47..0a5aacf 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IConsensusRequest.java
@@ -24,6 +24,4 @@ import java.nio.ByteBuffer;
 public interface IConsensusRequest {
 
   void serializeRequest(ByteBuffer buffer);
-
-  void deserializeRequest(ByteBuffer buffer) throws Exception;
 }
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index 8b50cd9..ee66afc 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -44,10 +44,14 @@ public class StandAloneServerImpl implements IStateMachine {
   }
 
   @Override
-  public void start() {}
+  public void start() {
+    stateMachine.start();
+  }
 
   @Override
-  public void stop() {}
+  public void stop() {
+    stateMachine.stop();
+  }
 
   @Override
   public TSStatus write(IConsensusRequest request) {
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
index 0505508..95767c6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -33,7 +33,7 @@ public class EmptyStateMachine implements IStateMachine {
 
   @Override
   public TSStatus write(IConsensusRequest IConsensusRequest) {
-    return new TSStatus();
+    return new TSStatus(0);
   }
 
   @Override
diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 5a8ca14..84e0050 100644
--- a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.GroupType;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
 import org.apache.iotdb.consensus.common.response.ConsensusWriteResponse;
@@ -52,14 +53,16 @@ import static org.junit.Assert.assertTrue;
 public class StandAloneConsensusTest {
 
   private IConsensus consensusImpl;
-  private final TestEntry entry = new TestEntry(0);
+  private final TestEntry entry1 = new TestEntry(0);
+  private final ByteBufferConsensusRequest entry2 =
+      new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4]));
   private final ConsensusGroupId dataRegionId = new ConsensusGroupId(GroupType.DataRegion, 0);
   private final ConsensusGroupId schemaRegionId = new ConsensusGroupId(GroupType.SchemaRegion, 1);
   private final ConsensusGroupId configId = new ConsensusGroupId(GroupType.Config, 2);
 
   private static class TestEntry implements IConsensusRequest {
 
-    private int num;
+    private final int num;
 
     public TestEntry(int num) {
       this.num = num;
@@ -69,11 +72,6 @@ public class StandAloneConsensusTest {
     public void serializeRequest(ByteBuffer buffer) {
       buffer.putInt(num);
     }
-
-    @Override
-    public void deserializeRequest(ByteBuffer buffer) throws Exception {
-      num = buffer.getInt();
-    }
   }
 
   private static class TestStateMachine implements IStateMachine {
@@ -92,7 +90,9 @@ public class StandAloneConsensusTest {
 
     @Override
     public TSStatus write(IConsensusRequest request) {
-      if (request instanceof TestEntry) {
+      if (request instanceof ByteBufferConsensusRequest) {
+        return new TSStatus(((ByteBufferConsensusRequest) request).getContent().getInt());
+      } else if (request instanceof TestEntry) {
         return new TSStatus(
             direction ? ((TestEntry) request).num + 1 : ((TestEntry) request).num - 1);
       }
@@ -237,18 +237,26 @@ public class StandAloneConsensusTest {
     assertTrue(response3.isSuccess());
     assertNull(response3.getException());
 
-    ConsensusWriteResponse response4 = consensusImpl.write(dataRegionId, entry);
+    // test new TestStateMachine(false), should return -1;
+    ConsensusWriteResponse response4 = consensusImpl.write(dataRegionId, entry1);
     assertNull(response4.getException());
     assertNotNull(response4.getStatus());
     assertEquals(-1, response4.getStatus().getCode());
 
-    ConsensusWriteResponse response5 = consensusImpl.write(schemaRegionId, entry);
+    // test new TestStateMachine(true), should return 1;
+    ConsensusWriteResponse response5 = consensusImpl.write(schemaRegionId, entry1);
     assertNull(response5.getException());
     assertNotNull(response5.getStatus());
     assertEquals(1, response5.getStatus().getCode());
 
-    ConsensusWriteResponse response6 = consensusImpl.write(configId, entry);
+    // test new EmptyStateMachine(), should return 0;
+    ConsensusWriteResponse response6 = consensusImpl.write(configId, entry1);
     assertNull(response6.getException());
     assertEquals(0, response6.getStatus().getCode());
+
+    // test ByteBufferConsensusRequest, should return 0;
+    ConsensusWriteResponse response7 = consensusImpl.write(dataRegionId, entry2);
+    assertNull(response7.getException());
+    assertEquals(0, response7.getStatus().getCode());
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
similarity index 75%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
index 35cac1a..4350163 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusMain.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/ConsensusExample.java
@@ -24,34 +24,38 @@ import org.apache.iotdb.consensus.common.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.Endpoint;
 import org.apache.iotdb.consensus.common.GroupType;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.standalone.StandAloneConsensus;
 import org.apache.iotdb.consensus.statemachine.EmptyStateMachine;
-import org.apache.iotdb.db.consensus.ratis.RatisDataRegionStateMachine;
-import org.apache.iotdb.db.consensus.ratis.RatisSchemaRegionStateMachine;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.consensus.statemachine.DataRegionStateMachine;
+import org.apache.iotdb.db.consensus.statemachine.SchemaRegionStateMachine;
 import org.apache.iotdb.db.exception.metadata.IllegalPathException;
 import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 
-public class ConsensusMain {
-
-  public static void main(String[] args) throws IllegalPathException {
+public class ConsensusExample {
 
+  public static void main(String[] args) throws IllegalPathException, IOException {
     IConsensus consensusImpl =
         new StandAloneConsensus(
             id -> {
               switch (id.getType()) {
                 case SchemaRegion:
-                  return new RatisSchemaRegionStateMachine();
+                  return new SchemaRegionStateMachine();
                 case DataRegion:
-                  return new RatisDataRegionStateMachine();
+                  return new DataRegionStateMachine();
               }
               return new EmptyStateMachine();
             });
     consensusImpl.start();
     InsertRowPlan plan = getInsertRowPlan();
+
     ConsensusGroupId dataRegionId = new ConsensusGroupId(GroupType.DataRegion, 0);
     ConsensusGroupId schemaRegionId = new ConsensusGroupId(GroupType.SchemaRegion, 1);
     consensusImpl.addConsensusGroup(
@@ -60,8 +64,22 @@ public class ConsensusMain {
     consensusImpl.addConsensusGroup(
         schemaRegionId,
         Collections.singletonList(new Peer(schemaRegionId, new Endpoint("0.0.0.0", 6667))));
+
+    // The leader node can pass memory structures directly to the consensus layer
     consensusImpl.write(dataRegionId, plan);
     consensusImpl.write(schemaRegionId, plan);
+
+    // TODO pooling to reduce GC overhead
+    ByteBuffer buffer =
+        ByteBuffer.allocate(IoTDBDescriptor.getInstance().getConfig().getWalBufferSize());
+    plan.serialize(buffer);
+    buffer.flip();
+
+    // the follower node can pass ByteBuffer into the consensus layer without deserializing it
+    consensusImpl.write(dataRegionId, new ByteBufferConsensusRequest(buffer));
+    buffer.flip();
+    consensusImpl.write(schemaRegionId, new ByteBufferConsensusRequest(buffer));
+
     consensusImpl.stop();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
new file mode 100644
index 0000000..af40bb8
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/BaseStateMachine.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.consensus.statemachine;
+
+import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
+import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.service.rpc.thrift.TSStatus;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public abstract class BaseStateMachine implements IStateMachine {
+
+  private static final Logger logger = LoggerFactory.getLogger(BaseStateMachine.class);
+
+  @Override
+  public TSStatus write(IConsensusRequest request) {
+    PhysicalPlan plan;
+    if (request instanceof ByteBufferConsensusRequest) {
+      try {
+        plan = PhysicalPlan.Factory.create(((ByteBufferConsensusRequest) request).getContent());
+      } catch (IOException | IllegalPathException e) {
+        logger.error("Deserialization error for write plan : {}", request);
+        return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      }
+    } else if (request instanceof PhysicalPlan) {
+      plan = (PhysicalPlan) request;
+    } else {
+      logger.error("Unexpected write plan : {}", request);
+      return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+    }
+    return write(plan);
+  }
+
+  protected abstract TSStatus write(PhysicalPlan plan);
+
+  @Override
+  public DataSet read(IConsensusRequest request) {
+    PhysicalPlan plan;
+    if (request instanceof PhysicalPlan) {
+      plan = (PhysicalPlan) request;
+    } else {
+      logger.error("Unexpected read plan : {}", request);
+      return null;
+    }
+    return read(plan);
+  }
+
+  protected abstract DataSet read(PhysicalPlan plan);
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index ac4a133..0372635 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisDataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -17,20 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.consensus.ratis;
+package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RatisDataRegionStateMachine implements IStateMachine {
+public class DataRegionStateMachine extends BaseStateMachine {
 
-  private static final Logger logger = LoggerFactory.getLogger(RatisDataRegionStateMachine.class);
+  private static final Logger logger = LoggerFactory.getLogger(DataRegionStateMachine.class);
 
   @Override
   public void start() {}
@@ -39,16 +38,14 @@ public class RatisDataRegionStateMachine implements IStateMachine {
   public void stop() {}
 
   @Override
-  public TSStatus write(IConsensusRequest request) {
-    if (request instanceof InsertRowPlan) {
-      logger.info("Execute write plan : {}", request);
-    }
-    return new TSStatus(200);
+  protected TSStatus write(PhysicalPlan plan) {
+    logger.info("Execute write plan in DataRegionStateMachine : {}", plan);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
-  public DataSet read(IConsensusRequest request) {
-    logger.info("Execute read plan : {}", request);
+  protected DataSet read(PhysicalPlan plan) {
+    logger.info("Execute read plan in DataRegionStateMachine: {}", plan);
     return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
similarity index 61%
rename from server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java
rename to server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index 41e4e72..b6bc2e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/ratis/RatisSchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -17,20 +17,19 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.consensus.ratis;
+package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
-import org.apache.iotdb.consensus.statemachine.IStateMachine;
-import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.service.rpc.thrift.TSStatus;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RatisSchemaRegionStateMachine implements IStateMachine {
+public class SchemaRegionStateMachine extends BaseStateMachine {
 
-  private static final Logger logger = LoggerFactory.getLogger(RatisSchemaRegionStateMachine.class);
+  private static final Logger logger = LoggerFactory.getLogger(SchemaRegionStateMachine.class);
 
   @Override
   public void start() {}
@@ -39,16 +38,14 @@ public class RatisSchemaRegionStateMachine implements IStateMachine {
   public void stop() {}
 
   @Override
-  public TSStatus write(IConsensusRequest request) {
-    if (request instanceof InsertRowPlan) {
-      logger.info("Execute write plan : {}", request);
-    }
-    return new TSStatus(200);
+  protected TSStatus write(PhysicalPlan plan) {
+    logger.info("Execute write plan in SchemaRegionStateMachine : {}", plan);
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   @Override
-  public DataSet read(IConsensusRequest request) {
-    logger.info("Execute read plan : {}", request);
+  protected DataSet read(PhysicalPlan plan) {
+    logger.info("Execute read plan in SchemaRegionStateMachine: {}", plan);
     return null;
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 63d56cb..2183348 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -196,15 +196,6 @@ public abstract class PhysicalPlan implements IConsensusRequest {
     serialize(buffer);
   }
 
-  @Override
-  public void deserializeRequest(ByteBuffer buffer) throws Exception {
-    try {
-      deserialize(buffer);
-    } catch (IllegalPathException | IOException e) {
-      throw new Exception(e);
-    }
-  }
-
   /**
    * Serialize the plan into the given buffer. This is provided for WAL, so fields that can be
    * recovered will not be serialized. If error occurs when serializing this plan, the buffer will