You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/18 09:03:20 UTC

[iotdb] branch master updated: [IOTDB-3213] Apply visitor pattern for DataRegionStateMachine (#5944)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 80475f77d0 [IOTDB-3213] Apply visitor pattern for DataRegionStateMachine (#5944)
80475f77d0 is described below

commit 80475f77d0a0cd111591cd064e11b3e40d818415
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Wed May 18 17:03:15 2022 +0800

    [IOTDB-3213] Apply visitor pattern for DataRegionStateMachine (#5944)
---
 .../statemachine/DataRegionStateMachine.java       |  40 +------
 .../statemachine/visitor/DataExecutionVisitor.java | 116 +++++++++++++++++++++
 .../db/mpp/plan/planner/plan/node/PlanVisitor.java |  25 +++++
 3 files changed, 143 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7fe44559f7..89110c4700 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -20,28 +20,17 @@
 package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
-import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.rpc.RpcUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.util.Arrays;
 
 public class DataRegionStateMachine extends BaseStateMachine {
 
@@ -73,32 +62,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
     PlanNode planNode = fragmentInstance.getFragment().getRoot();
-    try {
-      if (planNode instanceof InsertRowNode) {
-        region.insert((InsertRowNode) planNode);
-      } else if (planNode instanceof InsertTabletNode) {
-        region.insertTablet((InsertTabletNode) planNode);
-      } else if (planNode instanceof InsertRowsNode) {
-        region.insert((InsertRowsNode) planNode);
-      } else if (planNode instanceof InsertMultiTabletsNode) {
-        region.insertTablets((InsertMultiTabletsNode) (planNode));
-      } else if (planNode instanceof InsertRowsOfOneDeviceNode) {
-        region.insert((InsertRowsOfOneDeviceNode) planNode);
-      } else if (planNode instanceof DeleteRegionNode) {
-        region.syncDeleteDataFiles();
-        StorageEngineV2.getInstance()
-            .deleteDataRegion((DataRegionId) ((DeleteRegionNode) planNode).getConsensusGroupId());
-      } else {
-        logger.error("Unsupported plan node for writing to data region : {}", planNode);
-        return StatusUtils.UNSUPPORTED_OPERATION;
-      }
-    } catch (BatchProcessException e) {
-      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
-    } catch (Exception e) {
-      logger.error("Error in executing plan node: {}", planNode);
-      return StatusUtils.EXECUTE_STATEMENT_ERROR;
-    }
-    return StatusUtils.OK;
+    return planNode.accept(new DataExecutionVisitor(), region);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
new file mode 100644
index 0000000000..05a303c40f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.consensus.statemachine.visitor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Executes write plan nodes against a {@link DataRegion} and reports the outcome as a
+ * {@link TSStatus}.
+ *
+ * <p>Node types without a dedicated {@code visitXxx} override here fall through to
+ * {@link #visitPlan(PlanNode, DataRegion)} and are rejected as unsupported.
+ */
+public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataExecutionVisitor.class);
+
+  /**
+   * Fallback for plan nodes this visitor has no handler for.
+   *
+   * <p>Logs the node and returns an explicit "unsupported" status instead of {@code null}, so
+   * callers of {@code planNode.accept(...)} always receive a usable status.
+   */
+  @Override
+  public TSStatus visitPlan(PlanNode node, DataRegion context) {
+    LOGGER.error("Unsupported plan node for writing to data region : {}", node);
+    return StatusUtils.UNSUPPORTED_OPERATION;
+  }
+
+  /** Calls {@code insert}; write or trigger failures map to EXECUTE_STATEMENT_ERROR. */
+  @Override
+  public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
+    try {
+      dataRegion.insert(node);
+      return StatusUtils.OK;
+    } catch (WriteProcessException | TriggerExecutionException e) {
+      LOGGER.error("Error in executing plan node: {}", node, e);
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
+    }
+  }
+
+  /** Calls {@code insertTablet}; a batch failure is reported via its failing statuses. */
+  @Override
+  public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion dataRegion) {
+    try {
+      dataRegion.insertTablet(node);
+      return StatusUtils.OK;
+    } catch (TriggerExecutionException e) {
+      LOGGER.error("Error in executing plan node: {}", node, e);
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
+    } catch (BatchProcessException e) {
+      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+    }
+  }
+
+  /** Calls {@code insert}; a batch failure is reported via its failing statuses. */
+  @Override
+  public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
+    try {
+      dataRegion.insert(node);
+      return StatusUtils.OK;
+    } catch (BatchProcessException e) {
+      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+    }
+  }
+
+  /** Calls {@code insertTablets}; a batch failure is reported via its failing statuses. */
+  @Override
+  public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, DataRegion dataRegion) {
+    try {
+      dataRegion.insertTablets(node);
+      return StatusUtils.OK;
+    } catch (BatchProcessException e) {
+      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+    }
+  }
+
+  /** Calls {@code insert}; handles write/trigger failures as well as batch failures. */
+  @Override
+  public TSStatus visitInsertRowsOfOneDevice(
+      InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
+    try {
+      dataRegion.insert(node);
+      return StatusUtils.OK;
+    } catch (WriteProcessException | TriggerExecutionException e) {
+      LOGGER.error("Error in executing plan node: {}", node, e);
+      return StatusUtils.EXECUTE_STATEMENT_ERROR;
+    } catch (BatchProcessException e) {
+      return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+    }
+  }
+
+  /**
+   * Calls {@code syncDeleteDataFiles} on the region, then asks {@link StorageEngineV2} to delete
+   * the data region identified by the node's consensus group id.
+   *
+   * <p>NOTE(review): nothing is caught here, so any unchecked exception from either call
+   * propagates to the caller instead of becoming an error status.
+   */
+  @Override
+  public TSStatus visitDeleteRegion(DeleteRegionNode node, DataRegion dataRegion) {
+    dataRegion.syncDeleteDataFiles();
+    StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) node.getConsensusGroupId());
+    return StatusUtils.OK;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 61470a7f20..4d552df27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -52,6 +52,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
 import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
 
 public abstract class PlanVisitor<R, C> {
 
@@ -200,4 +205,24 @@ public abstract class PlanVisitor<R, C> {
   public R visitDeleteRegion(DeleteRegionNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitInsertRow(InsertRowNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitInsertTablet(InsertTabletNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitInsertRows(InsertRowsNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitInsertMultiTablets(InsertMultiTabletsNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitInsertRowsOfOneDevice(InsertRowsOfOneDeviceNode node, C context) {
+    return visitPlan(node, context);
+  }
 }