You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2022/05/18 09:03:20 UTC
[iotdb] branch master updated: [IOTDB-3213] Apply visitor pattern for DataRegionStateMachine (#5944)
This is an automated email from the ASF dual-hosted git repository.
haonan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 80475f77d0 [IOTDB-3213] Apply visitor pattern for DataRegionStateMachine (#5944)
80475f77d0 is described below
commit 80475f77d0a0cd111591cd064e11b3e40d818415
Author: Liu Xuxin <37...@users.noreply.github.com>
AuthorDate: Wed May 18 17:03:15 2022 +0800
[IOTDB-3213] Apply visitor pattern for DataRegionStateMachine (#5944)
---
.../statemachine/DataRegionStateMachine.java | 40 +------
.../statemachine/visitor/DataExecutionVisitor.java | 116 +++++++++++++++++++++
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 25 +++++
3 files changed, 143 insertions(+), 38 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 7fe44559f7..89110c4700 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -20,28 +20,17 @@
package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.consensus.DataRegionId;
-import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
-import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.plan.planner.plan.FragmentInstance;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
-import org.apache.iotdb.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
-import java.util.Arrays;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -73,32 +62,7 @@ public class DataRegionStateMachine extends BaseStateMachine {
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
PlanNode planNode = fragmentInstance.getFragment().getRoot();
- try {
- if (planNode instanceof InsertRowNode) {
- region.insert((InsertRowNode) planNode);
- } else if (planNode instanceof InsertTabletNode) {
- region.insertTablet((InsertTabletNode) planNode);
- } else if (planNode instanceof InsertRowsNode) {
- region.insert((InsertRowsNode) planNode);
- } else if (planNode instanceof InsertMultiTabletsNode) {
- region.insertTablets((InsertMultiTabletsNode) (planNode));
- } else if (planNode instanceof InsertRowsOfOneDeviceNode) {
- region.insert((InsertRowsOfOneDeviceNode) planNode);
- } else if (planNode instanceof DeleteRegionNode) {
- region.syncDeleteDataFiles();
- StorageEngineV2.getInstance()
- .deleteDataRegion((DataRegionId) ((DeleteRegionNode) planNode).getConsensusGroupId());
- } else {
- logger.error("Unsupported plan node for writing to data region : {}", planNode);
- return StatusUtils.UNSUPPORTED_OPERATION;
- }
- } catch (BatchProcessException e) {
- return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
- } catch (Exception e) {
- logger.error("Error in executing plan node: {}", planNode);
- return StatusUtils.EXECUTE_STATEMENT_ERROR;
- }
- return StatusUtils.OK;
+ return planNode.accept(new DataExecutionVisitor(), region);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
new file mode 100644
index 0000000000..05a303c40f
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/visitor/DataExecutionVisitor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.consensus.statemachine.visitor;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.db.engine.StorageEngineV2;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.exception.BatchProcessException;
+import org.apache.iotdb.db.exception.TriggerExecutionException;
+import org.apache.iotdb.db.exception.WriteProcessException;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.DeleteRegionNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
+import org.apache.iotdb.rpc.RpcUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class DataExecutionVisitor extends PlanVisitor<TSStatus, DataRegion> {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DataExecutionVisitor.class);
+
+ @Override
+ public TSStatus visitPlan(PlanNode node, DataRegion context) {
+ return null;
+ }
+
+ @Override
+ public TSStatus visitInsertRow(InsertRowNode node, DataRegion dataRegion) {
+ try {
+ dataRegion.insert(node);
+ return StatusUtils.OK;
+ } catch (WriteProcessException | TriggerExecutionException e) {
+ LOGGER.error("Error in executing plan node: {}", node, e);
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
+ }
+ }
+
+ @Override
+ public TSStatus visitInsertTablet(InsertTabletNode node, DataRegion dataRegion) {
+ try {
+ dataRegion.insertTablet(node);
+ return StatusUtils.OK;
+ } catch (TriggerExecutionException e) {
+ LOGGER.error("Error in executing plan node: {}", node, e);
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
+ } catch (BatchProcessException e) {
+ return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ }
+ }
+
+ @Override
+ public TSStatus visitInsertRows(InsertRowsNode node, DataRegion dataRegion) {
+ try {
+ dataRegion.insert(node);
+ return StatusUtils.OK;
+ } catch (BatchProcessException e) {
+ return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ }
+ }
+
+ @Override
+ public TSStatus visitInsertMultiTablets(InsertMultiTabletsNode node, DataRegion dataRegion) {
+ try {
+ dataRegion.insertTablets(node);
+ return StatusUtils.OK;
+ } catch (BatchProcessException e) {
+ return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ }
+ }
+
+ @Override
+ public TSStatus visitInsertRowsOfOneDevice(
+ InsertRowsOfOneDeviceNode node, DataRegion dataRegion) {
+ try {
+ dataRegion.insert(node);
+ return StatusUtils.OK;
+ } catch (WriteProcessException | TriggerExecutionException e) {
+ LOGGER.error("Error in executing plan node: {}", node, e);
+ return StatusUtils.EXECUTE_STATEMENT_ERROR;
+ } catch (BatchProcessException e) {
+ return RpcUtils.getStatus(Arrays.asList(e.getFailingStatus()));
+ }
+ }
+
+ @Override
+ public TSStatus visitDeleteRegion(DeleteRegionNode node, DataRegion dataRegion) {
+ dataRegion.syncDeleteDataFiles();
+ StorageEngineV2.getInstance().deleteDataRegion((DataRegionId) node.getConsensusGroupId());
+ return StatusUtils.OK;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 61470a7f20..4d552df27b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -52,6 +52,11 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesAggreg
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.AlignedSeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertMultiTabletsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertRowsOfOneDeviceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertTabletNode;
public abstract class PlanVisitor<R, C> {
@@ -200,4 +205,24 @@ public abstract class PlanVisitor<R, C> {
public R visitDeleteRegion(DeleteRegionNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitInsertRow(InsertRowNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitInsertTablet(InsertTabletNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitInsertRows(InsertRowsNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitInsertMultiTablets(InsertMultiTabletsNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitInsertRowsOfOneDevice(InsertRowsOfOneDeviceNode node, C context) {
+ return visitPlan(node, context);
+ }
}