You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 08:44:32 UTC

[iotdb] branch master updated: [IOTDB-2803]Supplement the remaining metadata write plannode and change SchemaRegion to accept plannode parameter (#5529)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 380cda196c [IOTDB-2803]Supplement the remaining metadata write plannode and change SchemaRegion to accept plannode parameter (#5529)
380cda196c is described below

commit 380cda196c6829c4f3bcd7236fe6f716f5e664ad
Author: Yifu Zhou <ef...@outlook.com>
AuthorDate: Wed Apr 20 16:44:27 2022 +0800

    [IOTDB-2803]Supplement the remaining metadata write plannode and change SchemaRegion to accept plannode parameter (#5529)
---
 .../statemachine/SchemaRegionStateMachine.java     |   4 +-
 .../SchemaExecutionVisitor.java}                   |  59 ++++++++++-
 .../db/mpp/sql/planner/plan/node/PlanVisitor.java  |  13 ++-
 .../node/metedata/write/AlterTimeSeriesNode.java   |  41 +-------
 .../node/metedata/write/CreateTimeSeriesNode.java  |   1 +
 .../iotdb/db/service/InternalServiceImplTest.java  | 117 +++++++++++++++++++--
 6 files changed, 184 insertions(+), 51 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index d18a1276b7..26f24b03da 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.consensus.statemachine;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.SnapshotMeta;
-import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -70,7 +70,7 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
   protected TSStatus write(FragmentInstance fragmentInstance) {
     logger.info("Execute write plan in SchemaRegionStateMachine");
     PlanNode planNode = fragmentInstance.getFragment().getRoot();
-    TSStatus status = planNode.accept(new SchemaVisitor(), schemaRegion);
+    TSStatus status = planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
     return status;
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 5ed7cbf395..ce9a6dab83 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-package org.apache.iotdb.db.metadata.Executor;
+package org.apache.iotdb.db.metadata.visitor;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -37,9 +38,11 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+
 /** Schema write PlanNode visitor */
-public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
-  private static final Logger logger = LoggerFactory.getLogger(SchemaVisitor.class);
+public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
+  private static final Logger logger = LoggerFactory.getLogger(SchemaExecutionVisitor.class);
 
   @Override
   public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
@@ -53,11 +56,61 @@ public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
     return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
   }
 
+  @Override
+  public TSStatus visitCreateAlignedTimeSeries(
+      CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    try {
+      PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+      schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+    } catch (MetadataException e) {
+      logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+  }
+
+  @Override
+  public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
+    try {
+      switch (node.getAlterType()) {
+        case RENAME:
+          String beforeName = node.getAlterMap().keySet().iterator().next();
+          String currentName = node.getAlterMap().get(beforeName);
+          schemaRegion.renameTagOrAttributeKey(beforeName, currentName, node.getPath());
+          break;
+        case SET:
+          schemaRegion.setTagsOrAttributesValue(node.getAlterMap(), node.getPath());
+          break;
+        case DROP:
+          schemaRegion.dropTagsOrAttributes(node.getAlterMap().keySet(), node.getPath());
+          break;
+        case ADD_TAGS:
+          schemaRegion.addTags(node.getAlterMap(), node.getPath());
+          break;
+        case ADD_ATTRIBUTES:
+          schemaRegion.addAttributes(node.getAlterMap(), node.getPath());
+          break;
+        case UPSERT:
+          schemaRegion.upsertTagsAndAttributes(
+              node.getAlias(), node.getTagsMap(), node.getAttributesMap(), node.getPath());
+          break;
+      }
+    } catch (MetadataException e) {
+      logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
+    } catch (IOException e) {
+      logger.error("{}: IO error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+      return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+    }
+    return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+  }
+
   @Override
   public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
     return null;
   }
 
+  // TODO need remove
   private static class PhysicalPlanTransformer
       extends PlanVisitor<PhysicalPlan, TransformerContext> {
     @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 4b6a4f6758..b526c4da97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNo
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
@@ -108,10 +109,6 @@ public abstract class PlanVisitor<R, C> {
     return visitPlan(node, context);
   }
 
-  public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
-    return visitPlan(node, context);
-  }
-
   public R visitTimeSeriesSchemaScan(TimeSeriesSchemaScanNode node, C context) {
     return visitPlan(node, context);
   }
@@ -131,4 +128,12 @@ public abstract class PlanVisitor<R, C> {
   public R visitSchemaFetch(SchemaFetchNode node, C context) {
     return visitPlan(node, context);
   }
+
+  public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
+    return visitPlan(node, context);
+  }
+
+  public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
+    return visitPlan(node, context);
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index b1fda2fb3b..c38d0e1452 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
 import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
 import org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -238,42 +239,10 @@ public class AlterTimeSeriesNode extends PlanNode {
         new PlanNodeId(id), path, alterType, alterMap, alias, tagsMap, attributesMap);
   }
 
-  //  @Override
-  //  public void executeOn(SchemaRegion schemaRegion) throws MetadataException,
-  // QueryProcessException {
-  //    try {
-  //      switch (getAlterType()) {
-  //        case RENAME:
-  //          String beforeName = alterMap.keySet().iterator().next();
-  //          String currentName = alterMap.get(beforeName);
-  //          schemaRegion.renameTagOrAttributeKey(beforeName, currentName, path);
-  //          break;
-  //        case SET:
-  //          schemaRegion.setTagsOrAttributesValue(alterMap, path);
-  //          break;
-  //        case DROP:
-  //          schemaRegion.dropTagsOrAttributes(alterMap.keySet(), path);
-  //          break;
-  //        case ADD_TAGS:
-  //          schemaRegion.addTags(alterMap, path);
-  //          break;
-  //        case ADD_ATTRIBUTES:
-  //          schemaRegion.addAttributes(alterMap, path);
-  //          break;
-  //        case UPSERT:
-  //          schemaRegion.upsertTagsAndAttributes(getAlias(), getTagsMap(), getAttributesMap(),
-  // path);
-  //          break;
-  //      }
-  //    } catch (MetadataException e) {
-  //      throw new QueryProcessException(e);
-  //    } catch (IOException e) {
-  //      throw new QueryProcessException(
-  //          String.format(
-  //              "Something went wrong while read/write the [%s]'s tag/attribute info.",
-  //              path.getFullPath()));
-  //    }
-  //  }
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+    return visitor.visitAlterTimeSeries(this, schemaRegion);
+  }
 
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index c22136ad04..e5d1959dc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
 
 import org.apache.iotdb.commons.partition.RegionReplicaSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 3ffc8ddc55..5def9d7f05 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.common.Peer;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.exception.metadata.MetadataException;
 import org.apache.iotdb.db.metadata.LocalConfigNode;
 import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
 import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
 import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
 import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -49,27 +51,35 @@ import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
 
 import org.apache.ratis.util.FileUtils;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 public class InternalServiceImplTest {
   private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
   InternalServiceImpl internalServiceImpl;
-  LocalConfigNode configNode;
+  static LocalConfigNode configNode;
 
-  @Before
-  public void setUp() throws Exception {
+  @BeforeClass
+  public static void setUpBeforeClass() throws IOException, MetadataException {
     IoTDB.configManager.init();
     configNode = LocalConfigNode.getInstance();
     configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
     ConsensusImpl.getInstance().start();
+  }
+
+  @Before
+  public void setUp() throws Exception {
     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     ConsensusImpl.getInstance()
         .addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
@@ -78,17 +88,20 @@ public class InternalServiceImplTest {
 
   @After
   public void tearDown() throws Exception {
-    IoTDB.configManager.clear();
     RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
     ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+    FileUtils.deleteFully(new File(conf.getConsensusDir()));
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws IOException, StorageEngineException {
     ConsensusImpl.getInstance().stop();
+    IoTDB.configManager.clear();
     EnvironmentUtils.cleanEnv();
-    FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
   }
 
   @Test
   public void createTimeseriesTest() throws MetadataException {
-    configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
     CreateTimeSeriesNode createTimeSeriesNode =
         new CreateTimeSeriesNode(
             new PlanNodeId("0"),
@@ -147,6 +160,98 @@ public class InternalServiceImplTest {
     Assert.assertTrue(response.accepted);
   }
 
+  @Test
+  public void createAlignedTimeseriesTest() throws MetadataException {
+    CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode =
+        new CreateAlignedTimeSeriesNode(
+            new PlanNodeId("0"),
+            new PartialPath("root.ln.wf01.GPS"),
+            new ArrayList<String>() {
+              {
+                add("latitude");
+                add("longitude");
+              }
+            },
+            new ArrayList<TSDataType>() {
+              {
+                add(TSDataType.FLOAT);
+                add(TSDataType.FLOAT);
+              }
+            },
+            new ArrayList<TSEncoding>() {
+              {
+                add(TSEncoding.PLAIN);
+                add(TSEncoding.PLAIN);
+              }
+            },
+            new ArrayList<CompressionType>() {
+              {
+                add(CompressionType.SNAPPY);
+                add(CompressionType.SNAPPY);
+              }
+            },
+            new ArrayList<String>() {
+              {
+                add("meter1");
+                add(null);
+              }
+            },
+            new ArrayList<Map<String, String>>() {
+              {
+                add(
+                    new HashMap<String, String>() {
+                      {
+                        put("tag1", "t1");
+                      }
+                    });
+                add(null);
+              }
+            },
+            new ArrayList<Map<String, String>>() {
+              {
+                add(
+                    new HashMap<String, String>() {
+                      {
+                        put("tag1", "t1");
+                      }
+                    });
+                add(null);
+              }
+            });
+
+    RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+    PlanFragment planFragment =
+        new PlanFragment(new PlanFragmentId("2", 3), createAlignedTimeSeriesNode);
+    FragmentInstance fragmentInstance =
+        new FragmentInstance(
+            planFragment,
+            planFragment.getId().genFragmentInstanceId(),
+            new GroupByFilter(1, 2, 3, 4),
+            QueryType.WRITE);
+    fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+
+    // serialize fragmentInstance
+    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+    fragmentInstance.serializeRequest(byteBuffer);
+    byteBuffer.flip();
+
+    // put serialized fragmentInstance to TSendFragmentInstanceReq
+    TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+    TFragmentInstance tFragmentInstance = new TFragmentInstance();
+    tFragmentInstance.setBody(byteBuffer);
+    request.setFragmentInstance(tFragmentInstance);
+    request.setConsensusGroupId(
+        new TConsensusGroupId(
+            regionReplicaSet.getConsensusGroupId().getId(),
+            regionReplicaSet.getConsensusGroupId().getType().toString()));
+    request.setQueryType(QueryType.WRITE.toString());
+
+    // Use consensus layer to execute request
+    TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+    Assert.assertTrue(response.accepted);
+  }
+
   private RegionReplicaSet genRegionReplicaSet() {
     List<DataNodeLocation> dataNodeList = new ArrayList<>();
     dataNodeList.add(