You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ta...@apache.org on 2021/07/05 14:26:47 UTC
[iotdb] 01/01: nearly finish
This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch support_template_0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c05a8cdda56dc4fa8b5c57b1a679e12922957ef3
Author: LebronAl <TX...@gmail.com>
AuthorDate: Mon Jul 5 22:26:08 2021 +0800
nearly finish
---
.../iotdb/cluster/coordinator/Coordinator.java | 25 +-
.../iotdb/cluster/log/applier/BaseApplier.java | 4 +-
.../apache/iotdb/cluster/metadata/CMManager.java | 34 ++-
.../cluster/server/member/DataGroupMember.java | 4 +-
.../iotdb/cluster/server/member/RaftMember.java | 10 +
.../apache/iotdb/cluster/utils/PartitionUtils.java | 8 +-
.../apache/iotdb/cluster/utils/StatusUtils.java | 3 +-
.../org/apache/iotdb/db/engine/StorageEngine.java | 3 +-
.../db/engine/querycontext/QueryDataSource.java | 12 +-
.../engine/storagegroup/StorageGroupProcessor.java | 30 +-
.../metadata/DuplicatedTemplateException.java | 31 +++
.../metadata/UndefinedTemplateException.java | 31 +++
.../org/apache/iotdb/db/metadata/MManager.java | 230 ++++++++++++++--
.../java/org/apache/iotdb/db/metadata/MTree.java | 177 ++++++++++--
.../iotdb/db/metadata/logfile/MLogWriter.java | 21 ++
.../org/apache/iotdb/db/metadata/mnode/MNode.java | 69 +++++
.../iotdb/db/metadata/template/Template.java | 147 ++++++++++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 38 ++-
.../org/apache/iotdb/db/qp/logical/Operator.java | 4 +
.../apache/iotdb/db/qp/physical/PhysicalPlan.java | 59 ++--
.../db/qp/physical/crud/CreateTemplatePlan.java | 270 ++++++++++++++++++
.../db/qp/physical/crud/SetDeviceTemplatePlan.java | 94 +++++++
.../qp/physical/sys/AutoCreateDeviceMNodePlan.java | 87 ++++++
.../physical/sys/SetUsingDeviceTemplatePlan.java | 83 ++++++
.../db/query/dataset/AlignByDeviceDataSet.java | 14 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 83 ++++++
.../storagegroup/StorageGroupProcessorTest.java | 18 +-
.../iotdb/db/engine/storagegroup/TTLTest.java | 6 +-
.../iotdb/db/metadata/MManagerBasicTest.java | 302 +++++++++++++++++++++
.../iotdb/db/metadata/MManagerImproveTest.java | 17 +-
.../iotdb/db/qp/physical/InsertRowPlanTest.java | 200 ++++++++++++++
.../iotdb/db/qp/physical/InsertTabletPlanTest.java | 156 +++++++++++
.../reader/series/SeriesAggregateReaderTest.java | 6 +-
.../reader/series/SeriesReaderByTimestampTest.java | 6 +-
.../main/java/org/apache/iotdb/rpc/RpcUtils.java | 1 +
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 3 +
thrift/src/main/thrift/rpc.thrift | 20 ++
37 files changed, 2138 insertions(+), 168 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
index 6332d3d..492b89d 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/coordinator/Coordinator.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@ -47,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
@@ -183,12 +185,31 @@ public class Coordinator {
metaGroupMember.waitLeader();
return metaGroupMember.forwardPlan(plan, metaGroupMember.getLeader(), null);
}
-
+ try {
+ createSchemaIfNecessary(plan);
+ } catch (MetadataException | CheckConsistencyException e) { // keep the cause in the log below
+ logger.error("{}: Cannot find storage groups for {}", name, plan, e);
+ return StatusUtils.NO_STORAGE_GROUP;
+ }
List<PartitionGroup> globalGroups = metaGroupMember.getPartitionTable().getGlobalGroups();
logger.debug("Forwarding global data plan {} to {} groups", plan, globalGroups.size());
return forwardPlan(globalGroups, plan);
}
+ public void createSchemaIfNecessary(PhysicalPlan plan) // no-op unless plan is a SetDeviceTemplatePlan
+ throws MetadataException, CheckConsistencyException {
+ if (plan instanceof SetDeviceTemplatePlan) {
+ try { // getStorageGroupPath is used only as an existence probe; its result is discarded
+ IoTDB.metaManager.getStorageGroupPath(
+ new PartialPath(((SetDeviceTemplatePlan) plan).getPrefixPath()));
+ } catch (IllegalPathException e) {
+ // deliberately ignored: the plan has been checked, so the prefix path is expected to be legal
+ } catch (StorageGroupNotSetException e) { // probe found no storage group for the prefix path
+ ((CMManager) IoTDB.metaManager).createSchema(plan);
+ }
+ }
+ }
+
/**
* A partitioned plan (like batch insertion) will be split into several sub-plans, each belongs to
* a data group. And these sub-plans will be sent to and executed on the corresponding groups
@@ -276,6 +297,8 @@ public class Coordinator {
status = forwardPlan(plan, partitionGroup);
}
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
+ && !(plan instanceof SetDeviceTemplatePlan
+ && status.getCode() == TSStatusCode.DUPLICATED_TEMPLATE.getStatusCode())
&& (!(plan instanceof DeleteTimeSeriesPlan)
|| status.getCode() != TSStatusCode.TIMESERIES_NOT_EXIST.getStatusCode())) {
// execution failed, record the error message
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
index 7b6c7e4..63231e3 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/log/applier/BaseApplier.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
@@ -77,7 +78,8 @@ abstract class BaseApplier implements LogApplier {
} catch (BatchProcessException e) {
handleBatchProcessException(e, plan);
} catch (QueryProcessException e) {
- if (e.getCause() instanceof StorageGroupNotSetException) {
+ if (e.getCause() instanceof StorageGroupNotSetException
+ || e.getCause() instanceof UndefinedTemplateException) {
executeAfterSync(plan);
} else {
throw e;
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 2a6ab47..54ddcf6 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -47,6 +47,7 @@ import org.apache.iotdb.db.metadata.MetaUtils;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.BatchPlan;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -55,6 +56,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateMultiTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
@@ -331,7 +333,8 @@ public class CMManager extends MManager {
}
@Override
- public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException {
+ public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
+ throws MetadataException, IOException {
MeasurementMNode[] measurementMNodes = new MeasurementMNode[plan.getMeasurements().length];
int nonExistSchemaIndex =
getMNodesLocally(plan.getDeviceId(), plan.getMeasurements(), measurementMNodes);
@@ -404,6 +407,16 @@ public class CMManager extends MManager {
}
}
+ @Override
+ public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path)
+ throws MetadataException, IOException {
+ return getDeviceNodeWithAutoCreate(
+ path,
+ ClusterDescriptor.getInstance().getConfig().isEnableAutoCreateSchema(), // autoCreateSchema
+ false, // allowCreateSg: this override never lets the call create a storage group
+ config.getDefaultStorageGroupLevel()); // sgLevel
+ }
+
private static class RemoteMetaCache extends LRUCache<PartialPath, MeasurementMNode> {
RemoteMetaCache(int cacheSize) {
@@ -451,23 +464,14 @@ public class CMManager extends MManager {
} else if (plan instanceof InsertRowPlan || plan instanceof InsertTabletPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((InsertPlan) plan).getDeviceId())));
- } else if (plan instanceof InsertRowsPlan) {
- storageGroups.addAll(
- getStorageGroups(
- ((InsertRowsPlan) plan)
- .getInsertRowPlanList().stream()
- .map(InsertPlan::getDeviceId)
- .collect(Collectors.toList())));
- } else if (plan instanceof InsertMultiTabletPlan) {
- storageGroups.addAll(
- getStorageGroups(
- ((InsertMultiTabletPlan) plan)
- .getInsertTabletPlanList().stream()
- .map(InsertPlan::getDeviceId)
- .collect(Collectors.toList())));
} else if (plan instanceof CreateTimeSeriesPlan) {
storageGroups.addAll(
getStorageGroups(Collections.singletonList(((CreateTimeSeriesPlan) plan).getPath())));
+ } else if (plan instanceof SetDeviceTemplatePlan) {
+ storageGroups.addAll(
+ getStorageGroups(
+ Collections.singletonList(
+ new PartialPath(((SetDeviceTemplatePlan) plan).getPrefixPath()))));
} else {
storageGroups.addAll(getStorageGroups(plan.getPaths()));
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
index 2b2b646..6f6936a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/DataGroupMember.java
@@ -74,6 +74,7 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor.TimePartiti
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -701,7 +702,8 @@ public class DataGroupMember extends RaftMember {
return StatusUtils.OK;
} catch (Exception e) {
Throwable cause = IOUtils.getRootCause(e);
- if (cause instanceof StorageGroupNotSetException) {
+ if (cause instanceof StorageGroupNotSetException
+ || cause instanceof UndefinedTemplateException) {
try {
metaGroupMember.syncLeaderWithConsistencyCheck(true);
if (plan instanceof InsertPlan && ((InsertPlan) plan).getFailedMeasurements() != null) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
index 9890f6b..f2187f9 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/server/member/RaftMember.java
@@ -62,10 +62,12 @@ import org.apache.iotdb.cluster.utils.PlanSerializer;
import org.apache.iotdb.cluster.utils.StatusUtils;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.exception.IoTDBException;
+import org.apache.iotdb.db.exception.metadata.DuplicatedTemplateException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -1485,6 +1487,14 @@ public abstract class RaftMember {
if (cause instanceof BatchProcessException) {
return RpcUtils.getStatus(Arrays.asList(((BatchProcessException) cause).getFailingStatus()));
}
+ if (cause instanceof DuplicatedTemplateException) {
+ return StatusUtils.DUPLICATED_TEMPLATE.deepCopy().setMessage(cause.getMessage());
+ }
+ if (cause instanceof StorageGroupNotSetException) {
+ TSStatus status = StatusUtils.getStatus(TSStatusCode.STORAGE_GROUP_NOT_EXIST);
+ status.setMessage(cause.getMessage());
+ return status;
+ }
TSStatus tsStatus =
StatusUtils.getStatus(StatusUtils.EXECUTE_STATEMENT_ERROR, cause.getMessage());
if (cause instanceof RuntimeException) {
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
index 3e583bd..9bcd9ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/PartitionUtils.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.cluster.partition.PartitionTable;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
import org.apache.iotdb.db.qp.physical.sys.DataAuthPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
@@ -97,7 +99,8 @@ public class PartitionUtils {
|| plan instanceof AuthorPlan
|| plan instanceof DeleteStorageGroupPlan
// DataAuthPlan is global because all nodes must have all user info
- || plan instanceof DataAuthPlan;
+ || plan instanceof DataAuthPlan
+ || plan instanceof CreateTemplatePlan;
}
/**
@@ -112,7 +115,8 @@ public class PartitionUtils {
plan instanceof DeletePlan
|| plan instanceof DeleteTimeSeriesPlan
|| plan instanceof MergePlan
- || plan instanceof FlushPlan;
+ || plan instanceof FlushPlan
+ || plan instanceof SetDeviceTemplatePlan;
}
public static int calculateStorageGroupSlotByTime(
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
index 52231d7..719e52a 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/StatusUtils.java
@@ -45,8 +45,9 @@ public class StatusUtils {
public static final TSStatus TIMESERIES_NOT_EXIST_ERROR =
getStatus(TSStatusCode.TIMESERIES_NOT_EXIST);
public static final TSStatus NO_CONNECTION = getStatus(TSStatusCode.NO_CONNECTION);
+ public static final TSStatus DUPLICATED_TEMPLATE = getStatus(TSStatusCode.DUPLICATED_TEMPLATE);
- private static TSStatus getStatus(TSStatusCode statusCode) {
+ public static TSStatus getStatus(TSStatusCode statusCode) {
TSStatus status = new TSStatus();
status.setCode(statusCode.getStatusCode());
switch (statusCode) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 4727b7f..02ea6bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -713,10 +713,9 @@ public class StorageEngine implements IService {
throws StorageEngineException, QueryProcessException {
PartialPath fullPath = (PartialPath) seriesExpression.getSeriesPath();
PartialPath deviceId = fullPath.getDevicePath();
- String measurementId = seriesExpression.getSeriesPath().getMeasurement();
StorageGroupProcessor storageGroupProcessor = getProcessor(deviceId);
return storageGroupProcessor.query(
- deviceId, measurementId, context, filePathsManager, seriesExpression.getFilter());
+ fullPath, context, filePathsManager, seriesExpression.getFilter());
}
/**
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
index b3dfb98..60a6de5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/querycontext/QueryDataSource.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.engine.querycontext;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
-import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
@@ -28,26 +27,17 @@ import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
import java.util.List;
public class QueryDataSource {
- private PartialPath seriesPath;
private List<TsFileResource> seqResources;
private List<TsFileResource> unseqResources;
/** data older than currentTime - dataTTL should be ignored. */
private long dataTTL = Long.MAX_VALUE;
- public QueryDataSource(
- PartialPath seriesPath,
- List<TsFileResource> seqResources,
- List<TsFileResource> unseqResources) {
- this.seriesPath = seriesPath;
+ public QueryDataSource(List<TsFileResource> seqResources, List<TsFileResource> unseqResources) {
this.seqResources = seqResources;
this.unseqResources = unseqResources;
}
- public PartialPath getSeriesPath() {
- return seriesPath;
- }
-
public List<TsFileResource> getSeqResources() {
return seqResources;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 9461bac..6e589d3 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -1533,8 +1533,7 @@ public class StorageGroupProcessor {
// TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSource query(
- PartialPath deviceId,
- String measurementId,
+ PartialPath fullPath,
QueryContext context,
QueryFileManager filePathsManager,
Filter timeFilter)
@@ -1545,8 +1544,7 @@ public class StorageGroupProcessor {
getFileResourceListForQuery(
tsFileManagement.getTsFileList(true),
upgradeSeqFileList,
- deviceId,
- measurementId,
+ fullPath,
context,
timeFilter,
true);
@@ -1554,12 +1552,11 @@ public class StorageGroupProcessor {
getFileResourceListForQuery(
tsFileManagement.getTsFileList(false),
upgradeUnseqFileList,
- deviceId,
- measurementId,
+ fullPath,
context,
timeFilter,
false);
- QueryDataSource dataSource = new QueryDataSource(deviceId, seqResources, unseqResources);
+ QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources);
// used files should be added before mergeLock is unlocked, or they may be deleted by
// running merge
// is null only in tests
@@ -1604,24 +1601,25 @@ public class StorageGroupProcessor {
private List<TsFileResource> getFileResourceListForQuery(
Collection<TsFileResource> tsFileResources,
List<TsFileResource> upgradeTsFileResources,
- PartialPath deviceId,
- String measurementId,
+ PartialPath fullPath,
QueryContext context,
Filter timeFilter,
boolean isSeq)
throws MetadataException {
+ String deviceId = fullPath.getDevice();
+
if (context.isDebug()) {
DEBUG_LOGGER.info(
"Path: {}.{}, get tsfile list: {} isSeq: {} timefilter: {}",
- deviceId.getFullPath(),
- measurementId,
+ deviceId,
+ fullPath.getMeasurement(),
tsFileResources,
isSeq,
(timeFilter == null ? "null" : timeFilter));
}
- MeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(deviceId, measurementId);
+ MeasurementSchema schema = IoTDB.metaManager.getSeriesSchema(fullPath);
List<TsFileResource> tsfileResourcesForQuery = new ArrayList<>();
long timeLowerBound =
@@ -1631,7 +1629,7 @@ public class StorageGroupProcessor {
// for upgrade files and old files must be closed
for (TsFileResource tsFileResource : upgradeTsFileResources) {
if (!tsFileResource.isSatisfied(
- deviceId.getFullPath(), timeFilter, isSeq, dataTTL, context.isDebug())) {
+ fullPath.getDevice(), timeFilter, isSeq, dataTTL, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1644,7 +1642,7 @@ public class StorageGroupProcessor {
for (TsFileResource tsFileResource : tsFileResources) {
if (!tsFileResource.isSatisfied(
- deviceId.getFullPath(), timeFilter, isSeq, dataTTL, context.isDebug())) {
+ fullPath.getDevice(), timeFilter, isSeq, dataTTL, context.isDebug())) {
continue;
}
closeQueryLock.readLock().lock();
@@ -1655,8 +1653,8 @@ public class StorageGroupProcessor {
tsFileResource
.getUnsealedFileProcessor()
.query(
- deviceId.getFullPath(),
- measurementId,
+ deviceId,
+ fullPath.getMeasurement(),
schema.getType(),
schema.getEncodingType(),
schema.getProps(),
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicatedTemplateException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicatedTemplateException.java
new file mode 100644
index 0000000..0ffee81
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/DuplicatedTemplateException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class DuplicatedTemplateException extends MetadataException {
+ public DuplicatedTemplateException(String path) { // path is echoed in the message below
+ super(
+ String.format("Failed to create duplicated template for path %s", path),
+ TSStatusCode.DUPLICATED_TEMPLATE.getStatusCode(), // status code attached to this failure
+ true); // NOTE(review): meaning of this flag is defined by MetadataException - confirm
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/exception/metadata/UndefinedTemplateException.java b/server/src/main/java/org/apache/iotdb/db/exception/metadata/UndefinedTemplateException.java
new file mode 100644
index 0000000..32c2729
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/exception/metadata/UndefinedTemplateException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.exception.metadata;
+
+import org.apache.iotdb.rpc.TSStatusCode;
+
+public class UndefinedTemplateException extends MetadataException {
+ public UndefinedTemplateException(String path) { // path is echoed in the message below
+ super(
+ String.format("Undefined template name: %s", path), // pattern is a literal; path is only data
+ TSStatusCode.UNDEFINED_TEMPLATE.getStatusCode(),
+ true);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index d34088c..22561b9 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -27,23 +27,29 @@ import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.exception.metadata.DeleteFailedException;
+import org.apache.iotdb.db.exception.metadata.DuplicatedTemplateException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.exception.metadata.UndefinedTemplateException;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.monitor.MonitorConstants;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -51,6 +57,7 @@ import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetUsingDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -134,7 +141,7 @@ public class MManager {
private TagLogFile tagLogFile;
private boolean isRecovering;
// device -> DeviceMNode
- private RandomDeleteCache<PartialPath, MNode> mNodeCache;
+ private RandomDeleteCache<PartialPath, Pair<MNode, Template>> mNodeCache;
// tag key -> tag value -> LeafMNode
private Map<String, Map<String, Set<MeasurementMNode>>> tagIndex = new ConcurrentHashMap<>();
@@ -155,6 +162,9 @@ public class MManager {
private static final int ESTIMATED_SERIES_SIZE = config.getEstimatedSeriesSize();
+ // template name -> template
+ private Map<String, Template> templateMap = new ConcurrentHashMap<>();
+
private static class MManagerHolder {
private MManagerHolder() {
@@ -185,10 +195,10 @@ public class MManager {
int cacheSize = config.getmManagerCacheSize();
mNodeCache =
- new RandomDeleteCache<PartialPath, MNode>(cacheSize) {
+ new RandomDeleteCache<PartialPath, Pair<MNode, Template>>(cacheSize) {
@Override
- public MNode loadObjectByKey(PartialPath key) throws CacheException {
+ public Pair<MNode, Template> loadObjectByKey(PartialPath key) throws CacheException {
try {
return mtree.getNodeByPathWithStorageGroupCheck(key);
} catch (MetadataException e) {
@@ -301,6 +311,7 @@ public class MManager {
this.mNodeCache.clear();
this.tagIndex.clear();
this.totalSeriesNumber.set(0);
+ this.templateMap.clear();
if (logWriter != null) {
logWriter.close();
logWriter = null;
@@ -355,6 +366,21 @@ public class MManager {
ChangeTagOffsetPlan changeTagOffsetPlan = (ChangeTagOffsetPlan) plan;
changeOffset(changeTagOffsetPlan.getPath(), changeTagOffsetPlan.getOffset());
break;
+ case CREATE_TEMPLATE:
+ CreateTemplatePlan createTemplatePlan = (CreateTemplatePlan) plan;
+ createDeviceTemplate(createTemplatePlan);
+ break;
+ case SET_DEVICE_TEMPLATE:
+ SetDeviceTemplatePlan setDeviceTemplatePlan = (SetDeviceTemplatePlan) plan;
+ setDeviceTemplate(setDeviceTemplatePlan);
+ break;
+ case SET_USING_DEVICE_TEMPLATE:
+ SetUsingDeviceTemplatePlan setUsingDeviceTemplatePlan = (SetUsingDeviceTemplatePlan) plan;
+ setUsingDeviceTemplate(setUsingDeviceTemplatePlan);
+ break;
+ case AUTO_CREATE_DEVICE_MNODE:
+ autoCreateDeviceMNode((AutoCreateDeviceMNodePlan) plan);
+ break; // without this the case fell through to default and logged "Unrecognizable command"
default:
logger.error("Unrecognizable command {}", plan.getOperatorType());
}
@@ -978,12 +1004,20 @@ public class MManager {
public MeasurementSchema getSeriesSchema(PartialPath device, String measurement)
throws MetadataException {
- MNode node = mtree.getNodeByPath(device);
- MNode leaf = node.getChild(measurement);
- if (leaf != null) {
- return ((MeasurementMNode) leaf).getSchema();
- }
- return null;
+ MNode deviceMNode = getDeviceNode(device);
+ MeasurementMNode measurementMNode = (MeasurementMNode) deviceMNode.getChild(measurement);
+ return measurementMNode.getSchema();
+ }
+
+ /**
+ * Get the schema of the series located at the given full path.
+ *
+ * @param fullPath full path of the series (may be PartialPath or VectorPartialPath)
+ * @return MeasurementSchema or VectorMeasurementSchema
+ */
+ public MeasurementSchema getSeriesSchema(PartialPath fullPath) throws MetadataException {
+ MeasurementMNode leaf = (MeasurementMNode) mtree.getNodeByPath(fullPath); // unchecked cast: node must be a measurement node
+ return leaf.getSchema();
}
/**
@@ -1047,9 +1081,10 @@ public class MManager {
*
* @param path path
*/
- public MNode getDeviceNodeWithAutoCreate(PartialPath path, boolean autoCreateSchema, int sgLevel)
- throws MetadataException {
- MNode node;
+ public Pair<MNode, Template> getDeviceNodeWithAutoCreate(
+ PartialPath path, boolean autoCreateSchema, boolean allowCreateSg, int sgLevel)
+ throws IOException, MetadataException {
+ Pair<MNode, Template> node;
boolean shouldSetStorageGroup;
try {
node = mNodeCache.get(path);
@@ -1063,28 +1098,66 @@ public class MManager {
try {
if (shouldSetStorageGroup) {
- PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(path, sgLevel);
- setStorageGroup(storageGroupPath);
+ if (allowCreateSg) {
+ PartialPath storageGroupPath = MetaUtils.getStorageGroupPathByLevel(path, sgLevel);
+ setStorageGroup(storageGroupPath);
+ } else {
+ throw new StorageGroupNotSetException(path.getFullPath());
+ }
}
node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel);
+ if (!(node.left instanceof StorageGroupMNode)) {
+ logWriter.autoCreateDeviceMNode(new AutoCreateDeviceMNodePlan(node.left.getPartialPath()));
+ }
return node;
} catch (StorageGroupAlreadySetException e) {
// ignore set storage group concurrently
node = mtree.getDeviceNodeWithAutoCreating(path, sgLevel);
+ if (!(node.left instanceof StorageGroupMNode)) {
+ logWriter.autoCreateDeviceMNode(new AutoCreateDeviceMNodePlan(node.left.getPartialPath()));
+ }
return node;
}
}
/** !!!!!!Attention!!!!! must call the return node's readUnlock() if you call this method. */
- public MNode getDeviceNodeWithAutoCreate(PartialPath path) throws MetadataException {
+ public Pair<MNode, Template> getDeviceNodeWithAutoCreate(PartialPath path)
+ throws MetadataException, IOException {
return getDeviceNodeWithAutoCreate(
- path, config.isAutoCreateSchemaEnabled(), config.getDefaultStorageGroupLevel());
+ path, config.isAutoCreateSchemaEnabled(), true, config.getDefaultStorageGroupLevel());
+ }
+
+ /**
+ * Collect the distinct schemas of the measurements below a device node, including the
+ * measurements of its template once the device is marked as using it.
+ *
+ * @param path path of the device; attention: it must be a device node
+ * @return the schemas, in no particular order
+ * @throws PathNotExistException if the node cache cannot provide the node
+ */
+ @TestOnly
+ public List<MeasurementSchema> getAllMeasurementByDevicePath(PartialPath path)
+ throws PathNotExistException {
+ Set<MeasurementSchema> schemas = new HashSet<>();
+ try {
+ Pair<MNode, Template> cached = mNodeCache.get(path);
+ MNode deviceNode = cached.left;
+ Template ownTemplate = deviceNode.getDeviceTemplate();
+ if (ownTemplate != null) {
+ // a template set directly on the device replaces the one held in the pair
+ cached.right = ownTemplate;
+ }
+
+ // every child is cast to MeasurementMNode, hence the device-node requirement
+ for (MNode child : deviceNode.getChildren().values()) {
+ schemas.add(((MeasurementMNode) child).getSchema());
+ }
+
+ // template
+ if (deviceNode.isUseTemplate() && cached.right != null) {
+ schemas.addAll(cached.right.getSchemaMap().values());
+ }
+ } catch (CacheException e) {
+ throw new PathNotExistException(path.getFullPath());
+ }
+
+ return new ArrayList<>(schemas);
}
public MNode getDeviceNode(PartialPath path) throws MetadataException {
MNode node;
try {
- node = mNodeCache.get(path);
+ node = mNodeCache.get(path).left;
return node;
} catch (CacheException e) {
throw new PathNotExistException(path.getFullPath());
@@ -1812,14 +1885,18 @@ public class MManager {
/** get schema for device. Attention!!! Only support insertPlan */
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
- public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan) throws MetadataException {
+ public MNode getSeriesSchemasAndReadLockDevice(InsertPlan plan)
+ throws MetadataException, IOException {
PartialPath deviceId = plan.getDeviceId();
String[] measurementList = plan.getMeasurements();
MeasurementMNode[] measurementMNodes = plan.getMeasurementMNodes();
// 1. get device node
- MNode deviceMNode = getDeviceNodeWithAutoCreate(deviceId);
+ Pair<MNode, Template> deviceMNode = getDeviceNodeWithAutoCreate(deviceId);
+ if (deviceMNode.left.getDeviceTemplate() != null) {
+ deviceMNode.right = deviceMNode.left.getDeviceTemplate();
+ }
// 2. get schema of each measurement
// if do not has measurement
@@ -1827,11 +1904,13 @@ public class MManager {
TSDataType dataType;
for (int i = 0; i < measurementList.length; i++) {
try {
- MNode child = getMNode(deviceMNode, measurementList[i]);
+ MNode child = getMNode(deviceMNode.left, measurementList[i]);
if (child instanceof MeasurementMNode) {
measurementMNode = (MeasurementMNode) child;
} else if (child instanceof StorageGroupMNode) {
throw new PathAlreadyExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
+ } else if ((measurementMNode = findTemplate(deviceMNode, measurementList[i])) != null) {
+ // empty
} else {
if (!config.isAutoCreateSchemaEnabled()) {
throw new PathNotExistException(deviceId + PATH_SEPARATOR + measurementList[i]);
@@ -1840,7 +1919,7 @@ public class MManager {
dataType = getTypeInLoc(plan, i);
// create it, may concurrent created by multiple thread
internalCreateTimeseries(deviceId.concatNode(measurementList[i]), dataType);
- measurementMNode = (MeasurementMNode) deviceMNode.getChild(measurementList[i]);
+ measurementMNode = (MeasurementMNode) deviceMNode.left.getChild(measurementList[i]);
}
}
@@ -1895,13 +1974,36 @@ public class MManager {
}
}
- return deviceMNode;
+ return deviceMNode.left;
}
public MNode getMNode(MNode deviceMNode, String measurementName) {
return deviceMNode.getChild(measurementName);
}
+ /**
+ * Look a measurement up in the template paired with a device node.
+ *
+ * <p>When a template is present and the device is not yet flagged as using it, the flag is
+ * set and written to the log, whether or not the measurement is part of the template.
+ *
+ * @param deviceMNode the device node (left) and the template that applies to it (right)
+ * @param measurement name of the measurement to look up
+ * @return a new MeasurementMNode built from the template schema, or null when there is no
+ *     template or the template does not contain the measurement
+ * @throws MetadataException if writing the flag to the log fails
+ */
+ private MeasurementMNode findTemplate(Pair<MNode, Template> deviceMNode, String measurement)
+ throws MetadataException {
+ Template template = deviceMNode.right;
+ if (template == null) {
+ return null;
+ }
+
+ MeasurementSchema schema = template.getSchemaMap().get(measurement);
+
+ MNode device = deviceMNode.left;
+ if (!device.isUseTemplate()) {
+ device.setUseTemplate(true);
+ try {
+ logWriter.setUsingDeviceTemplate(device.getPartialPath());
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ return schema == null ? null : new MeasurementMNode(device, measurement, schema, null);
+ }
+
/** create timeseries with ignore PathAlreadyExistException */
private void internalCreateTimeseries(PartialPath path, TSDataType dataType)
throws MetadataException {
@@ -1940,6 +2042,90 @@ public class MManager {
boolean satisfy(String storageGroup);
}
+ /**
+ * Register a new template under the name given in the plan and, unless the manager is
+ * recovering, write the plan to the log.
+ *
+ * @param plan plan describing the template
+ * @throws MetadataException if a template with the same name is already registered
+ *     (DuplicatedTemplateException) or the log write fails
+ */
+ public void createDeviceTemplate(CreateTemplatePlan plan) throws MetadataException {
+ Template template = new Template(plan);
+ if (templateMap.putIfAbsent(plan.getName(), template) != null) {
+ // already have template
+ throw new DuplicatedTemplateException(plan.getName());
+ }
+
+ if (isRecovering) {
+ return;
+ }
+
+ // write wal
+ try {
+ logWriter.createDeviceTemplate(plan);
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ /**
+ * Attach a registered template to the node addressed by the plan's prefix path and, unless
+ * the manager is recovering, write the plan to the log.
+ *
+ * @param plan plan naming the template and the prefix path
+ * @throws MetadataException if the template is not registered, the node already has a
+ *     template, the template is incompatible with the upper one, or the log write fails
+ */
+ public void setDeviceTemplate(SetDeviceTemplatePlan plan) throws MetadataException {
+ Template template = templateMap.get(plan.getTemplateName());
+ if (template == null) {
+ throw new UndefinedTemplateException(plan.getTemplateName());
+ }
+
+ try {
+ // get mnode and update template should be atomic
+ synchronized (this) {
+ Pair<MNode, Template> node =
+ getDeviceNodeWithAutoCreate(new PartialPath(plan.getPrefixPath()));
+
+ Template attached = node.left.getDeviceTemplate();
+ if (attached != null) {
+ if (attached.equals(template)) {
+ throw new DuplicatedTemplateException(template.getName());
+ }
+ throw new MetadataException("Specified node already has template");
+ }
+
+ // node.right is the template found above the node
+ if (!isTemplateCompatible(node.right, template)) {
+ throw new MetadataException("Incompatible template");
+ }
+
+ node.left.setDeviceTemplate(template);
+ }
+
+ // write wal
+ if (!isRecovering) {
+ logWriter.setDeviceTemplate(plan);
+ }
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
+ }
+
+ /**
+ * Check whether a template may be set below a node that already has an upper template.
+ *
+ * @param upper template in effect above the node, may be null
+ * @param current template about to be set
+ * @return true when there is no upper template, or when current contains every measurement
+ *     of upper with an equal schema
+ */
+ public boolean isTemplateCompatible(Template upper, Template current) {
+ if (upper == null) {
+ return true;
+ }
+
+ // measurements of the upper template that current has not matched yet
+ Map<String, MeasurementSchema> unmatched = new HashMap<>(upper.getSchemaMap());
+
+ for (Map.Entry<String, MeasurementSchema> entry : current.getSchemaMap().entrySet()) {
+ MeasurementSchema inherited = unmatched.remove(entry.getKey());
+ if (inherited != null && !inherited.equals(entry.getValue())) {
+ return false;
+ }
+ }
+
+ // current template must contains all measurements of upper template
+ return unmatched.isEmpty();
+ }
+
+ /**
+ * Apply an AutoCreateDeviceMNodePlan by asking the tree for the device node at the plan's
+ * path, with the configured default storage group level; the returned node is not used.
+ *
+ * @param plan plan carrying the device path
+ * @throws MetadataException if the tree rejects the path
+ */
+ public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws MetadataException {
+ mtree.getDeviceNodeWithAutoCreating(plan.getPath(), config.getDefaultStorageGroupLevel());
+ }
+
+ /**
+ * Apply a SetUsingDeviceTemplatePlan by marking the device node at the plan's prefix path
+ * as using its template.
+ *
+ * @param plan plan carrying the prefix path of the device
+ * @throws MetadataException if the device node cannot be obtained
+ */
+ private void setUsingDeviceTemplate(SetUsingDeviceTemplatePlan plan) throws MetadataException {
+ getDeviceNode(plan.getPrefixPath()).setUseTemplate(true);
+ }
+
public long getTotalSeriesNumber() {
return totalSeriesNumber.get();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
index 7dbf727..36f94d3 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java
@@ -22,15 +22,27 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
-import org.apache.iotdb.db.exception.metadata.*;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.IllegalParameterOfPathException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
-import org.apache.iotdb.db.qp.physical.sys.*;
+import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
@@ -53,14 +65,31 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import static org.apache.iotdb.db.conf.IoTDBConstant.LOSS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.SDT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.SDT_COMP_DEV;
+import static org.apache.iotdb.db.conf.IoTDBConstant.SDT_COMP_MAX_TIME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.SDT_COMP_MIN_TIME;
/** The hierarchical struct of the Metadata Tree is implemented in this class. */
public class MTree implements Serializable {
@@ -190,6 +219,7 @@ public class MTree implements Serializable {
checkTimeseries(path.getFullPath());
MNode cur = root;
boolean hasSetStorageGroup = false;
+ Template upperTemplate = cur.getDeviceTemplate();
// e.g, path = root.sg.d1.s1, create internal nodes and set cur to d1 node
for (int i = 1; i < nodeNames.length - 1; i++) {
String nodeName = nodeNames[i];
@@ -203,6 +233,15 @@ public class MTree implements Serializable {
cur.addChild(nodeName, new MNode(cur, nodeName));
}
cur = cur.getChild(nodeName);
+
+ if (cur.getDeviceTemplate() != null) {
+ upperTemplate = cur.getDeviceTemplate();
+ }
+ }
+
+ if (upperTemplate != null && !upperTemplate.isCompatible(path)) {
+ throw new PathAlreadyExistException(
+ path.getFullPath() + " ( which is incompatible with template )");
}
if (props != null && props.containsKey(LOSS) && props.get(LOSS).equals(SDT)) {
@@ -304,12 +343,14 @@ public class MTree implements Serializable {
*
* <p>e.g., get root.sg.d1, get or create all internal nodes and return the node of d1
*/
- MNode getDeviceNodeWithAutoCreating(PartialPath deviceId, int sgLevel) throws MetadataException {
+ Pair<MNode, Template> getDeviceNodeWithAutoCreating(PartialPath deviceId, int sgLevel)
+ throws MetadataException {
String[] nodeNames = deviceId.getNodes();
if (nodeNames.length <= 1 || !nodeNames[0].equals(root.getName())) {
throw new IllegalPathException(deviceId.getFullPath());
}
MNode cur = root;
+ Template upperTemplate = null;
for (int i = 1; i < nodeNames.length; i++) {
if (!cur.hasChild(nodeNames[i])) {
if (i == sgLevel) {
@@ -321,9 +362,11 @@ public class MTree implements Serializable {
cur.addChild(nodeNames[i], new MNode(cur, nodeNames[i]));
}
}
+ // update upper template
+ upperTemplate = cur.getDeviceTemplate() == null ? upperTemplate : cur.getDeviceTemplate();
cur = cur.getChild(nodeNames[i]);
}
- return cur;
+ return new Pair<>(cur, upperTemplate);
}
/**
@@ -505,7 +548,8 @@ public class MTree implements Serializable {
* Get node by path with storage group check If storage group is not set,
* StorageGroupNotSetException will be thrown
*/
- MNode getNodeByPathWithStorageGroupCheck(PartialPath path) throws MetadataException {
+ Pair<MNode, Template> getNodeByPathWithStorageGroupCheck(PartialPath path)
+ throws MetadataException {
boolean storageGroupChecked = false;
String[] nodes = path.getNodes();
if (nodes.length == 0 || !nodes[0].equals(root.getName())) {
@@ -513,7 +557,9 @@ public class MTree implements Serializable {
}
MNode cur = root;
+ Template upperTemplate = null;
for (int i = 1; i < nodes.length; i++) {
+ upperTemplate = cur.getDeviceTemplate() == null ? upperTemplate : cur.getDeviceTemplate();
cur = cur.getChild(nodes[i]);
if (cur == null) {
// not find
@@ -531,7 +577,7 @@ public class MTree implements Serializable {
if (!storageGroupChecked) {
throw new StorageGroupNotSetException(path.getFullPath());
}
- return cur;
+ return new Pair<>(cur, upperTemplate);
}
/**
@@ -583,11 +629,25 @@ public class MTree implements Serializable {
throw new IllegalPathException(path.getFullPath());
}
MNode cur = root;
+ Template upperTemplate = cur.getDeviceTemplate();
for (int i = 1; i < nodes.length; i++) {
- cur = cur.getChild(nodes[i]);
- if (cur == null) {
- throw new PathNotExistException(path.getFullPath(), true);
+ if (cur.getDeviceTemplate() != null) {
+ upperTemplate = cur.getDeviceTemplate();
}
+ MNode next = cur.getChild(nodes[i]);
+ if (next == null) {
+ if (upperTemplate == null) {
+ throw new PathNotExistException(path.getFullPath(), true);
+ }
+
+ String realName = nodes[i];
+ MeasurementSchema schema = upperTemplate.getSchemaMap().get(realName);
+ if (schema == null) {
+ throw new PathNotExistException(path.getFullPath(), true);
+ }
+ return new MeasurementMNode(cur, schema.getMeasurementId(), schema, null);
+ }
+ cur = next;
}
return cur;
}
@@ -1007,7 +1067,7 @@ public class MTree implements Serializable {
}
List<Pair<PartialPath, String[]>> allMatchedNodes = new ArrayList<>();
- findPath(root, nodes, 1, allMatchedNodes, false, true, queryContext);
+ findPath(root, nodes, 1, allMatchedNodes, false, true, queryContext, null);
Stream<Pair<PartialPath, String[]>> sortedStream =
allMatchedNodes.stream()
@@ -1046,7 +1106,7 @@ public class MTree implements Serializable {
offset.set(plan.getOffset());
curOffset.set(-1);
count.set(0);
- findPath(root, nodes, 1, res, offset.get() != 0 || limit.get() != 0, false, null);
+ findPath(root, nodes, 1, res, offset.get() != 0 || limit.get() != 0, false, null, null);
// avoid memory leaks
limit.remove();
offset.remove();
@@ -1072,7 +1132,8 @@ public class MTree implements Serializable {
List<Pair<PartialPath, String[]>> timeseriesSchemaList,
boolean hasLimit,
boolean needLast,
- QueryContext queryContext)
+ QueryContext queryContext,
+ Template upperTemplate)
throws MetadataException {
if (node instanceof MeasurementMNode && nodes.length <= idx) {
if (hasLimit) {
@@ -1082,41 +1143,97 @@ public class MTree implements Serializable {
}
}
- PartialPath nodePath = node.getPartialPath();
- String[] tsRow = new String[7];
- tsRow[0] = ((MeasurementMNode) node).getAlias();
- MeasurementSchema measurementSchema = ((MeasurementMNode) node).getSchema();
- tsRow[1] = getStorageGroupPath(nodePath).getFullPath();
- tsRow[2] = measurementSchema.getType().toString();
- tsRow[3] = measurementSchema.getEncodingType().toString();
- tsRow[4] = measurementSchema.getCompressor().toString();
- tsRow[5] = String.valueOf(((MeasurementMNode) node).getOffset());
- tsRow[6] =
- needLast ? String.valueOf(getLastTimeStamp((MeasurementMNode) node, queryContext)) : null;
- Pair<PartialPath, String[]> temp = new Pair<>(nodePath, tsRow);
- timeseriesSchemaList.add(temp);
+ addMeasurementSchema(
+ node,
+ timeseriesSchemaList,
+ needLast,
+ queryContext,
+ ((MeasurementMNode) node).getSchema(),
+ "*");
if (hasLimit) {
count.set(count.get() + 1);
}
}
String nodeReg = MetaUtils.getNodeRegByIdx(idx, nodes);
+ if (node.getDeviceTemplate() != null) {
+ upperTemplate = node.getDeviceTemplate();
+ }
+
if (!nodeReg.contains(PATH_WILDCARD)) {
MNode next = node.getChild(nodeReg);
if (next != null) {
- findPath(next, nodes, idx + 1, timeseriesSchemaList, hasLimit, needLast, queryContext);
+ findPath(
+ next,
+ nodes,
+ idx + 1,
+ timeseriesSchemaList,
+ hasLimit,
+ needLast,
+ queryContext,
+ upperTemplate);
}
} else {
for (MNode child : node.getChildren().values()) {
if (!Pattern.matches(nodeReg.replace("*", ".*"), child.getName())) {
continue;
}
- findPath(child, nodes, idx + 1, timeseriesSchemaList, hasLimit, needLast, queryContext);
+ findPath(
+ child,
+ nodes,
+ idx + 1,
+ timeseriesSchemaList,
+ hasLimit,
+ needLast,
+ queryContext,
+ upperTemplate);
if (hasLimit && count.get().intValue() == limit.get().intValue()) {
return;
}
}
}
+
+ // template part
+ if (!(node instanceof MeasurementMNode) && node.isUseTemplate()) {
+ if (upperTemplate != null) {
+ HashSet<MeasurementSchema> set = new HashSet<>();
+ for (MeasurementSchema schema : upperTemplate.getSchemaMap().values()) {
+ if (set.add(schema)) {
+ addMeasurementSchema(
+ new MeasurementMNode(node, schema.getMeasurementId(), schema, null),
+ timeseriesSchemaList,
+ needLast,
+ queryContext,
+ schema,
+ nodeReg);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Append one timeseries row to the result list when the measurement id of the schema
+ * matches the given node pattern ("*" is treated as ".*").
+ *
+ * @param node measurement node the row is built from; it is cast to MeasurementMNode
+ * @param timeseriesSchemaList result list receiving the (path, row) pair
+ * @param needLast whether the last timestamp is looked up for the seventh column
+ * @param queryContext context passed to the last-timestamp lookup
+ * @param measurementSchema schema supplying type, encoding and compressor
+ * @param reg node pattern the measurement id has to match
+ */
+ private void addMeasurementSchema(
+ MNode node,
+ List<Pair<PartialPath, String[]>> timeseriesSchemaList,
+ boolean needLast,
+ QueryContext queryContext,
+ MeasurementSchema measurementSchema,
+ String reg)
+ throws StorageGroupNotSetException {
+ String regex = reg.replace("*", ".*");
+ if (!Pattern.matches(regex, measurementSchema.getMeasurementId())) {
+ return;
+ }
+
+ PartialPath nodePath = node.getPartialPath();
+ MeasurementMNode measurementNode = (MeasurementMNode) node;
+ // columns: alias, storage group, data type, encoding, compressor, offset, last timestamp
+ String[] tsRow = {
+ measurementNode.getAlias(),
+ getStorageGroupPath(nodePath).getFullPath(),
+ measurementSchema.getType().toString(),
+ measurementSchema.getEncodingType().toString(),
+ measurementSchema.getCompressor().toString(),
+ String.valueOf(measurementNode.getOffset()),
+ needLast ? String.valueOf(getLastTimeStamp(measurementNode, queryContext)) : null
+ };
+ timeseriesSchemaList.add(new Pair<>(nodePath, tsRow));
}
/**
@@ -1196,7 +1313,7 @@ public class MTree implements Serializable {
* <p>e.g., MTree has [root.sg1.d1.s1, root.sg1.d1.s2, root.sg1.d2.s1] given path = root.sg1.d1
* return [s1, s2]
*
- * @param partial Path
+ * @param path Path
* @return All child nodes' seriesPath(s) of given seriesPath.
*/
Set<String> getChildNodeInNextLevel(PartialPath path) throws MetadataException {
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
index 8fda60d..ecd7d48 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/logfile/MLogWriter.java
@@ -28,6 +28,9 @@ import org.apache.iotdb.db.metadata.mnode.MNode;
import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
@@ -37,6 +40,7 @@ import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetUsingDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
import org.apache.iotdb.db.writelog.io.LogWriter;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
@@ -151,6 +155,18 @@ public class MLogWriter implements AutoCloseable {
putLog(plan);
}
+ /** Passes the given create-template plan to {@code putLog}. */
+ public void createDeviceTemplate(CreateTemplatePlan plan) throws IOException {
+ putLog(plan);
+ }
+
+ /** Passes the given set-device-template plan to {@code putLog}. */
+ public void setDeviceTemplate(SetDeviceTemplatePlan plan) throws IOException {
+ putLog(plan);
+ }
+
+ /** Passes the given auto-create-device-node plan to {@code putLog}. */
+ public void autoCreateDeviceMNode(AutoCreateDeviceMNodePlan plan) throws IOException {
+ putLog(plan);
+ }
+
public void serializeMNode(MNode node) throws IOException {
int childSize = 0;
if (node.getChildren() != null) {
@@ -181,6 +197,11 @@ public class MLogWriter implements AutoCloseable {
putLog(plan);
}
+ /** Wraps the path in a SetUsingDeviceTemplatePlan and passes it to {@code putLog}. */
+ public void setUsingDeviceTemplate(PartialPath path) throws IOException {
+ putLog(new SetUsingDeviceTemplatePlan(path));
+ }
+
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
public static void upgradeTxtToBin(
String schemaDir, String oldFileName, String newFileName, boolean isSnapshot)
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
index e5f03ae..d74dd9c 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MNode.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.metadata.mnode;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.rescon.CachedStringPool;
import java.io.IOException;
@@ -30,6 +31,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -67,6 +69,11 @@ public class MNode implements Serializable {
@SuppressWarnings("squid:S3077")
private transient volatile Map<String, MNode> aliasChildren = null;
+ // device template
+ protected Template deviceTemplate = null;
+
+ private volatile boolean useTemplate = false;
+
/** Constructor of MNode. */
public MNode(MNode parent, String name) {
this.parent = parent;
@@ -147,6 +154,14 @@ public class MNode implements Serializable {
}
}
+ /** Returns the template set directly on this node, or null when none has been set. */
+ public Template getDeviceTemplate() {
+ return deviceTemplate;
+ }
+
+ /** Sets the template of this node, replacing any previous value. */
+ public void setDeviceTemplate(Template deviceTemplate) {
+ this.deviceTemplate = deviceTemplate;
+ }
+
/** get the child with the name */
public MNode getChild(String name) {
MNode child = null;
@@ -303,4 +318,58 @@ public class MNode implements Serializable {
this.deleteChild(measurement);
this.addChild(newChildNode.getName(), newChildNode);
}
+
+ /** Overwrites the stored full path of this node with the given value. */
+ public void setFullPath(String fullPath) {
+ this.fullPath = fullPath;
+ }
+
+ /**
+ * Get the upper template of this node: the nearest template found while walking from this
+ * node (inclusive) towards the root.
+ *
+ * @return upper template, or null when no node on that way has one
+ */
+ public Template getUpperTemplate() {
+ for (MNode cur = this; cur != null; cur = cur.parent) {
+ if (cur.getDeviceTemplate() != null) {
+ return cur.deviceTemplate;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Two nodes are equal when they are of the same class and have the same full path.
+ *
+ * @param o object to compare with
+ * @return true if o is a node of the same class with an equal full path
+ */
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MNode mNode = (MNode) o;
+ // Resolve an unset path on either side. Comparing the raw fields made a.equals(b) and
+ // b.equals(a) disagree when only one of the two nodes had its fullPath field set.
+ String thisPath = fullPath == null ? getFullPath() : fullPath;
+ String thatPath = mNode.fullPath == null ? mNode.getFullPath() : mNode.fullPath;
+ return Objects.equals(thisPath, thatPath);
+ }
+
+ /** Hashes the full path of this node, resolving it through getFullPath() when unset. */
+ @Override
+ public int hashCode() {
+ String path = fullPath == null ? getFullPath() : fullPath;
+ return Objects.hash(path);
+ }
+
+ /** Returns whether this node has been flagged as using a template. */
+ public boolean isUseTemplate() {
+ return useTemplate;
+ }
+
+ /** Sets the flag telling whether this node uses a template. */
+ public void setUseTemplate(boolean useTemplate) {
+ this.useTemplate = useTemplate;
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
new file mode 100644
index 0000000..627898d
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/template/Template.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.template;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+ /** A named collection of measurement schemas, keyed by schema name. */
+ public class Template {
+ private String name;
+
+ // schema name -> schema; populated by the constructor, replaceable through setSchemaMap
+ private Map<String, MeasurementSchema> schemaMap = new HashMap<>();
+
+ /**
+ * Build a template from a createTemplatePlan.
+ *
+ * @param plan createTemplatePlan supplying the template name and, per index, the schema
+ *     name, measurement, data type, encoding and compressor
+ * @throws IllegalArgumentException if the plan contains the same schema name twice
+ */
+ public Template(CreateTemplatePlan plan) {
+ name = plan.getName();
+
+ // put measurement into a map
+ for (int idx = 0; idx < plan.getMeasurements().size(); idx++) {
+ // only the first element of each per-index list is used
+ MeasurementSchema schema =
+ new MeasurementSchema(
+ plan.getMeasurements().get(idx).get(0),
+ plan.getDataTypes().get(idx).get(0),
+ plan.getEncodings().get(idx).get(0),
+ plan.getCompressors().get(idx));
+ String schemaName = plan.getSchemaNames().get(idx);
+
+ // a name may be registered only once; a second occurrence aborts construction
+ if (schemaMap.putIfAbsent(schemaName, schema) != null) {
+ throw new IllegalArgumentException(
+ "Duplicate measurement name in create template plan. Name is :" + schemaName);
+ }
+ }
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /** Returns the internal schema map itself, not a copy. */
+ public Map<String, MeasurementSchema> getSchemaMap() {
+ return schemaMap;
+ }
+
+ public void setSchemaMap(Map<String, MeasurementSchema> schemaMap) {
+ this.schemaMap = schemaMap;
+ }
+
+ /**
+ * check whether a timeseries path is compatible with this template
+ *
+ * @param path timeseries path
+ * @return true when the measurement of the path is not a schema name of this template, i.e.
+ *     the new timeseries can be created
+ */
+ public boolean isCompatible(PartialPath path) {
+ String measurement = path.getMeasurement();
+ return !schemaMap.containsKey(measurement);
+ }
+
+ /**
+ * Create one parentless MeasurementMNode per distinct schema of this template.
+ *
+ * @return the newly created nodes
+ */
+ public List<MeasurementMNode> getMeasurementMNode() {
+ Set<MeasurementSchema> seen = new HashSet<>();
+ List<MeasurementMNode> nodes = new ArrayList<>();
+
+ for (MeasurementSchema schema : schemaMap.values()) {
+ if (!seen.add(schema)) {
+ // equal schema already emitted
+ continue;
+ }
+ nodes.add(new MeasurementMNode(null, schema.getMeasurementId(), schema, null));
+ }
+
+ return nodes;
+ }
+
+ /** Returns the measurement id of the schema registered under the given name. */
+ public String getMeasurementNodeName(String measurementName) {
+ MeasurementSchema schema = schemaMap.get(measurementName);
+ return schema.getMeasurementId();
+ }
+
+ /**
+ * get all path in this template (to support aligned by device query)
+ *
+ * @return a hash map with one entry per schema name, each mapped to a new empty list
+ */
+ public HashMap<String, List<String>> getAllPath() {
+ HashMap<String, List<String>> paths = new HashMap<>();
+ for (String schemaName : schemaMap.keySet()) {
+ paths.put(schemaName, new ArrayList<>());
+ }
+
+ return paths;
+ }
+
+ @Override
+ public boolean equals(Object t) {
+ if (this == t) {
+ return true;
+ }
+ if (t == null || getClass() != t.getClass()) {
+ return false;
+ }
+ Template other = (Template) t;
+ if (!name.equals(other.name)) {
+ return false;
+ }
+ return schemaMap.equals(other.schemaMap);
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder(17, 37);
+ builder.append(name);
+ builder.append(schemaMap);
+ return builder.toHashCode();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 073e718..9066921 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -56,6 +56,7 @@ import org.apache.iotdb.db.qp.logical.sys.AuthorOperator.AuthorType;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePartitionPlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
@@ -71,6 +72,7 @@ import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
@@ -343,12 +345,36 @@ public class PlanExecutor implements IPlanExecutor {
throw new QueryProcessException(e.getMessage());
}
return true;
+ case CREATE_TEMPLATE:
+ return createDeviceTemplate((CreateTemplatePlan) plan);
+ case SET_DEVICE_TEMPLATE:
+ return setDeviceTemplate((SetDeviceTemplatePlan) plan);
default:
throw new UnsupportedOperationException(
String.format("operation %s is not supported", plan.getOperatorType()));
}
}
+ /**
+ * Hands a create-template plan to the metadata manager.
+ *
+ * @param createTemplatePlan the plan describing the template to create
+ * @return always {@code true} when no exception is raised
+ * @throws QueryProcessException wrapping any {@link MetadataException} raised by the manager
+ */
+ private boolean createDeviceTemplate(CreateTemplatePlan createTemplatePlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.createDeviceTemplate(createTemplatePlan);
+ return true;
+ } catch (MetadataException metadataFailure) {
+ throw new QueryProcessException(metadataFailure);
+ }
+ }
+
+ /**
+ * Hands a set-device-template plan to the metadata manager.
+ *
+ * @param setDeviceTemplatePlan the plan naming the template and the prefix path it is set on
+ * @return always {@code true} when no exception is raised
+ * @throws QueryProcessException wrapping any {@link MetadataException} raised by the manager
+ */
+ private boolean setDeviceTemplate(SetDeviceTemplatePlan setDeviceTemplatePlan)
+ throws QueryProcessException {
+ try {
+ IoTDB.metaManager.setDeviceTemplate(setDeviceTemplatePlan);
+ return true;
+ } catch (MetadataException metadataFailure) {
+ throw new QueryProcessException(metadataFailure);
+ }
+ }
+
private boolean operateCreateFunction(CreateFunctionPlan plan) throws UDFRegistrationException {
UDFRegistrationService.getInstance()
.register(plan.getUdfName(), plan.getClassName(), plan.isTemporary(), true);
@@ -1055,7 +1081,7 @@ public class PlanExecutor implements IPlanExecutor {
List<ChunkGroupMetadata> chunkGroupMetadataList,
Map<Path, MeasurementSchema> knownSchemas,
int sgLevel)
- throws QueryProcessException, MetadataException {
+ throws QueryProcessException, MetadataException, IOException {
if (chunkGroupMetadataList.isEmpty()) {
return;
}
@@ -1064,7 +1090,9 @@ public class PlanExecutor implements IPlanExecutor {
for (ChunkGroupMetadata chunkGroupMetadata : chunkGroupMetadataList) {
String device = chunkGroupMetadata.getDevice();
MNode node =
- IoTDB.metaManager.getDeviceNodeWithAutoCreate(new PartialPath(device), true, sgLevel);
+ IoTDB.metaManager.getDeviceNodeWithAutoCreate(
+ new PartialPath(device), true, true, sgLevel)
+ .left;
for (ChunkMetadata chunkMetadata : chunkGroupMetadata.getChunkMetadataList()) {
PartialPath series =
new PartialPath(
@@ -1159,7 +1187,11 @@ public class PlanExecutor implements IPlanExecutor {
}
protected MNode getSeriesSchemas(InsertPlan insertPlan) throws MetadataException {
- return IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan);
+ try {
+ return IoTDB.metaManager.getSeriesSchemasAndReadLockDevice(insertPlan);
+ } catch (IOException e) {
+ throw new MetadataException(e);
+ }
}
private void checkFailedMeasurments(InsertPlan plan)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
index cfa981e..2171b07 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/Operator.java
@@ -124,6 +124,9 @@ public abstract class Operator {
GROUP_BY_FILL,
ALTER_TIMESERIES,
FLUSH,
+ CREATE_TEMPLATE,
+ SET_DEVICE_TEMPLATE,
+ SET_USING_DEVICE_TEMPLATE,
MERGE,
FULL_MERGE,
CLEAR_CACHE,
@@ -145,6 +148,7 @@ public abstract class Operator {
MNODE,
MEASUREMENT_MNODE,
STORAGE_GROUP_MNODE,
+ AUTO_CREATE_DEVICE_MNODE,
BATCH_INSERT_ONE_DEVICE,
MULTI_BATCH_INSERT,
BATCH_INSERT_ROWS,
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
index 16719c4..fd73dba 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/PhysicalPlan.java
@@ -23,13 +23,16 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowsPlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
import org.apache.iotdb.db.qp.physical.sys.ChangeTagOffsetPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateIndexPlan;
@@ -45,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.sys.MNodePlan;
import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetUsingDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan;
@@ -226,151 +230,128 @@ public abstract class PhysicalPlan {
switch (type) {
case INSERT:
plan = new InsertRowPlan();
- plan.deserialize(buffer);
break;
case BATCHINSERT:
plan = new InsertTabletPlan();
- plan.deserialize(buffer);
break;
case MULTI_BATCH_INSERT:
plan = new InsertMultiTabletPlan();
- plan.deserialize(buffer);
break;
case DELETE:
plan = new DeletePlan();
- plan.deserialize(buffer);
break;
case SET_STORAGE_GROUP:
plan = new SetStorageGroupPlan();
- plan.deserialize(buffer);
break;
case CREATE_TIMESERIES:
plan = new CreateTimeSeriesPlan();
- plan.deserialize(buffer);
break;
case DELETE_TIMESERIES:
plan = new DeleteTimeSeriesPlan();
- plan.deserialize(buffer);
break;
case CREATE_INDEX:
plan = new CreateIndexPlan();
- plan.deserialize(buffer);
break;
case DROP_INDEX:
plan = new DropIndexPlan();
- plan.deserialize(buffer);
break;
case TTL:
plan = new SetTTLPlan();
- plan.deserialize(buffer);
break;
case GRANT_WATERMARK_EMBEDDING:
plan = new DataAuthPlan(OperatorType.GRANT_WATERMARK_EMBEDDING);
- plan.deserialize(buffer);
break;
case REVOKE_WATERMARK_EMBEDDING:
plan = new DataAuthPlan(OperatorType.REVOKE_WATERMARK_EMBEDDING);
- plan.deserialize(buffer);
break;
case CREATE_ROLE:
plan = new AuthorPlan(OperatorType.CREATE_ROLE);
- plan.deserialize(buffer);
break;
case DELETE_ROLE:
plan = new AuthorPlan(OperatorType.DELETE_ROLE);
- plan.deserialize(buffer);
break;
case CREATE_USER:
plan = new AuthorPlan(OperatorType.CREATE_USER);
- plan.deserialize(buffer);
break;
case REVOKE_USER_ROLE:
plan = new AuthorPlan(OperatorType.REVOKE_USER_ROLE);
- plan.deserialize(buffer);
break;
case REVOKE_ROLE_PRIVILEGE:
plan = new AuthorPlan(OperatorType.REVOKE_ROLE_PRIVILEGE);
- plan.deserialize(buffer);
break;
case REVOKE_USER_PRIVILEGE:
plan = new AuthorPlan(OperatorType.REVOKE_USER_PRIVILEGE);
- plan.deserialize(buffer);
break;
case GRANT_ROLE_PRIVILEGE:
plan = new AuthorPlan(OperatorType.GRANT_ROLE_PRIVILEGE);
- plan.deserialize(buffer);
break;
case GRANT_USER_PRIVILEGE:
plan = new AuthorPlan(OperatorType.GRANT_USER_PRIVILEGE);
- plan.deserialize(buffer);
break;
case GRANT_USER_ROLE:
plan = new AuthorPlan(OperatorType.GRANT_USER_ROLE);
- plan.deserialize(buffer);
break;
case MODIFY_PASSWORD:
plan = new AuthorPlan(OperatorType.MODIFY_PASSWORD);
- plan.deserialize(buffer);
break;
case DELETE_USER:
plan = new AuthorPlan(OperatorType.DELETE_USER);
- plan.deserialize(buffer);
break;
case DELETE_STORAGE_GROUP:
plan = new DeleteStorageGroupPlan();
- plan.deserialize(buffer);
break;
case SHOW_TIMESERIES:
plan = new ShowTimeSeriesPlan();
- plan.deserialize(buffer);
break;
case SHOW_DEVICES:
plan = new ShowDevicesPlan();
- plan.deserialize(buffer);
break;
case LOAD_CONFIGURATION:
plan = new LoadConfigurationPlan();
- plan.deserialize(buffer);
break;
case ALTER_TIMESERIES:
plan = new AlterTimeSeriesPlan();
- plan.deserialize(buffer);
break;
case FLUSH:
plan = new FlushPlan();
- plan.deserialize(buffer);
break;
case CREATE_MULTI_TIMESERIES:
plan = new CreateMultiTimeSeriesPlan();
- plan.deserialize(buffer);
break;
case CHANGE_ALIAS:
plan = new ChangeAliasPlan();
- plan.deserialize(buffer);
break;
case CHANGE_TAG_OFFSET:
plan = new ChangeTagOffsetPlan();
- plan.deserialize(buffer);
break;
case MNODE:
plan = new MNodePlan();
- plan.deserialize(buffer);
break;
case MEASUREMENT_MNODE:
plan = new MeasurementMNodePlan();
- plan.deserialize(buffer);
break;
case STORAGE_GROUP_MNODE:
plan = new StorageGroupMNodePlan();
- plan.deserialize(buffer);
break;
case BATCH_INSERT_ROWS:
plan = new InsertRowsPlan();
- plan.deserialize(buffer);
+ break;
+ case CREATE_TEMPLATE:
+ plan = new CreateTemplatePlan();
+ break;
+ case SET_DEVICE_TEMPLATE:
+ plan = new SetDeviceTemplatePlan();
+ break;
+ case SET_USING_DEVICE_TEMPLATE:
+ plan = new SetUsingDeviceTemplatePlan();
+ break;
+ case AUTO_CREATE_DEVICE_MNODE:
+ plan = new AutoCreateDeviceMNodePlan();
break;
default:
throw new IOException("unrecognized log type " + type);
}
+ plan.deserialize(buffer);
return plan;
}
}
@@ -412,7 +393,11 @@ public abstract class PhysicalPlan {
BATCH_INSERT_ONE_DEVICE,
MULTI_BATCH_INSERT,
BATCH_INSERT_ROWS,
- SHOW_DEVICES
+ SHOW_DEVICES,
+ CREATE_TEMPLATE,
+ SET_DEVICE_TEMPLATE,
+ SET_USING_DEVICE_TEMPLATE,
+ AUTO_CREATE_DEVICE_MNODE,
}
public long getIndex() {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java
new file mode 100644
index 0000000..82c45b4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/CreateTemplatePlan.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Physical plan of operator type {@code CREATE_TEMPLATE}. It carries a template name together
+ * with five lists: schema names, measurements, data types, encodings and compressors.
+ *
+ * <p>Serialized layout (identical for the {@link ByteBuffer} and {@link DataOutputStream}
+ * overloads): one byte holding {@code PhysicalPlanType.CREATE_TEMPLATE.ordinal()}, the name, then
+ * each list prefixed by its size (nested lists carry an inner size per element), and finally the
+ * plan {@code index} as a long. Enum values are stored as their ordinals, so the stored form
+ * depends on the declaration order of those enums.
+ */
+public class CreateTemplatePlan extends PhysicalPlan {
+
+ // Name of the template.
+ String name;
+ // NOTE(review): the lists below look like parallel lists where position i of each one describes
+ // the same schema entry - nothing in this class enforces equal sizes; confirm against callers.
+ List<String> schemaNames;
+ List<List<String>> measurements;
+ List<List<TSDataType>> dataTypes;
+ List<List<TSEncoding>> encodings;
+ List<CompressionType> compressors;
+
+ public List<String> getSchemaNames() {
+ return schemaNames;
+ }
+
+ public void setSchemaNames(List<String> schemaNames) {
+ this.schemaNames = schemaNames;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public List<List<String>> getMeasurements() {
+ return measurements;
+ }
+
+ public void setMeasurements(List<List<String>> measurements) {
+ this.measurements = measurements;
+ }
+
+ public List<List<TSDataType>> getDataTypes() {
+ return dataTypes;
+ }
+
+ public void setDataTypes(List<List<TSDataType>> dataTypes) {
+ this.dataTypes = dataTypes;
+ }
+
+ public List<List<TSEncoding>> getEncodings() {
+ return encodings;
+ }
+
+ public void setEncodings(List<List<TSEncoding>> encodings) {
+ this.encodings = encodings;
+ }
+
+ public List<CompressionType> getCompressors() {
+ return compressors;
+ }
+
+ public void setCompressors(List<CompressionType> compressors) {
+ this.compressors = compressors;
+ }
+
+ /**
+ * Creates an empty plan; all fields stay {@code null} until set or until {@link
+ * #deserialize(ByteBuffer)} fills them. Serializing before that would dereference null lists.
+ */
+ public CreateTemplatePlan() {
+ // presumably the first super argument is the "is query" flag - TODO confirm in PhysicalPlan
+ super(false, OperatorType.CREATE_TEMPLATE);
+ }
+
+ /**
+ * Creates a fully populated plan. The given lists are stored by reference, not copied.
+ *
+ * @param name template name
+ * @param schemaNames schema names
+ * @param measurements measurement names, one inner list per outer element
+ * @param dataTypes data types, one inner list per outer element
+ * @param encodings encodings, one inner list per outer element
+ * @param compressors compression types
+ */
+ public CreateTemplatePlan(
+ String name,
+ List<String> schemaNames,
+ List<List<String>> measurements,
+ List<List<TSDataType>> dataTypes,
+ List<List<TSEncoding>> encodings,
+ List<CompressionType> compressors) {
+ super(false, OperatorType.CREATE_TEMPLATE);
+ this.name = name;
+ this.schemaNames = schemaNames;
+ this.measurements = measurements;
+ this.dataTypes = dataTypes;
+ this.encodings = encodings;
+ this.compressors = compressors;
+ }
+
+ /**
+ * Writes this plan into {@code buffer} using the layout described on the class.
+ *
+ * @param buffer destination buffer; must have enough remaining capacity
+ */
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ // Leading type tag; deserialize(ByteBuffer) below does not read it back.
+ buffer.put((byte) PhysicalPlanType.CREATE_TEMPLATE.ordinal());
+
+ ReadWriteIOUtils.write(name, buffer);
+
+ // schema names
+ ReadWriteIOUtils.write(schemaNames.size(), buffer);
+ for (String schemaName : schemaNames) {
+ ReadWriteIOUtils.write(schemaName, buffer);
+ }
+
+ // measurements
+ ReadWriteIOUtils.write(measurements.size(), buffer);
+ for (List<String> measurementList : measurements) {
+ ReadWriteIOUtils.write(measurementList.size(), buffer);
+ for (String measurement : measurementList) {
+ ReadWriteIOUtils.write(measurement, buffer);
+ }
+ }
+
+ // datatype
+ ReadWriteIOUtils.write(dataTypes.size(), buffer);
+ for (List<TSDataType> dataTypesList : dataTypes) {
+ ReadWriteIOUtils.write(dataTypesList.size(), buffer);
+ for (TSDataType dataType : dataTypesList) {
+ ReadWriteIOUtils.write(dataType.ordinal(), buffer);
+ }
+ }
+
+ // encoding
+ ReadWriteIOUtils.write(encodings.size(), buffer);
+ for (List<TSEncoding> encodingList : encodings) {
+ ReadWriteIOUtils.write(encodingList.size(), buffer);
+ for (TSEncoding encoding : encodingList) {
+ ReadWriteIOUtils.write(encoding.ordinal(), buffer);
+ }
+ }
+
+ // compressor
+ ReadWriteIOUtils.write(compressors.size(), buffer);
+ for (CompressionType compressionType : compressors) {
+ ReadWriteIOUtils.write(compressionType.ordinal(), buffer);
+ }
+
+ buffer.putLong(index);
+ }
+
+ /**
+ * Restores every field from {@code buffer}, reading the sections in the same order in which
+ * {@link #serialize(ByteBuffer)} writes them after the type byte.
+ *
+ * <p>The type byte itself is not read here - presumably the caller consumed it to pick this
+ * class; verify against the plan factory. An ordinal outside the enum range makes the {@code
+ * values()[...]} lookup throw {@link ArrayIndexOutOfBoundsException}.
+ *
+ * @param buffer source buffer positioned just after the type byte
+ */
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ name = ReadWriteIOUtils.readString(buffer);
+
+ // schema names
+ int size = ReadWriteIOUtils.readInt(buffer);
+ schemaNames = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ schemaNames.add(ReadWriteIOUtils.readString(buffer));
+ }
+
+ // measurements
+ size = ReadWriteIOUtils.readInt(buffer);
+ measurements = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ int listSize = ReadWriteIOUtils.readInt(buffer);
+ List<String> measurementsList = new ArrayList<>(listSize);
+ for (int j = 0; j < listSize; j++) {
+ measurementsList.add(ReadWriteIOUtils.readString(buffer));
+ }
+ measurements.add(measurementsList);
+ }
+
+ // datatypes
+ size = ReadWriteIOUtils.readInt(buffer);
+ dataTypes = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ int listSize = ReadWriteIOUtils.readInt(buffer);
+ List<TSDataType> dataTypesList = new ArrayList<>(listSize);
+ for (int j = 0; j < listSize; j++) {
+ dataTypesList.add(TSDataType.values()[ReadWriteIOUtils.readInt(buffer)]);
+ }
+ dataTypes.add(dataTypesList);
+ }
+
+ // encodings
+ size = ReadWriteIOUtils.readInt(buffer);
+ encodings = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ int listSize = ReadWriteIOUtils.readInt(buffer);
+ List<TSEncoding> encodingsList = new ArrayList<>(listSize);
+ for (int j = 0; j < listSize; j++) {
+ encodingsList.add(TSEncoding.values()[ReadWriteIOUtils.readInt(buffer)]);
+ }
+ encodings.add(encodingsList);
+ }
+
+ // compressor
+ size = ReadWriteIOUtils.readInt(buffer);
+ compressors = new ArrayList<>(size);
+ for (int i = 0; i < size; i++) {
+ compressors.add(CompressionType.values()[ReadWriteIOUtils.readInt(buffer)]);
+ }
+
+ this.index = buffer.getLong();
+ }
+
+ /**
+ * Stream counterpart of {@link #serialize(ByteBuffer)}; writes the same sections in the same
+ * order.
+ *
+ * @param stream destination stream
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeByte((byte) PhysicalPlanType.CREATE_TEMPLATE.ordinal());
+
+ ReadWriteIOUtils.write(name, stream);
+
+ // schema names
+ ReadWriteIOUtils.write(schemaNames.size(), stream);
+ for (String schemaName : schemaNames) {
+ ReadWriteIOUtils.write(schemaName, stream);
+ }
+
+ // measurements
+ ReadWriteIOUtils.write(measurements.size(), stream);
+ for (List<String> measurementList : measurements) {
+ ReadWriteIOUtils.write(measurementList.size(), stream);
+ for (String measurement : measurementList) {
+ ReadWriteIOUtils.write(measurement, stream);
+ }
+ }
+
+ // datatype
+ ReadWriteIOUtils.write(dataTypes.size(), stream);
+ for (List<TSDataType> dataTypesList : dataTypes) {
+ ReadWriteIOUtils.write(dataTypesList.size(), stream);
+ for (TSDataType dataType : dataTypesList) {
+ ReadWriteIOUtils.write(dataType.ordinal(), stream);
+ }
+ }
+
+ // encoding
+ ReadWriteIOUtils.write(encodings.size(), stream);
+ for (List<TSEncoding> encodingList : encodings) {
+ ReadWriteIOUtils.write(encodingList.size(), stream);
+ for (TSEncoding encoding : encodingList) {
+ ReadWriteIOUtils.write(encoding.ordinal(), stream);
+ }
+ }
+
+ // compressor
+ ReadWriteIOUtils.write(compressors.size(), stream);
+ for (CompressionType compressionType : compressors) {
+ ReadWriteIOUtils.write(compressionType.ordinal(), stream);
+ }
+
+ stream.writeLong(index);
+ }
+
+ /**
+ * This plan reports no paths.
+ *
+ * @return always {@code null} (not an empty list), so callers must be prepared for null
+ */
+ @Override
+ public List<PartialPath> getPaths() {
+ return null;
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SetDeviceTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SetDeviceTemplatePlan.java
new file mode 100644
index 0000000..b76703a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/SetDeviceTemplatePlan.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.crud;
+
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Physical plan of operator type {@code SET_DEVICE_TEMPLATE}. It carries a template name and a
+ * prefix path, both kept as plain strings.
+ *
+ * <p>Serialized layout (identical for both {@code serialize} overloads): one byte holding {@code
+ * PhysicalPlanType.SET_DEVICE_TEMPLATE.ordinal()}, the template name, the prefix path, and finally
+ * the plan {@code index} as a long.
+ */
+public class SetDeviceTemplatePlan extends PhysicalPlan {
+ String templateName;
+ String prefixPath;
+
+ /** Creates an empty plan; both strings stay {@code null} until set or deserialized. */
+ public SetDeviceTemplatePlan() {
+ // presumably the first super argument is the "is query" flag - TODO confirm in PhysicalPlan
+ super(false, OperatorType.SET_DEVICE_TEMPLATE);
+ }
+
+ /**
+ * Creates a populated plan.
+ *
+ * @param templateName name of the template
+ * @param prefixPath prefix path, stored as given without validation
+ */
+ public SetDeviceTemplatePlan(String templateName, String prefixPath) {
+ super(false, OperatorType.SET_DEVICE_TEMPLATE);
+ this.templateName = templateName;
+ this.prefixPath = prefixPath;
+ }
+
+ public String getTemplateName() {
+ return templateName;
+ }
+
+ public void setTemplateName(String templateName) {
+ this.templateName = templateName;
+ }
+
+ public String getPrefixPath() {
+ return prefixPath;
+ }
+
+ public void setPrefixPath(String prefixPath) {
+ this.prefixPath = prefixPath;
+ }
+
+ /**
+ * This plan reports no paths, even though it holds a prefix path string.
+ *
+ * @return always {@code null} (not an empty list), so callers must be prepared for null
+ */
+ @Override
+ public List<PartialPath> getPaths() {
+ return null;
+ }
+
+ /**
+ * Writes this plan into {@code buffer} using the layout described on the class.
+ *
+ * @param buffer destination buffer; must have enough remaining capacity
+ */
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ // Leading type tag; deserialize(ByteBuffer) below does not read it back.
+ buffer.put((byte) PhysicalPlanType.SET_DEVICE_TEMPLATE.ordinal());
+
+ ReadWriteIOUtils.write(templateName, buffer);
+ ReadWriteIOUtils.write(prefixPath, buffer);
+
+ buffer.putLong(index);
+ }
+
+ /**
+ * Restores the template name, the prefix path and the index from {@code buffer}, in that order.
+ *
+ * <p>The type byte is not read here - presumably the caller consumed it to pick this class;
+ * verify against the plan factory.
+ *
+ * @param buffer source buffer positioned just after the type byte
+ */
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ templateName = ReadWriteIOUtils.readString(buffer);
+ prefixPath = ReadWriteIOUtils.readString(buffer);
+
+ this.index = buffer.getLong();
+ }
+
+ /**
+ * Stream counterpart of {@link #serialize(ByteBuffer)}; writes the same fields in the same order.
+ *
+ * @param stream destination stream
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeByte((byte) PhysicalPlanType.SET_DEVICE_TEMPLATE.ordinal());
+
+ ReadWriteIOUtils.write(templateName, stream);
+ ReadWriteIOUtils.write(prefixPath, stream);
+
+ stream.writeLong(index);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java
new file mode 100644
index 0000000..ef7412e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/AutoCreateDeviceMNodePlan.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Physical plan of operator type {@code AUTO_CREATE_DEVICE_MNODE}. It carries a single path
+ * (referred to as a device in this class's log message).
+ *
+ * <p>Serialized layout (identical for both {@code serialize} overloads): one byte holding {@code
+ * PhysicalPlanType.AUTO_CREATE_DEVICE_MNODE.ordinal()}, the full path string, and finally the plan
+ * {@code index} as a long.
+ */
+public class AutoCreateDeviceMNodePlan extends PhysicalPlan {
+
+ private static final Logger logger = LoggerFactory.getLogger(AutoCreateDeviceMNodePlan.class);
+ protected PartialPath path;
+
+ /** Creates an empty plan; {@code path} stays {@code null} until deserialized. */
+ public AutoCreateDeviceMNodePlan() {
+ super(false, Operator.OperatorType.AUTO_CREATE_DEVICE_MNODE);
+ }
+
+ /**
+ * Creates a plan for the given path.
+ *
+ * @param path the path carried by this plan
+ */
+ public AutoCreateDeviceMNodePlan(PartialPath path) {
+ super(false, Operator.OperatorType.AUTO_CREATE_DEVICE_MNODE);
+ this.path = path;
+ }
+
+ /**
+ * Creates an empty plan with a caller-chosen query flag and operator type; both are passed
+ * straight to the superclass constructor.
+ *
+ * @param isQuery query flag forwarded to {@link PhysicalPlan}
+ * @param operatorType operator type forwarded to {@link PhysicalPlan}
+ */
+ public AutoCreateDeviceMNodePlan(boolean isQuery, Operator.OperatorType operatorType) {
+ super(isQuery, operatorType);
+ }
+
+ /**
+ * @return a one-element immutable list holding {@code path}; the element is {@code null} when
+ * the path was never set or failed to deserialize
+ */
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.singletonList(path);
+ }
+
+ public PartialPath getPath() {
+ return path;
+ }
+
+ /**
+ * Writes this plan into {@code buffer} using the layout described on the class.
+ *
+ * @param buffer destination buffer; must have enough remaining capacity
+ */
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ buffer.put((byte) PhysicalPlanType.AUTO_CREATE_DEVICE_MNODE.ordinal());
+ putString(buffer, path.getFullPath());
+ buffer.putLong(index);
+ }
+
+ /**
+ * Stream counterpart of {@link #serialize(ByteBuffer)}; writes the same fields in the same order.
+ *
+ * @param stream destination stream
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ // writeByte emits the same single byte as write(int); it matches the other plan classes.
+ stream.writeByte((byte) PhysicalPlanType.AUTO_CREATE_DEVICE_MNODE.ordinal());
+ putString(stream, path.getFullPath());
+ stream.writeLong(index);
+ }
+
+ /**
+ * Restores the path and the index from {@code buffer}.
+ *
+ * <p>If the stored string is not a legal path, the failure is logged with its cause, {@code
+ * path} keeps its previous value ({@code null} for a fresh plan), and the index is still read.
+ *
+ * @param buffer source buffer positioned at the path string
+ */
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ String pathString = readString(buffer);
+ try {
+ path = new PartialPath(pathString);
+ } catch (IllegalPathException e) {
+ // Pass the exception as the last argument so the reason for the rejection is not lost.
+ logger.error("Failed to deserialize device {} from buffer", pathString, e);
+ }
+ index = buffer.getLong();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetUsingDeviceTemplatePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetUsingDeviceTemplatePlan.java
new file mode 100644
index 0000000..6d20145
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/sys/SetUsingDeviceTemplatePlan.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.qp.physical.sys;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Physical plan of operator type {@code SET_USING_DEVICE_TEMPLATE}. It carries a single prefix
+ * path.
+ *
+ * <p>Serialized layout (identical for both {@code serialize} overloads): one byte holding {@code
+ * PhysicalPlanType.SET_USING_DEVICE_TEMPLATE.ordinal()}, the full prefix path string, and finally
+ * the plan {@code index} as a long.
+ */
+public class SetUsingDeviceTemplatePlan extends PhysicalPlan {
+
+ private static final Logger logger = LoggerFactory.getLogger(SetUsingDeviceTemplatePlan.class);
+ PartialPath prefixPath;
+
+ /** Creates an empty plan; {@code prefixPath} stays {@code null} until deserialized. */
+ public SetUsingDeviceTemplatePlan() {
+ // presumably the first super argument is the "is query" flag - TODO confirm in PhysicalPlan
+ super(false, OperatorType.SET_USING_DEVICE_TEMPLATE);
+ }
+
+ /**
+ * Creates a plan for the given prefix path.
+ *
+ * @param prefixPath the prefix path carried by this plan
+ */
+ public SetUsingDeviceTemplatePlan(PartialPath prefixPath) {
+ super(false, OperatorType.SET_USING_DEVICE_TEMPLATE);
+ this.prefixPath = prefixPath;
+ }
+
+ /**
+ * This plan reports no paths, even though it holds a prefix path.
+ *
+ * @return always {@code null} (not an empty list), so callers must be prepared for null
+ */
+ @Override
+ public List<PartialPath> getPaths() {
+ return null;
+ }
+
+ public PartialPath getPrefixPath() {
+ return prefixPath;
+ }
+
+ /**
+ * Writes this plan into {@code buffer} using the layout described on the class.
+ *
+ * @param buffer destination buffer; must have enough remaining capacity
+ */
+ @Override
+ public void serialize(ByteBuffer buffer) {
+ buffer.put((byte) PhysicalPlanType.SET_USING_DEVICE_TEMPLATE.ordinal());
+ ReadWriteIOUtils.write(prefixPath.getFullPath(), buffer);
+ buffer.putLong(index);
+ }
+
+ /**
+ * Restores the prefix path and the index from {@code buffer}.
+ *
+ * <p>If the stored string is not a legal path, the failure is logged with its cause, {@code
+ * prefixPath} keeps its previous value ({@code null} for a fresh plan), and the index is still
+ * read.
+ *
+ * @param buffer source buffer positioned at the path string
+ */
+ @Override
+ public void deserialize(ByteBuffer buffer) {
+ // NOTE(review): the string is written with ReadWriteIOUtils.write but read with the inherited
+ // readString - assumes both use the same encoding; confirm in PhysicalPlan.
+ String pathString = readString(buffer);
+ try {
+ prefixPath = new PartialPath(pathString);
+ } catch (IllegalPathException e) {
+ // Pass the exception as the last argument so the reason for the rejection is not lost.
+ logger.error("Failed to deserialize device {} from buffer", pathString, e);
+ }
+ index = buffer.getLong();
+ }
+
+ /**
+ * Stream counterpart of {@link #serialize(ByteBuffer)}; writes the same fields in the same order.
+ *
+ * @param stream destination stream
+ * @throws IOException if writing to the stream fails
+ */
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+ stream.writeByte((byte) PhysicalPlanType.SET_USING_DEVICE_TEMPLATE.ordinal());
+ ReadWriteIOUtils.write(prefixPath.getFullPath(), stream);
+ stream.writeLong(index);
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 500a845..ab4f79e 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.template.Template;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
@@ -45,6 +46,7 @@ import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -220,7 +222,17 @@ public class AlignByDeviceDataSet extends QueryDataSet {
protected Set<String> getDeviceMeasurements(PartialPath device) throws IOException {
try {
MNode deviceNode = IoTDB.metaManager.getNodeByPath(device);
- return deviceNode.getChildren().keySet();
+ Set<String> res = new HashSet<>(deviceNode.getChildren().keySet());
+ for (MNode mnode : deviceNode.getChildren().values()) {
+ res.addAll(mnode.getChildren().keySet());
+ }
+
+ Template template = deviceNode.getUpperTemplate();
+ if (template != null) {
+ res.addAll(template.getSchemaMap().keySet());
+ }
+
+ return res;
} catch (MetadataException e) {
throw new IOException("Cannot get node from " + device, e);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 73c29eb..67f0467 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -48,6 +48,7 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
@@ -58,6 +59,7 @@ import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.UDFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.qp.physical.sys.AuthorPlan;
@@ -90,6 +92,7 @@ import org.apache.iotdb.service.rpc.thrift.TSCancelOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseOperationReq;
import org.apache.iotdb.service.rpc.thrift.TSCloseSessionReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateMultiTimeseriesReq;
+import org.apache.iotdb.service.rpc.thrift.TSCreateSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSCreateTimeseriesReq;
import org.apache.iotdb.service.rpc.thrift.TSDeleteDataReq;
import org.apache.iotdb.service.rpc.thrift.TSExecuteBatchStatementReq;
@@ -114,6 +117,7 @@ import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.iotdb.service.rpc.thrift.TSQueryDataSet;
import org.apache.iotdb.service.rpc.thrift.TSQueryNonAlignDataSet;
import org.apache.iotdb.service.rpc.thrift.TSRawDataQueryReq;
+import org.apache.iotdb.service.rpc.thrift.TSSetSchemaTemplateReq;
import org.apache.iotdb.service.rpc.thrift.TSSetTimeZoneReq;
import org.apache.iotdb.service.rpc.thrift.TSStatus;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
@@ -1885,6 +1889,85 @@ public class TSServiceImpl implements TSIService.Iface {
return statementId;
}
+ /**
+  * Creates a schema template from a client request.
+  *
+  * <p>The request carries data types, encodings and compressors as integers. Each integer is used
+  * as an index into the matching enum's {@code values()} array before the {@link
+  * CreateTemplatePlan} is built, authority-checked and executed.
+  *
+  * @param req the create-template request
+  * @return {@code NOT_LOGIN_ERROR} when the session is not logged in; the status of the authority
+  *     check when that is non-null; otherwise the result of executing the plan. Any exception
+  *     raised while handling the request is converted into a status by {@code
+  *     onNPEOrUnexpectedException}.
+  */
+ @Override
+ public TSStatus createSchemaTemplate(TSCreateSchemaTemplateReq req) throws TException {
+   try {
+     if (!checkLogin(req.getSessionId())) {
+       return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+     }
+
+     if (AUDIT_LOGGER.isDebugEnabled()) {
+       AUDIT_LOGGER.debug(
+           "Session-{} create device template {}.{}.{}.{}.{}.{}",
+           currSessionId.get(),
+           req.getName(),
+           req.getSchemaNames(),
+           req.getMeasurements(),
+           req.getDataTypes(),
+           req.getEncodings(),
+           req.getCompressors());
+     }
+
+     // Translate the integer codes of every schema into TSDataType constants.
+     List<List<TSDataType>> dataTypes = new ArrayList<>();
+     for (List<Integer> list : req.getDataTypes()) {
+       List<TSDataType> dataTypesList = new ArrayList<>();
+       for (int dataType : list) {
+         dataTypesList.add(TSDataType.values()[dataType]);
+       }
+       dataTypes.add(dataTypesList);
+     }
+
+     // Same translation for the encodings.
+     List<List<TSEncoding>> encodings = new ArrayList<>();
+     for (List<Integer> list : req.getEncodings()) {
+       List<TSEncoding> encodingsList = new ArrayList<>();
+       for (int encoding : list) {
+         encodingsList.add(TSEncoding.values()[encoding]);
+       }
+       encodings.add(encodingsList);
+     }
+
+     // Compressors arrive as a flat list of integer codes.
+     List<CompressionType> compressionTypes = new ArrayList<>();
+     for (int compressType : req.getCompressors()) {
+       compressionTypes.add(CompressionType.values()[compressType]);
+     }
+
+     CreateTemplatePlan plan =
+         new CreateTemplatePlan(
+             req.getName(),
+             req.getSchemaNames(),
+             req.getMeasurements(),
+             dataTypes,
+             encodings,
+             compressionTypes);
+
+     TSStatus status = checkAuthority(plan, req.getSessionId());
+     return status != null ? status : executeNonQueryPlan(plan);
+   } catch (Exception e) {
+     // An out-of-range enum code from the client also ends up here (array index failure).
+     return onNPEOrUnexpectedException(
+         e, "creating schema template", TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   }
+ }
+
+ /**
+  * Attaches an existing schema template to a path prefix.
+  *
+  * <p>Mirrors {@code createSchemaTemplate}: the whole request is handled inside a try block so
+  * that an unexpected failure is reported to the client as a status instead of escaping the RPC
+  * handler.
+  *
+  * @param req the request naming the template and the prefix path it is set on
+  * @return {@code NOT_LOGIN_ERROR} when the session is not logged in; the status of the authority
+  *     check when that is non-null; otherwise the result of executing the plan
+  */
+ @Override
+ public TSStatus setSchemaTemplate(TSSetSchemaTemplateReq req) throws TException {
+   try {
+     if (!checkLogin(req.getSessionId())) {
+       return RpcUtils.getStatus(TSStatusCode.NOT_LOGIN_ERROR);
+     }
+
+     if (AUDIT_LOGGER.isDebugEnabled()) {
+       AUDIT_LOGGER.debug(
+           "Session-{} set device template {}.{}",
+           currSessionId.get(),
+           req.getTemplateName(),
+           req.getPrefixPath());
+     }
+
+     // Use the generated getters, as the audit log above already does.
+     SetDeviceTemplatePlan plan =
+         new SetDeviceTemplatePlan(req.getTemplateName(), req.getPrefixPath());
+
+     TSStatus status = checkAuthority(plan, req.getSessionId());
+     return status != null ? status : executeNonQueryPlan(plan);
+   } catch (Exception e) {
+     return onNPEOrUnexpectedException(
+         e, "setting schema template", TSStatusCode.EXECUTE_STATEMENT_ERROR);
+   }
+ }
+
private TSStatus checkAuthority(PhysicalPlan plan, long sessionId) {
List<PartialPath> paths = plan.getPaths();
try {
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
index 8cb27ac..20b4cf7 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessorTest.java
@@ -162,7 +162,7 @@ public class StorageGroupProcessorTest {
}
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
@@ -191,7 +191,7 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
}
@@ -252,7 +252,7 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(1, queryDataSource.getUnseqResources().size());
@@ -282,7 +282,7 @@ public class StorageGroupProcessorTest {
processor.syncCloseAllWorkingTsFileProcessors();
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(10, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -322,7 +322,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
@@ -404,7 +404,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -486,7 +486,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -568,7 +568,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(2, queryDataSource.getSeqResources().size());
Assert.assertEquals(0, queryDataSource.getUnseqResources().size());
@@ -606,7 +606,7 @@ public class StorageGroupProcessorTest {
}
QueryDataSource queryDataSource =
- processor.query(new PartialPath(deviceId), measurementId, context, null, null);
+ processor.query(new PartialPath(deviceId, measurementId), context, null, null);
Assert.assertEquals(10, queryDataSource.getSeqResources().size());
for (TsFileResource resource : queryDataSource.getSeqResources()) {
Assert.assertTrue(resource.isClosed());
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 1943ef3..1f096ee 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -223,7 +223,7 @@ public class TTLTest {
// files before ttl
QueryDataSource dataSource =
storageGroupProcessor.query(
- new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
List<TsFileResource> seqResource = dataSource.getSeqResources();
List<TsFileResource> unseqResource = dataSource.getUnseqResources();
assertEquals(4, seqResource.size());
@@ -234,7 +234,7 @@ public class TTLTest {
// files after ttl
dataSource =
storageGroupProcessor.query(
- new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertTrue(seqResource.size() < 4);
@@ -269,7 +269,7 @@ public class TTLTest {
storageGroupProcessor.setDataTTL(0);
dataSource =
storageGroupProcessor.query(
- new PartialPath(sg1), s1, EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
+ new PartialPath(sg1, s1), EnvironmentUtils.TEST_QUERY_CONTEXT, null, null);
seqResource = dataSource.getSeqResources();
unseqResource = dataSource.getUnseqResources();
assertEquals(0, seqResource.size());
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
index 07c72b1..3135881 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerBasicTest.java
@@ -22,12 +22,21 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.mnode.MNode;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
@@ -45,6 +54,7 @@ import java.util.stream.Collectors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -739,6 +749,298 @@ public class MManagerBasicTest {
}
@Test
+ public void testTemplate() throws MetadataException {
+   // Register "template1" and attach it to the device root.sg1.d1.
+   MManager metaManager = IoTDB.metaManager;
+   metaManager.createDeviceTemplate(getCreateTemplatePlan());
+   metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.sg1.d1"));
+
+   PartialPath devicePath = new PartialPath("root.sg1.d1");
+   MNode deviceNode = metaManager.getDeviceNode(devicePath);
+   deviceNode.setUseTemplate(true);
+
+   // The template must be reachable from the node and expose the schema of s11.
+   MeasurementSchema expectedSchema =
+       new MeasurementSchema("s11", TSDataType.INT64, TSEncoding.RLE, CompressionType.SNAPPY);
+   assertNotNull(deviceNode.getDeviceTemplate());
+   assertEquals(deviceNode.getDeviceTemplate().getSchemaMap().get("s11"), expectedSchema);
+
+   // Every template schema must also be reported among the measurements of the device.
+   Set<MeasurementSchema> unmatched =
+       new HashSet<>(deviceNode.getDeviceTemplate().getSchemaMap().values());
+   for (MeasurementSchema reported :
+       metaManager.getAllMeasurementByDevicePath(new PartialPath("root.sg1.d1"))) {
+     unmatched.remove(reported);
+   }
+   assertTrue(unmatched.isEmpty());
+ }
+
+ /** Builds the plan for "template1", which holds the single INT64/RLE/SNAPPY schema "s11". */
+ private CreateTemplatePlan getCreateTemplatePlan() {
+   // Outer lists stay mutable ArrayLists; inner lists are singleton lists, as before.
+   List<String> names = new ArrayList<>(Collections.singletonList("s11"));
+   List<List<String>> measurements =
+       new ArrayList<>(Collections.singletonList(Collections.singletonList("s11")));
+   List<List<TSDataType>> types =
+       new ArrayList<>(Collections.singletonList(Collections.singletonList(TSDataType.INT64)));
+   List<List<TSEncoding>> encodingsPerSchema =
+       new ArrayList<>(Collections.singletonList(Collections.singletonList(TSEncoding.RLE)));
+   List<CompressionType> compressors =
+       new ArrayList<>(Collections.singletonList(CompressionType.SNAPPY));
+
+   return new CreateTemplatePlan(
+       "template1", names, measurements, types, encodingsPerSchema, compressors);
+ }
+
+ /**
+  * Checks {@code MManager#isTemplateCompatible} on four template variants, then checks that
+  * setting an incompatible template below a templated path is rejected.
+  *
+  * <p>NOTE(review): every plan receives {@code new ArrayList<>(...)} copies of the outer lists,
+  * which are shallow. The mutable inner list at index 1 is therefore still shared with plan1 and
+  * plan2 when "s13" is appended to it further down. Confirm whether CreateTemplatePlan / Template
+  * copy the inner lists before relying on plan1 being unaffected by that mutation.
+  */
+ @Test
+ public void testTemplateCompatibility() throws MetadataException {
+   // Schema 0: single measurement "s11"; schema 1: the ten measurements s0..s9.
+   List<List<String>> measurementList = new ArrayList<>();
+   measurementList.add(Collections.singletonList("s11"));
+   List<String> measurements = new ArrayList<>();
+   for (int i = 0; i < 10; i++) {
+     measurements.add("s" + i);
+   }
+   measurementList.add(measurements);
+
+   List<List<TSDataType>> dataTypeList = new ArrayList<>();
+   dataTypeList.add(Collections.singletonList(TSDataType.INT64));
+   List<TSDataType> dataTypes = new ArrayList<>();
+   for (int i = 0; i < 10; i++) {
+     dataTypes.add(TSDataType.INT64);
+   }
+   dataTypeList.add(dataTypes);
+
+   List<List<TSEncoding>> encodingList = new ArrayList<>();
+   encodingList.add(Collections.singletonList(TSEncoding.RLE));
+   List<TSEncoding> encodings = new ArrayList<>();
+   for (int i = 0; i < 10; i++) {
+     encodings.add(TSEncoding.RLE);
+   }
+   encodingList.add(encodings);
+
+   List<CompressionType> compressionTypes = new ArrayList<>();
+   for (int i = 0; i < 11; i++) {
+     compressionTypes.add(CompressionType.SNAPPY);
+   }
+
+   List<String> schemaNames = new ArrayList<>();
+   schemaNames.add("s11");
+   schemaNames.add("test_vector");
+
+   CreateTemplatePlan plan1 =
+       new CreateTemplatePlan(
+           "template1",
+           new ArrayList<>(schemaNames),
+           new ArrayList<>(measurementList),
+           new ArrayList<>(dataTypeList),
+           new ArrayList<>(encodingList),
+           new ArrayList<>(compressionTypes));
+
+   // template2 = template1 plus one more schema, "s12".
+   measurementList.add(Collections.singletonList("s12"));
+   schemaNames.add("s12");
+   dataTypeList.add(Collections.singletonList(TSDataType.INT64));
+   encodingList.add(Collections.singletonList(TSEncoding.RLE));
+   compressionTypes.add(CompressionType.SNAPPY);
+
+   CreateTemplatePlan plan2 =
+       new CreateTemplatePlan(
+           "template2",
+           new ArrayList<>(schemaNames),
+           new ArrayList<>(measurementList),
+           new ArrayList<>(dataTypeList),
+           new ArrayList<>(encodingList),
+           new ArrayList<>(compressionTypes));
+
+   MManager manager = IoTDB.metaManager;
+
+   // The check is directional: template1 vs. its superset passes, the reverse does not.
+   assertTrue(manager.isTemplateCompatible(new Template(plan1), new Template(plan2)));
+   assertFalse(manager.isTemplateCompatible(new Template(plan2), new Template(plan1)));
+
+   // template3: append "s13" to the multi-measurement schema (mutates the shared inner lists).
+   measurementList.get(1).add("s13");
+   dataTypeList.get(1).add(TSDataType.INT64);
+   encodingList.get(1).add(TSEncoding.RLE);
+
+   CreateTemplatePlan plan3 =
+       new CreateTemplatePlan(
+           "template3",
+           new ArrayList<>(schemaNames),
+           new ArrayList<>(measurementList),
+           new ArrayList<>(dataTypeList),
+           new ArrayList<>(encodingList),
+           new ArrayList<>(compressionTypes));
+
+   assertTrue(manager.isTemplateCompatible(new Template(plan1), new Template(plan3)));
+
+   // template4: drop the first measurement of the multi-measurement schema. Fresh inner lists
+   // are installed with set(), so the earlier plans are not touched by this step.
+   List<String> vectorList = new ArrayList<>(measurementList.get(1));
+   vectorList.remove(0);
+   List<TSDataType> vectorDataTypesList = new ArrayList<>(dataTypeList.get(1));
+   vectorDataTypesList.remove(0);
+   List<TSEncoding> vectorEncodingsList = new ArrayList<>(encodingList.get(1));
+   vectorEncodingsList.remove(0);
+
+   measurementList.set(1, vectorList);
+   dataTypeList.set(1, vectorDataTypesList);
+   encodingList.set(1, vectorEncodingsList);
+
+   CreateTemplatePlan plan4 =
+       new CreateTemplatePlan(
+           "template4",
+           new ArrayList<>(schemaNames),
+           new ArrayList<>(measurementList),
+           new ArrayList<>(dataTypeList),
+           new ArrayList<>(encodingList),
+           new ArrayList<>(compressionTypes));
+
+   assertFalse(manager.isTemplateCompatible(new Template(plan1), new Template(plan4)));
+
+   // test manager
+   manager.createDeviceTemplate(plan1);
+   manager.createDeviceTemplate(plan2);
+   manager.createDeviceTemplate(plan4);
+
+   manager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.sg1.d1"));
+   try {
+     manager.setDeviceTemplate(new SetDeviceTemplatePlan("template4", "root.sg1.d1.d2"));
+     fail("These two templates are incompatible");
+   } catch (MetadataException e) {
+     assertEquals("Incompatible template", e.getMessage());
+   }
+
+   // The compatible superset template can still be set on the child path.
+   manager.setDeviceTemplate(new SetDeviceTemplatePlan("template2", "root.sg1.d1.d2"));
+ }
+
+ /**
+  * A time series may be created under a templated device as long as its name does not clash
+  * with a measurement defined by the template.
+  */
+ @Test
+ public void testTemplateAndTimeSeriesCompatibility() throws MetadataException {
+   MManager metaManager = IoTDB.metaManager;
+   metaManager.createDeviceTemplate(getCreateTemplatePlan());
+
+   // set device template
+   metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.sg1.d1"));
+
+   // "s20" is not part of the template, so creating it succeeds.
+   metaManager.createTimeseries(
+       new CreateTimeSeriesPlan(
+           new PartialPath("root.sg1.d1.s20"),
+           TSDataType.INT32,
+           TSEncoding.PLAIN,
+           CompressionType.GZIP,
+           null,
+           null,
+           null,
+           null));
+
+   // "s11" is defined by the template, so creating it must be rejected.
+   CreateTimeSeriesPlan clashingPlan =
+       new CreateTimeSeriesPlan(
+           new PartialPath("root.sg1.d1.s11"),
+           TSDataType.INT32,
+           TSEncoding.PLAIN,
+           CompressionType.GZIP,
+           null,
+           null,
+           null,
+           null);
+
+   try {
+     metaManager.createTimeseries(clashingPlan);
+     fail();
+   } catch (Exception rejected) {
+     assertEquals(
+         "Path [root.sg1.d1.s11 ( which is incompatible with template )] already exist",
+         rejected.getMessage());
+   }
+ }
+
+ /**
+  * Measurements that exist only through a device template must be listed by {@code
+  * showTimeseries}, both when queried by their full path and when queried from the root.
+  *
+  * @throws MetadataException if any metadata operation fails; declared, as in the sibling
+  *     template tests, so the test runner reports the failure with its full stack trace
+  */
+ @Test
+ public void testShowTimeseriesWithTemplate() throws MetadataException {
+   List<List<String>> measurementList = new ArrayList<>();
+   measurementList.add(Collections.singletonList("s0"));
+   measurementList.add(Collections.singletonList("s1"));
+
+   List<List<TSDataType>> dataTypeList = new ArrayList<>();
+   dataTypeList.add(Collections.singletonList(TSDataType.INT32));
+   dataTypeList.add(Collections.singletonList(TSDataType.FLOAT));
+
+   List<List<TSEncoding>> encodingList = new ArrayList<>();
+   encodingList.add(Collections.singletonList(TSEncoding.RLE));
+   encodingList.add(Collections.singletonList(TSEncoding.RLE));
+
+   // compressionType is declared elsewhere in this test class.
+   List<CompressionType> compressionTypes = new ArrayList<>();
+   for (int i = 0; i < 2; i++) {
+     compressionTypes.add(compressionType);
+   }
+
+   List<String> schemaNames = new ArrayList<>();
+   schemaNames.add("s0");
+   schemaNames.add("s1");
+
+   CreateTemplatePlan plan =
+       new CreateTemplatePlan(
+           "template1",
+           schemaNames,
+           measurementList,
+           dataTypeList,
+           encodingList,
+           compressionTypes);
+   MManager manager = IoTDB.metaManager;
+   manager.createDeviceTemplate(plan);
+
+   // set device template
+   SetDeviceTemplatePlan setDeviceTemplatePlan =
+       new SetDeviceTemplatePlan("template1", "root.laptop.d1");
+   manager.setDeviceTemplate(setDeviceTemplatePlan);
+   manager.getDeviceNode(new PartialPath("root.laptop.d1")).setUseTemplate(true);
+
+   // show timeseries root.laptop.d1.s0
+   ShowTimeSeriesPlan showTimeSeriesPlan =
+       new ShowTimeSeriesPlan(
+           new PartialPath("root.laptop.d1.s0"), false, null, null, 0, 0, false);
+   List<ShowTimeSeriesResult> result =
+       manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+   assertEquals(1, result.size());
+   assertEquals("root.laptop.d1.s0", result.get(0).getName());
+
+   // show timeseries root.laptop.d1.s1
+   showTimeSeriesPlan =
+       new ShowTimeSeriesPlan(
+           new PartialPath("root.laptop.d1.s1"), false, null, null, 0, 0, false);
+   result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+   assertEquals(1, result.size());
+   assertEquals("root.laptop.d1.s1", result.get(0).getName());
+
+   // show timeseries root: both template measurements are expected, in any order
+   showTimeSeriesPlan =
+       new ShowTimeSeriesPlan(new PartialPath("root"), false, null, null, 0, 0, false);
+   result = manager.showTimeseries(showTimeSeriesPlan, new QueryContext());
+   assertEquals(2, result.size());
+   Set<String> set = new HashSet<>();
+   set.add("root.laptop.d1.s0");
+   set.add("root.laptop.d1.s1");
+
+   for (ShowTimeSeriesResult series : result) {
+     set.remove(series.getName());
+   }
+
+   assertTrue(set.isEmpty());
+ }
+
+ @Test
public void testTotalSeriesNumber() throws Exception {
MManager manager = IoTDB.metaManager;
diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
index e7212f0..4b52b54 100644
--- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerImproveTest.java
@@ -35,6 +35,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -136,12 +137,16 @@ public class MManagerImproveTest {
}
private void doCacheTest(String deviceId, List<String> measurementList) throws MetadataException {
- MNode node = mManager.getDeviceNodeWithAutoCreate(new PartialPath(deviceId));
- for (String s : measurementList) {
- assertTrue(node.hasChild(s));
- MeasurementMNode measurementNode = (MeasurementMNode) node.getChild(s);
- TSDataType dataType = measurementNode.getSchema().getType();
- assertEquals(TSDataType.TEXT, dataType);
+ try {
+ MNode node = mManager.getDeviceNodeWithAutoCreate(new PartialPath(deviceId)).left;
+ for (String s : measurementList) {
+ assertTrue(node.hasChild(s));
+ MeasurementMNode measurementNode = (MeasurementMNode) node.getChild(s);
+ TSDataType dataType = measurementNode.getSchema().getType();
+ assertEquals(TSDataType.TEXT, dataType);
+ }
+ } catch (IOException e) {
+ throw new MetadataException(e);
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
new file mode 100644
index 0000000..1b85f8a
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertRowPlanTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.qp.physical;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.PartialPath;
+import org.apache.iotdb.db.qp.Planner;
+import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Tests for {@link InsertRowPlan}: plain row insertion, insertion into a device whose schema
+ * comes from a device template, and a serialization round trip.
+ */
+public class InsertRowPlanTest {
+
+  private final Planner processor = new Planner();
+
+  @Before
+  public void before() {
+    EnvironmentUtils.envSetUp();
+  }
+
+  @After
+  public void clean() throws IOException, StorageEngineException {
+    // testInsertRowPlanWithDeviceTemplate switches auto-creation off; switch it back on here.
+    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(true);
+    EnvironmentUtils.cleanEnv();
+  }
+
+  /** Inserts one row with six measurements and expects six columns when querying the device. */
+  @Test
+  public void testInsertRowPlan()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    long time = 110L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {
+          TSDataType.DOUBLE,
+          TSDataType.FLOAT,
+          TSDataType.INT64,
+          TSDataType.INT32,
+          TSDataType.BOOLEAN,
+          TSDataType.TEXT
+        };
+
+    // String forms of the six values, in the same order as dataTypes.
+    String[] columns = new String[] {"1.0", "2", "10000", "100", "false", "hh0"};
+
+    InsertRowPlan rowPlan =
+        new InsertRowPlan(
+            new PartialPath("root.isp.d1"),
+            time,
+            new String[] {"s1", "s2", "s3", "s4", "s5", "s6"},
+            dataTypes,
+            columns);
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insert(rowPlan);
+
+    QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+    QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(6, dataSet.getPaths().size());
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      Assert.assertEquals(6, record.getFields().size());
+    }
+  }
+
+  /**
+   * Inserts a row into a device whose three measurements are defined only by a device template,
+   * with schema auto-creation switched off before the insert.
+   */
+  @Test
+  public void testInsertRowPlanWithDeviceTemplate()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    IoTDB.metaManager.createDeviceTemplate(getCreateTemplatePlan());
+    IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp.d1"));
+
+    IoTDBDescriptor.getInstance().getConfig().setAutoCreateSchemaEnabled(false);
+
+    InsertRowPlan rowPlan = getInsertRowPlan();
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insert(rowPlan);
+
+    QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+    QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(3, dataSet.getPaths().size());
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      Assert.assertEquals(3, record.getFields().size());
+    }
+  }
+
+  /** Serializes an executed plan, checks the leading type byte and compares the round trip. */
+  @Test
+  public void testInsertRowSerialization() throws IllegalPathException, QueryProcessException {
+    InsertRowPlan plan1 = getInsertRowPlan();
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insert(plan1);
+
+    ByteBuffer byteBuffer = ByteBuffer.allocate(10000);
+    plan1.serialize(byteBuffer);
+    byteBuffer.flip();
+
+    // The first serialized byte is compared against the ordinal of the INSERT plan type.
+    Assert.assertEquals(PhysicalPlanType.INSERT.ordinal(), byteBuffer.get());
+
+    InsertRowPlan plan2 = new InsertRowPlan();
+    plan2.deserialize(byteBuffer);
+    Assert.assertEquals(plan1, plan2);
+  }
+
+  /**
+   * Builds the plan for "template1" with the schemas s1 (DOUBLE), s2 (FLOAT) and s3 (INT64), all
+   * PLAIN-encoded and SNAPPY-compressed.
+   */
+  private CreateTemplatePlan getCreateTemplatePlan() {
+    List<List<String>> measurementList = new ArrayList<>();
+    measurementList.add(Collections.singletonList("s1"));
+    measurementList.add(Collections.singletonList("s2"));
+    measurementList.add(Collections.singletonList("s3"));
+
+    List<List<TSDataType>> dataTypesList = new ArrayList<>();
+    dataTypesList.add(Collections.singletonList(TSDataType.DOUBLE));
+    dataTypesList.add(Collections.singletonList(TSDataType.FLOAT));
+    dataTypesList.add(Collections.singletonList(TSDataType.INT64));
+
+    List<List<TSEncoding>> encodingList = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      encodingList.add(Collections.singletonList(TSEncoding.PLAIN));
+    }
+
+    List<CompressionType> compressionTypes = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      compressionTypes.add(CompressionType.SNAPPY);
+    }
+
+    List<String> schemaNames = new ArrayList<>();
+    schemaNames.add("s1");
+    schemaNames.add("s2");
+    schemaNames.add("s3");
+
+    return new CreateTemplatePlan(
+        "template1", schemaNames, measurementList, dataTypesList, encodingList, compressionTypes);
+  }
+
+  /** Builds a row for root.isp.d1 at time 110 with values for s1, s2 and s3. */
+  private InsertRowPlan getInsertRowPlan() throws IllegalPathException {
+    long time = 110L;
+    TSDataType[] dataTypes =
+        new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64};
+
+    String[] columns = new String[] {"1.0", "2", "10000"};
+
+    return new InsertRowPlan(
+        new PartialPath("root.isp.d1"), time, new String[] {"s1", "s2", "s3"}, dataTypes, columns);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
index 52b5b64..e665ec3 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/InsertTabletPlanTest.java
@@ -19,16 +19,23 @@
package org.apache.iotdb.db.qp.physical;
import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.qp.Planner;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan.PhysicalPlanType;
+import org.apache.iotdb.db.qp.physical.crud.CreateTemplatePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.SetDeviceTemplatePlan;
+import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -39,7 +46,9 @@ import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
public class InsertTabletPlanTest {
@@ -106,4 +115,151 @@ public class InsertTabletPlanTest {
Assert.assertEquals(6, record.getFields().size());
}
}
+
+  /**
+   * Registers "template1", sets it on root.isp, and checks that a query over root.isp resolves
+   * no paths before a tablet is inserted and exactly three paths (with three fields per row)
+   * afterwards.
+   */
+  @Test
+  public void testInsertTabletPlanWithDeviceTemplate()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    IoTDB.metaManager.createDeviceTemplate(getCreateTemplatePlan());
+    IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp"));
+
+    InsertTabletPlan insertPlan = getInsertTabletPlan();
+    PlanExecutor planExecutor = new PlanExecutor();
+
+    // Before any data has been inserted, the query must not resolve any path.
+    QueryPlan queryBeforeInsert =
+        (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp");
+    QueryDataSet resultBeforeInsert =
+        planExecutor.processQuery(queryBeforeInsert, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(0, resultBeforeInsert.getPaths().size());
+
+    planExecutor.insertTablet(insertPlan);
+
+    // After the insert, the same query must expose the three inserted measurements.
+    QueryPlan queryAfterInsert =
+        (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp");
+    QueryDataSet resultAfterInsert =
+        planExecutor.processQuery(queryAfterInsert, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(3, resultAfterInsert.getPaths().size());
+    while (resultAfterInsert.hasNext()) {
+      Assert.assertEquals(3, resultAfterInsert.next().getFields().size());
+    }
+  }
+
+  /**
+   * Builds the plan used by the template tests: a template named "template1" with schema names
+   * s1, s2 and s3, each holding a single measurement of the same name typed DOUBLE, FLOAT and
+   * INT64 respectively, with PLAIN passed as every encoding and SNAPPY as every compression type.
+   */
+  private CreateTemplatePlan getCreateTemplatePlan() {
+    String[] names = {"s1", "s2", "s3"};
+    TSDataType[] types = {TSDataType.DOUBLE, TSDataType.FLOAT, TSDataType.INT64};
+
+    List<String> schemaNames = new ArrayList<>();
+    List<List<String>> measurementList = new ArrayList<>();
+    List<List<TSDataType>> dataTypesList = new ArrayList<>();
+    List<List<TSEncoding>> encodingList = new ArrayList<>();
+    List<CompressionType> compressionTypes = new ArrayList<>();
+
+    // One entry per schema, filled in lock-step so all five lists stay index-aligned.
+    for (int idx = 0; idx < names.length; idx++) {
+      schemaNames.add(names[idx]);
+      measurementList.add(Collections.singletonList(names[idx]));
+      dataTypesList.add(Collections.singletonList(types[idx]));
+      encodingList.add(Collections.singletonList(TSEncoding.PLAIN));
+      compressionTypes.add(CompressionType.SNAPPY);
+    }
+
+    return new CreateTemplatePlan(
+        "template1", schemaNames, measurementList, dataTypesList, encodingList, compressionTypes);
+  }
+
+  /**
+   * Inserts a tablet into root.isp.d1 after "template1" has been set on root.isp and verifies
+   * that three series can be queried, both directly after the insert and again after the daemon
+   * has been stopped, the metadata manager cleared, and the daemon re-activated (recovery).
+   *
+   * @throws InterruptedException if the thread is interrupted while waiting for the daemon to
+   *     close; the test then fails instead of continuing against a half-closed environment
+   */
+  @Test
+  public void testInsertTabletPlanWithDeviceTemplateAndAutoCreateSchema()
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    CreateTemplatePlan plan = getCreateTemplatePlan();
+
+    IoTDB.metaManager.createDeviceTemplate(plan);
+    IoTDB.metaManager.setDeviceTemplate(new SetDeviceTemplatePlan("template1", "root.isp"));
+    InsertTabletPlan tabletPlan = getInsertTabletPlan();
+
+    PlanExecutor executor = new PlanExecutor();
+    executor.insertTablet(tabletPlan);
+
+    assertThreeSeriesUnderD1(executor);
+
+    // test recover
+    EnvironmentUtils.stopDaemon();
+    IoTDB.metaManager.clear();
+    // Wait for close. An interrupt is propagated through the declared InterruptedException
+    // rather than being printed and ignored, so an interrupted run is reported as a failure.
+    Thread.sleep(1000);
+    EnvironmentUtils.activeDaemon();
+
+    assertThreeSeriesUnderD1(executor);
+  }
+
+  /**
+   * Runs "select * from root.isp.d1" and asserts that the result exposes three paths and that
+   * every returned row carries three fields.
+   *
+   * @param executor the executor used to process the query
+   */
+  private void assertThreeSeriesUnderD1(PlanExecutor executor)
+      throws QueryProcessException, MetadataException, InterruptedException,
+          QueryFilterOptimizationException, StorageEngineException, IOException {
+    QueryPlan queryPlan = (QueryPlan) processor.parseSQLToPhysicalPlan("select * from root.isp.d1");
+    QueryDataSet dataSet = executor.processQuery(queryPlan, EnvironmentUtils.TEST_QUERY_CONTEXT);
+    Assert.assertEquals(3, dataSet.getPaths().size());
+    while (dataSet.hasNext()) {
+      RowRecord record = dataSet.next();
+      Assert.assertEquals(3, record.getFields().size());
+    }
+  }
+
+  /**
+   * Executes an insert-tablet plan, serializes it into a buffer, checks that the first byte is
+   * the BATCHINSERT plan type, and verifies that deserializing the remainder yields an equal
+   * plan.
+   */
+  @Test
+  public void testInsertTabletSerialization() throws IllegalPathException, QueryProcessException {
+    InsertTabletPlan original = getInsertTabletPlan();
+    new PlanExecutor().insertTablet(original);
+
+    ByteBuffer buffer = ByteBuffer.allocate(10000);
+    original.serialize(buffer);
+    buffer.flip();
+
+    // The leading byte identifies the plan type; reading it here advances the buffer position
+    // so that deserialize() starts right after it.
+    Assert.assertEquals(PhysicalPlanType.BATCHINSERT.ordinal(), buffer.get());
+
+    InsertTabletPlan restored = new InsertTabletPlan();
+    restored.deserialize(buffer);
+
+    Assert.assertEquals(original, restored);
+  }
+
+  /**
+   * Builds a four-row tablet for device root.isp.d1 with timestamps 110..113 and the
+   * measurements s1 (DOUBLE, all 1.0), s2 (FLOAT, all 2) and s3 (INT64, all 10000).
+   *
+   * @throws IllegalPathException if the device path cannot be parsed
+   */
+  private InsertTabletPlan getInsertTabletPlan() throws IllegalPathException {
+    long[] times = {110L, 111L, 112L, 113L};
+
+    double[] s1Values = new double[4];
+    float[] s2Values = new float[4];
+    long[] s3Values = new long[4];
+    for (int row = 0; row < 4; row++) {
+      s1Values[row] = 1.0;
+      s2Values[row] = 2;
+      s3Values[row] = 10000;
+    }
+    Object[] columns = {s1Values, s2Values, s3Values};
+
+    // The plan takes data types as enum ordinals, in the same order as the measurements.
+    List<Integer> dataTypes = new ArrayList<>();
+    dataTypes.add(TSDataType.DOUBLE.ordinal());
+    dataTypes.add(TSDataType.FLOAT.ordinal());
+    dataTypes.add(TSDataType.INT64.ordinal());
+
+    InsertTabletPlan tabletPlan =
+        new InsertTabletPlan(
+            new PartialPath("root.isp.d1"), new String[] {"s1", "s2", "s3"}, dataTypes);
+    tabletPlan.setTimes(times);
+    tabletPlan.setColumns(columns);
+    tabletPlan.setRowCount(times.length);
+    return tabletPlan;
+  }
}
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
index 55eea5c..a77d592 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesAggregateReaderTest.java
@@ -44,7 +44,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
public class SeriesAggregateReaderTest {
@@ -71,7 +73,7 @@ public class SeriesAggregateReaderTest {
PartialPath path = new PartialPath(SERIES_READER_TEST_SG + ".device0.sensor0");
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
- QueryDataSource queryDataSource = new QueryDataSource(path, seqResources, unseqResources);
+ QueryDataSource queryDataSource = new QueryDataSource(seqResources, unseqResources);
SeriesAggregateReader seriesReader =
new SeriesAggregateReader(
path,
diff --git a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
index d78d6d4..adb6b88 100644
--- a/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/query/reader/series/SeriesReaderByTimestampTest.java
@@ -61,11 +61,7 @@ public class SeriesReaderByTimestampTest {
@Test
public void test() throws IOException, IllegalPathException {
- QueryDataSource dataSource =
- new QueryDataSource(
- new PartialPath(SERIES_READER_TEST_SG + ".device0.sensor0"),
- seqResources,
- unseqResources);
+ QueryDataSource dataSource = new QueryDataSource(seqResources, unseqResources);
Set<String> allSensors = new HashSet<>();
allSensors.add("sensor0");
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
index d644319..0ace7d4 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/RpcUtils.java
@@ -81,6 +81,7 @@ public class RpcUtils {
if (status.getCode() == TSStatusCode.NEED_REDIRECTION.getStatusCode()) {
return;
}
+
if (status.code != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
throw new StatementExecutionException(status);
}
diff --git a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index f2fd3c1..9febb12 100644
--- a/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -46,6 +46,9 @@ public enum TSStatusCode {
LOAD_FILE_ERROR(316),
STORAGE_GROUP_NOT_READY(317),
ILLEGAL_PARAMETER(318),
+ DUPLICATED_TEMPLATE(320),
+ UNDEFINED_TEMPLATE(321),
+ STORAGE_GROUP_NOT_EXIST(322),
EXECUTE_STATEMENT_ERROR(400),
SQL_PARSE_ERROR(401),
diff --git a/thrift/src/main/thrift/rpc.thrift b/thrift/src/main/thrift/rpc.thrift
index 94892fa..5eadbde 100644
--- a/thrift/src/main/thrift/rpc.thrift
+++ b/thrift/src/main/thrift/rpc.thrift
@@ -299,6 +299,22 @@ struct ServerProperties {
3: required string timestampPrecision;
}
+struct TSSetSchemaTemplateReq { // request argument of TSIService.setSchemaTemplate
+ 1: required i64 sessionId
+ 2: required string templateName // NOTE(review): presumably the name of an already created template - confirm
+ 3: required string prefixPath // NOTE(review): presumably the path the template is set on - confirm
+}
+
+struct TSCreateSchemaTemplateReq { // request argument of TSIService.createSchemaTemplate
+ 1: required i64 sessionId
+ 2: required string name // NOTE(review): presumably the template name - confirm
+ 3: required list<string> schemaNames
+ 4: required list<list<string>> measurements // NOTE(review): presumably one inner list per schemaNames entry - confirm
+ 5: required list<list<i32>> dataTypes // NOTE(review): presumably TSDataType ordinals, shaped like measurements - confirm
+ 6: required list<list<i32>> encodings // NOTE(review): presumably TSEncoding ordinals, shaped like measurements - confirm
+ 7: required list<i32> compressors // NOTE(review): presumably CompressionType ordinals, one per schemaNames entry - confirm
+}
+
service TSIService {
TSOpenSessionResp openSession(1:TSOpenSessionReq req);
@@ -369,4 +385,8 @@ service TSIService {
TSExecuteStatementResp executeRawDataQuery(1:TSRawDataQueryReq req);
i64 requestStatementId(1:i64 sessionId);
+
+ TSStatus createSchemaTemplate(1:TSCreateSchemaTemplateReq req);
+
+ TSStatus setSchemaTemplate(1:TSSetSchemaTemplateReq req);
}