You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2022/04/20 08:44:32 UTC
[iotdb] branch master updated: [IOTDB-2803]Supplement the remaining metadata write plannode and change SchemaRegion to accept plannode parameter (#5529)
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 380cda196c [IOTDB-2803]Supplement the remaining metadata write plannode and change SchemaRegion to accept plannode parameter (#5529)
380cda196c is described below
commit 380cda196c6829c4f3bcd7236fe6f716f5e664ad
Author: Yifu Zhou <ef...@outlook.com>
AuthorDate: Wed Apr 20 16:44:27 2022 +0800
[IOTDB-2803]Supplement the remaining metadata write plannode and change SchemaRegion to accept plannode parameter (#5529)
---
.../statemachine/SchemaRegionStateMachine.java | 4 +-
.../SchemaExecutionVisitor.java} | 59 ++++++++++-
.../db/mpp/sql/planner/plan/node/PlanVisitor.java | 13 ++-
.../node/metedata/write/AlterTimeSeriesNode.java | 41 +-------
.../node/metedata/write/CreateTimeSeriesNode.java | 1 +
.../iotdb/db/service/InternalServiceImplTest.java | 117 +++++++++++++++++++--
6 files changed, 184 insertions(+), 51 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index d18a1276b7..26f24b03da 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -22,8 +22,8 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.SnapshotMeta;
-import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
+import org.apache.iotdb.db.metadata.visitor.SchemaExecutionVisitor;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
@@ -70,7 +70,7 @@ public class SchemaRegionStateMachine extends BaseStateMachine {
protected TSStatus write(FragmentInstance fragmentInstance) {
logger.info("Execute write plan in SchemaRegionStateMachine");
PlanNode planNode = fragmentInstance.getFragment().getRoot();
- TSStatus status = planNode.accept(new SchemaVisitor(), schemaRegion);
+ TSStatus status = planNode.accept(new SchemaExecutionVisitor(), schemaRegion);
return status;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
rename to server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
index 5ed7cbf395..ce9a6dab83 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/Executor/SchemaVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/visitor/SchemaExecutionVisitor.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.metadata.Executor;
+package org.apache.iotdb.db.metadata.visitor;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -37,9 +38,11 @@ import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+
/** Schema write PlanNode visitor */
-public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
- private static final Logger logger = LoggerFactory.getLogger(SchemaVisitor.class);
+public class SchemaExecutionVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
+ private static final Logger logger = LoggerFactory.getLogger(SchemaExecutionVisitor.class);
@Override
public TSStatus visitCreateTimeSeries(CreateTimeSeriesNode node, ISchemaRegion schemaRegion) {
@@ -53,11 +56,61 @@ public class SchemaVisitor extends PlanVisitor<TSStatus, ISchemaRegion> {
return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
}
+ @Override
+ public TSStatus visitCreateAlignedTimeSeries(
+ CreateAlignedTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ try {
+ PhysicalPlan plan = node.accept(new PhysicalPlanTransformer(), new TransformerContext());
+ schemaRegion.createAlignedTimeSeries((CreateAlignedTimeSeriesPlan) plan);
+ } catch (MetadataException e) {
+ logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+ }
+
+ @Override
+ public TSStatus visitAlterTimeSeries(AlterTimeSeriesNode node, ISchemaRegion schemaRegion) {
+ try {
+ switch (node.getAlterType()) {
+ case RENAME:
+ String beforeName = node.getAlterMap().keySet().iterator().next();
+ String currentName = node.getAlterMap().get(beforeName);
+ schemaRegion.renameTagOrAttributeKey(beforeName, currentName, node.getPath());
+ break;
+ case SET:
+ schemaRegion.setTagsOrAttributesValue(node.getAlterMap(), node.getPath());
+ break;
+ case DROP:
+ schemaRegion.dropTagsOrAttributes(node.getAlterMap().keySet(), node.getPath());
+ break;
+ case ADD_TAGS:
+ schemaRegion.addTags(node.getAlterMap(), node.getPath());
+ break;
+ case ADD_ATTRIBUTES:
+ schemaRegion.addAttributes(node.getAlterMap(), node.getPath());
+ break;
+ case UPSERT:
+ schemaRegion.upsertTagsAndAttributes(
+ node.getAlias(), node.getTagsMap(), node.getAttributesMap(), node.getPath());
+ break;
+ }
+ } catch (MetadataException e) {
+ logger.error("{}: MetaData error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.METADATA_ERROR, e.getMessage());
+ } catch (IOException e) {
+ logger.error("{}: IO error: ", IoTDBConstant.GLOBAL_DB_NAME, e);
+ return RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ return RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS, "Execute successfully");
+ }
+
@Override
public TSStatus visitPlan(PlanNode node, ISchemaRegion context) {
return null;
}
+ // TODO: needs to be removed
private static class PhysicalPlanTransformer
extends PlanVisitor<PhysicalPlan, TransformerContext> {
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
index 4b6a4f6758..b526c4da97 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/PlanVisitor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaFetchNo
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaMergeNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.SchemaScanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.read.TimeSeriesSchemaScanNode;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.AlterTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.process.AggregateNode;
@@ -108,10 +109,6 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
- public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
- return visitPlan(node, context);
- }
-
public R visitTimeSeriesSchemaScan(TimeSeriesSchemaScanNode node, C context) {
return visitPlan(node, context);
}
@@ -131,4 +128,12 @@ public abstract class PlanVisitor<R, C> {
public R visitSchemaFetch(SchemaFetchNode node, C context) {
return visitPlan(node, context);
}
+
+ public R visitCreateAlignedTimeSeries(CreateAlignedTimeSeriesNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ public R visitAlterTimeSeries(AlterTimeSeriesNode node, C context) {
+ return visitPlan(node, context);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
index b1fda2fb3b..c38d0e1452 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/AlterTimeSeriesNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.mpp.sql.statement.metadata.AlterTimeSeriesStatement.AlterType;
import org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
@@ -238,42 +239,10 @@ public class AlterTimeSeriesNode extends PlanNode {
new PlanNodeId(id), path, alterType, alterMap, alias, tagsMap, attributesMap);
}
- // @Override
- // public void executeOn(SchemaRegion schemaRegion) throws MetadataException,
- // QueryProcessException {
- // try {
- // switch (getAlterType()) {
- // case RENAME:
- // String beforeName = alterMap.keySet().iterator().next();
- // String currentName = alterMap.get(beforeName);
- // schemaRegion.renameTagOrAttributeKey(beforeName, currentName, path);
- // break;
- // case SET:
- // schemaRegion.setTagsOrAttributesValue(alterMap, path);
- // break;
- // case DROP:
- // schemaRegion.dropTagsOrAttributes(alterMap.keySet(), path);
- // break;
- // case ADD_TAGS:
- // schemaRegion.addTags(alterMap, path);
- // break;
- // case ADD_ATTRIBUTES:
- // schemaRegion.addAttributes(alterMap, path);
- // break;
- // case UPSERT:
- // schemaRegion.upsertTagsAndAttributes(getAlias(), getTagsMap(), getAttributesMap(),
- // path);
- // break;
- // }
- // } catch (MetadataException e) {
- // throw new QueryProcessException(e);
- // } catch (IOException e) {
- // throw new QueryProcessException(
- // String.format(
- // "Something went wrong while read/write the [%s]'s tag/attribute info.",
- // path.getFullPath()));
- // }
- // }
+ @Override
+ public <R, C> R accept(PlanVisitor<R, C> visitor, C schemaRegion) {
+ return visitor.visitAlterTimeSeries(this, schemaRegion);
+ }
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
index c22136ad04..e5d1959dc3 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/sql/planner/plan/node/metedata/write/CreateTimeSeriesNode.java
@@ -16,6 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write;
import org.apache.iotdb.commons.partition.RegionReplicaSet;
diff --git a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
index 3ffc8ddc55..5def9d7f05 100644
--- a/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/service/InternalServiceImplTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.consensus.ConsensusImpl;
+import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.metadata.LocalConfigNode;
import org.apache.iotdb.db.metadata.path.PartialPath;
@@ -35,6 +36,7 @@ import org.apache.iotdb.db.mpp.sql.analyze.QueryType;
import org.apache.iotdb.db.mpp.sql.planner.plan.FragmentInstance;
import org.apache.iotdb.db.mpp.sql.planner.plan.PlanFragment;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateAlignedTimeSeriesNode;
import org.apache.iotdb.db.mpp.sql.planner.plan.node.metedata.write.CreateTimeSeriesNode;
import org.apache.iotdb.db.service.thrift.impl.InternalServiceImpl;
import org.apache.iotdb.db.utils.EnvironmentUtils;
@@ -49,27 +51,35 @@ import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
import org.apache.ratis.util.FileUtils;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import java.io.File;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class InternalServiceImplTest {
private static final IoTDBConfig conf = IoTDBDescriptor.getInstance().getConfig();
InternalServiceImpl internalServiceImpl;
- LocalConfigNode configNode;
+ static LocalConfigNode configNode;
- @Before
- public void setUp() throws Exception {
+ @BeforeClass
+ public static void setUpBeforeClass() throws IOException, MetadataException {
IoTDB.configManager.init();
configNode = LocalConfigNode.getInstance();
configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
ConsensusImpl.getInstance().start();
+ }
+
+ @Before
+ public void setUp() throws Exception {
RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
ConsensusImpl.getInstance()
.addConsensusGroup(regionReplicaSet.getConsensusGroupId(), genPeerList(regionReplicaSet));
@@ -78,17 +88,20 @@ public class InternalServiceImplTest {
@After
public void tearDown() throws Exception {
- IoTDB.configManager.clear();
RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
ConsensusImpl.getInstance().removeConsensusGroup(regionReplicaSet.getConsensusGroupId());
+ FileUtils.deleteFully(new File(conf.getConsensusDir()));
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws IOException, StorageEngineException {
ConsensusImpl.getInstance().stop();
+ IoTDB.configManager.clear();
EnvironmentUtils.cleanEnv();
- FileUtils.deleteFully(new File("data" + File.separator + "consensus"));
}
@Test
public void createTimeseriesTest() throws MetadataException {
- configNode.getBelongedSchemaRegionIdWithAutoCreate(new PartialPath("root.ln"));
CreateTimeSeriesNode createTimeSeriesNode =
new CreateTimeSeriesNode(
new PlanNodeId("0"),
@@ -147,6 +160,98 @@ public class InternalServiceImplTest {
Assert.assertTrue(response.accepted);
}
+ @Test
+ public void createAlignedTimeseriesTest() throws MetadataException {
+ CreateAlignedTimeSeriesNode createAlignedTimeSeriesNode =
+ new CreateAlignedTimeSeriesNode(
+ new PlanNodeId("0"),
+ new PartialPath("root.ln.wf01.GPS"),
+ new ArrayList<String>() {
+ {
+ add("latitude");
+ add("longitude");
+ }
+ },
+ new ArrayList<TSDataType>() {
+ {
+ add(TSDataType.FLOAT);
+ add(TSDataType.FLOAT);
+ }
+ },
+ new ArrayList<TSEncoding>() {
+ {
+ add(TSEncoding.PLAIN);
+ add(TSEncoding.PLAIN);
+ }
+ },
+ new ArrayList<CompressionType>() {
+ {
+ add(CompressionType.SNAPPY);
+ add(CompressionType.SNAPPY);
+ }
+ },
+ new ArrayList<String>() {
+ {
+ add("meter1");
+ add(null);
+ }
+ },
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("tag1", "t1");
+ }
+ });
+ add(null);
+ }
+ },
+ new ArrayList<Map<String, String>>() {
+ {
+ add(
+ new HashMap<String, String>() {
+ {
+ put("tag1", "t1");
+ }
+ });
+ add(null);
+ }
+ });
+
+ RegionReplicaSet regionReplicaSet = genRegionReplicaSet();
+ PlanFragment planFragment =
+ new PlanFragment(new PlanFragmentId("2", 3), createAlignedTimeSeriesNode);
+ FragmentInstance fragmentInstance =
+ new FragmentInstance(
+ planFragment,
+ planFragment.getId().genFragmentInstanceId(),
+ new GroupByFilter(1, 2, 3, 4),
+ QueryType.WRITE);
+ fragmentInstance.setDataRegionAndHost(regionReplicaSet);
+
+ // serialize fragmentInstance
+ ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+ fragmentInstance.serializeRequest(byteBuffer);
+ byteBuffer.flip();
+
+ // put serialized fragmentInstance to TSendFragmentInstanceReq
+ TSendFragmentInstanceReq request = new TSendFragmentInstanceReq();
+ TFragmentInstance tFragmentInstance = new TFragmentInstance();
+ tFragmentInstance.setBody(byteBuffer);
+ request.setFragmentInstance(tFragmentInstance);
+ request.setConsensusGroupId(
+ new TConsensusGroupId(
+ regionReplicaSet.getConsensusGroupId().getId(),
+ regionReplicaSet.getConsensusGroupId().getType().toString()));
+ request.setQueryType(QueryType.WRITE.toString());
+
+ // Use consensus layer to execute request
+ TSendFragmentInstanceResp response = internalServiceImpl.sendFragmentInstance(request);
+
+ Assert.assertTrue(response.accepted);
+ }
+
private RegionReplicaSet genRegionReplicaSet() {
List<DataNodeLocation> dataNodeList = new ArrayList<>();
dataNodeList.add(