You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/14 05:04:44 UTC
[iotdb] branch new_vector updated: Fit cluster part
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch new_vector
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/new_vector by this push:
new dcc9dd5 Fit cluster part
dcc9dd5 is described below
commit dcc9dd5891b62c74c6e0070c52d8cd4ee8d6d92d
Author: Alima777 <wx...@gmail.com>
AuthorDate: Sun Nov 14 13:04:00 2021 +0800
Fit cluster part
---
.../apache/iotdb/cluster/metadata/CMManager.java | 22 +++++----
.../iotdb/cluster/query/LocalQueryExecutor.java | 33 ++++++-------
.../query/last/ClusterLastQueryExecutor.java | 2 -
.../cluster/query/reader/ClusterReaderFactory.java | 6 +--
.../iotdb/cluster/utils/ClusterQueryUtils.java | 55 ++++++----------------
.../cluster/client/sync/SyncClientAdaptorTest.java | 16 +++----
.../org/apache/iotdb/cluster/common/IoTDBTest.java | 14 +++---
.../apache/iotdb/cluster/query/BaseQueryTest.java | 7 ++-
.../query/ClusterAggregateExecutorTest.java | 47 ++++++------------
.../query/ClusterPhysicalGeneratorTest.java | 1 -
.../cluster/query/ClusterPlanExecutorTest.java | 2 -
.../iotdb/cluster/query/ClusterPlannerTest.java | 1 -
.../cluster/query/ClusterQueryRouterTest.java | 52 +++++++-------------
.../query/fill/ClusterFillExecutorTest.java | 17 +++----
.../ClusterGroupByNoVFilterDataSetTest.java | 7 +--
.../groupby/ClusterGroupByVFilterDataSetTest.java | 11 +++--
.../query/groupby/MergeGroupByExecutorTest.java | 5 +-
.../query/groupby/RemoteGroupByExecutorTest.java | 5 +-
.../query/reader/ClusterReaderFactoryTest.java | 9 ++--
.../cluster/server/member/DataGroupMemberTest.java | 11 ++---
.../cluster/server/member/MetaGroupMemberTest.java | 4 +-
.../org/apache/iotdb/db/metadata/mtree/MTree.java | 11 ++++-
.../iotdb/db/metadata/path/MeasurementPath.java | 10 ++++
thrift-cluster/src/main/thrift/cluster.thrift | 9 ++--
.../tsfile/write/schema/IMeasurementSchema.java | 2 +
.../write/schema/UnaryMeasurementSchema.java | 5 ++
.../write/schema/VectorMeasurementSchema.java | 5 ++
27 files changed, 165 insertions(+), 204 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
index 856c452..4626a52 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
+import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -119,8 +120,6 @@ import java.util.stream.Collectors;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.LOG_FAIL_CONNECT;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.THREAD_POOL_SIZE;
import static org.apache.iotdb.cluster.query.ClusterPlanExecutor.waitForThreadPool;
-import static org.apache.iotdb.cluster.utils.ClusterQueryUtils.getAssembledPathFromRequest;
-import static org.apache.iotdb.cluster.utils.ClusterQueryUtils.getPathStrListForRequest;
import static org.apache.iotdb.db.utils.EncodingInferenceUtils.getDefaultEncoding;
@SuppressWarnings("java:S1135") // ignore todos
@@ -1057,17 +1056,17 @@ public class CMManager extends MManager {
if (result != null) {
// paths may be empty, implying that the group does not contain matched paths, so we do not
// need to query other nodes in the group
- List<MeasurementPath> partialPaths = new ArrayList<>();
+ List<MeasurementPath> measurementPaths = new ArrayList<>();
for (int i = 0; i < result.paths.size(); i++) {
- // todo check this transform
MeasurementPath matchedPath =
- (MeasurementPath) getAssembledPathFromRequest(result.paths.get(i));
- partialPaths.add(matchedPath);
+ ClusterQueryUtils.getAssembledPathFromRequest(
+ result.getPaths().get(i), result.getDataTypes().get(i));
+ measurementPaths.add(matchedPath);
if (withAlias && matchedPath != null) {
matchedPath.setMeasurementAlias(result.aliasList.get(i));
}
}
- return partialPaths;
+ return measurementPaths;
} else {
// a null implies a network failure, so we have to query other nodes in the group
return null;
@@ -1671,14 +1670,16 @@ public class CMManager extends MManager {
public GetAllPathsResult getAllPaths(List<String> paths, boolean withAlias)
throws MetadataException {
- List<List<String>> retPaths = new ArrayList<>();
+ List<String> retPaths = new ArrayList<>();
+ List<Byte> dataTypes = new ArrayList<>();
List<String> alias = withAlias ? new ArrayList<>() : null;
for (String path : paths) {
List<MeasurementPath> allTimeseriesPathWithAlias =
super.getMeasurementPathsWithAlias(new PartialPath(path), -1, -1).left;
- for (PartialPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
- retPaths.add(getPathStrListForRequest(timeseriesPathWithAlias));
+ for (MeasurementPath timeseriesPathWithAlias : allTimeseriesPathWithAlias) {
+ retPaths.add(timeseriesPathWithAlias.getFullPath());
+ dataTypes.add(timeseriesPathWithAlias.getSeriesTypeInByte());
if (withAlias) {
alias.add(timeseriesPathWithAlias.getMeasurementAlias());
}
@@ -1687,6 +1688,7 @@ public class CMManager extends MManager {
GetAllPathsResult getAllPathsResult = new GetAllPathsResult();
getAllPathsResult.setPaths(retPaths);
+ getAllPathsResult.setDataTypes(dataTypes);
getAllPathsResult.setAliasList(alias);
return getAllPathsResult;
}
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
index cf94688..4b212ec 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/LocalQueryExecutor.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.cluster.rpc.thrift.PullSchemaResp;
import org.apache.iotdb.cluster.rpc.thrift.RaftNode;
import org.apache.iotdb.cluster.rpc.thrift.SingleSeriesQueryRequest;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
-import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
import org.apache.iotdb.cluster.utils.ClusterUtils;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
@@ -219,7 +218,8 @@ public class LocalQueryExecutor {
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- PartialPath path = getAssembledPathFromRequest(request.getPath());
+ MeasurementPath path =
+ getAssembledPathFromRequest(request.getPath(), (byte) request.getDataTypeOrdinal());
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
Filter timeFilter = null;
Filter valueFilter = null;
@@ -297,12 +297,14 @@ public class LocalQueryExecutor {
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- List<PartialPath> paths = Lists.newArrayList();
- request.getPath().forEach(path -> paths.add(getAssembledPathFromRequest(path)));
-
+ List<MeasurementPath> paths = Lists.newArrayList();
List<TSDataType> dataTypes = Lists.newArrayList();
- request.getDataTypeOrdinal().forEach(dataType -> dataTypes.add(TSDataType.values()[dataType]));
-
+ for (int i = 0; i < request.getPath().size(); i++) {
+ paths.add(
+ getAssembledPathFromRequest(
+ request.getPath().get(i), request.getDataTypeOrdinal().get(i).byteValue()));
+ dataTypes.add(TSDataType.values()[request.getDataTypeOrdinal().get(i)]);
+ }
Filter timeFilter = null;
Filter valueFilter = null;
if (request.isSetTimeFilterBytes()) {
@@ -539,7 +541,8 @@ public class LocalQueryExecutor {
request.getQueryId());
dataGroupMember.syncLeaderWithConsistencyCheck(false);
- PartialPath path = getAssembledPathFromRequest(request.getPath());
+ MeasurementPath path =
+ getAssembledPathFromRequest(request.getPath(), (byte) request.getDataTypeOrdinal());
TSDataType dataType = TSDataType.values()[request.dataTypeOrdinal];
Set<String> deviceMeasurements = request.getDeviceMeasurements();
@@ -637,7 +640,7 @@ public class LocalQueryExecutor {
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
PartialPath path;
try {
- path = new PartialPath(request.getPath());
+ path = new MeasurementPath(request.getPath(), dataType);
} catch (IllegalPathException e) {
logger.error(
"{}: aggregation has error path: {}, queryId: {}",
@@ -706,7 +709,6 @@ public class LocalQueryExecutor {
throw new QueryProcessException(e.getMessage());
}
- ClusterQueryUtils.checkPathExistence(path);
List<AggregateResult> results = new ArrayList<>();
for (String aggregation : aggregations) {
results.add(AggregateResultFactory.getAggrResultByName(aggregation, dataType, ascending));
@@ -791,7 +793,6 @@ public class LocalQueryExecutor {
throw new StorageEngineException(e);
}
- ClusterQueryUtils.checkPathExistence(path);
List<Integer> nodeSlots =
((SlotPartitionTable) dataGroupMember.getMetaGroupMember().getPartitionTable())
.getNodeSlots(dataGroupMember.getHeader());
@@ -821,14 +822,14 @@ public class LocalQueryExecutor {
*/
public long getGroupByExecutor(GroupByRequest request)
throws QueryProcessException, StorageEngineException {
+ List<Integer> aggregationTypeOrdinals = request.getAggregationTypeOrdinals();
+ TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
PartialPath path;
try {
- path = new PartialPath(request.getPath());
+ path = new MeasurementPath(request.getPath(), dataType);
} catch (IllegalPathException e) {
throw new QueryProcessException(e);
}
- List<Integer> aggregationTypeOrdinals = request.getAggregationTypeOrdinals();
- TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
Filter timeFilter = null;
if (request.isSetTimeFilterBytes()) {
timeFilter = FilterFactory.deserialize(request.timeFilterBytes);
@@ -922,8 +923,8 @@ public class LocalQueryExecutor {
public ByteBuffer previousFill(PreviousFillRequest request)
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
- PartialPath path = new PartialPath(request.getPath());
TSDataType dataType = TSDataType.values()[request.getDataTypeOrdinal()];
+ PartialPath path = new MeasurementPath(request.getPath(), dataType);
long queryId = request.getQueryId();
long queryTime = request.getQueryTime();
long beforeRange = request.getBeforeRange();
@@ -1014,7 +1015,7 @@ public class LocalQueryExecutor {
for (Integer dataTypeOrdinal : request.dataTypeOrdinals) {
dataTypes.add(TSDataType.values()[dataTypeOrdinal]);
}
- ClusterQueryUtils.checkPathExistence(partialPaths);
+
IExpression expression = null;
if (request.isSetFilterBytes()) {
Filter filter = FilterFactory.deserialize(request.filterBytes);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
index 1dae797..3bb71fd 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/last/ClusterLastQueryExecutor.java
@@ -31,7 +31,6 @@ import org.apache.iotdb.cluster.rpc.thrift.LastQueryRequest;
import org.apache.iotdb.cluster.rpc.thrift.Node;
import org.apache.iotdb.cluster.server.member.DataGroupMember;
import org.apache.iotdb.cluster.server.member.MetaGroupMember;
-import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
@@ -180,7 +179,6 @@ public class ClusterLastQueryExecutor extends LastQueryExecutor {
PartitionGroup group, List<PartialPath> seriesPaths, QueryContext context)
throws QueryProcessException, StorageEngineException, IOException {
if (group.contains(metaGroupMember.getThisNode())) {
- ClusterQueryUtils.checkPathExistence(seriesPaths);
return calculateSeriesLastLocally(group, seriesPaths, context);
} else {
return calculateSeriesLastRemotely(group, seriesPaths, context);
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
index e25a353..f74b406 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactory.java
@@ -52,6 +52,7 @@ import org.apache.iotdb.cluster.utils.ClusterQueryUtils;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.aggregation.AggregationType;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -540,7 +541,6 @@ public class ClusterReaderFactory {
boolean ascending,
Set<Integer> requiredSlots)
throws StorageEngineException, QueryProcessException {
- ClusterQueryUtils.checkPathExistence(path);
// If requiredSlots is null, it means that this node should provide data of all slots about
// required paths.
if (requiredSlots == null) {
@@ -686,7 +686,7 @@ public class ClusterReaderFactory {
request.setValueFilterBytes(SerializeUtils.serializeFilter(valueFilter));
}
- List<List<String>> fullPaths = Lists.newArrayList();
+ List<String> fullPaths = Lists.newArrayList();
paths.forEach(path -> fullPaths.add(getPathStrListForRequest(path)));
List<Integer> dataTypeOrdinals = Lists.newArrayList();
@@ -1076,7 +1076,7 @@ public class ClusterReaderFactory {
* @throws StorageEngineException
*/
public IBatchReader getMultSeriesBatchReader(
- List<PartialPath> paths,
+ List<MeasurementPath> paths,
Map<String, Set<String>> allSensors,
List<TSDataType> dataTypes,
Filter timeFilter,
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
index 4ec1ecb..3cec0a8 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/utils/ClusterQueryUtils.java
@@ -23,17 +23,17 @@ import org.apache.iotdb.cluster.metadata.MetaPuller;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.List;
public class ClusterQueryUtils {
@@ -43,20 +43,6 @@ public class ClusterQueryUtils {
// util class
}
- /**
- * Check if the given path exists locally or can be pulled from a remote node.
- *
- * @param path
- * @throws QueryProcessException
- */
- public static void checkPathExistence(String path) throws QueryProcessException {
- try {
- checkPathExistence(new PartialPath(path));
- } catch (IllegalPathException e) {
- throw new QueryProcessException(e);
- }
- }
-
public static void checkPathExistence(PartialPath path) throws QueryProcessException {
if (!IoTDB.metaManager.isPathExist(path)) {
try {
@@ -67,27 +53,15 @@ public class ClusterQueryUtils {
}
}
- public static void checkPathExistence(List<PartialPath> paths) throws QueryProcessException {
- for (PartialPath path : paths) {
- checkPathExistence(path);
- }
- }
-
/**
* Generate path string list for RPC request.
*
- * <p>If vector path, return its vectorId with all subSensors. Else just return path string.
+ * <p>If vector path, return its vectorId with all subSensors. Else just return path string. TODO
+ * aligned path
*/
- public static List<String> getPathStrListForRequest(Path path) {
- if (path instanceof AlignedPath) {
- List<String> pathWithSubSensors =
- new ArrayList<>(((AlignedPath) path).getMeasurementList().size() + 1);
- pathWithSubSensors.add(path.getFullPath());
- pathWithSubSensors.addAll(((AlignedPath) path).getMeasurementList());
- return pathWithSubSensors;
- } else {
- return Collections.singletonList(path.getFullPath());
- }
+ public static String getPathStrListForRequest(Path path) {
+ // TODO aligned Path
+ return path.getFullPath();
}
/**
@@ -95,13 +69,14 @@ public class ClusterQueryUtils {
*
* <p>This method is corresponding to getPathStringListForRequest().
*/
- public static PartialPath getAssembledPathFromRequest(List<String> pathString) {
+ public static MeasurementPath getAssembledPathFromRequest(String pathString, byte dataType) {
+ // TODO aligned path
try {
- if (pathString.size() == 1) {
- return new PartialPath(pathString.get(0));
- } else {
- return new AlignedPath(pathString.get(0), pathString.subList(1, pathString.size()));
- }
+ MeasurementPath matchedPath = new MeasurementPath(pathString);
+ matchedPath.setMeasurementSchema(
+ new UnaryMeasurementSchema(
+ matchedPath.getMeasurement(), TSDataType.deserialize(dataType)));
+ return matchedPath;
} catch (IllegalPathException e) {
logger.error("Failed to create partial path, fullPath is {}.", pathString, e);
return null;
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
index 21cac56..1247c4c 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/client/sync/SyncClientAdaptorTest.java
@@ -245,11 +245,11 @@ public class SyncClientAdaptorTest {
List<String> path,
boolean withAlias,
AsyncMethodCallback<GetAllPathsResult> resultHandler) {
- List<List<String>> pathString = new ArrayList<>();
- for (String s : path) {
- pathString.add(Collections.singletonList(s));
+ List<Byte> dataTypes = new ArrayList<>();
+ for (int i = 0; i < path.size(); i++) {
+ dataTypes.add(TSDataType.DOUBLE.serialize());
}
- resultHandler.onComplete(new GetAllPathsResult(pathString));
+ resultHandler.onComplete(new GetAllPathsResult(path, dataTypes));
}
@Override
@@ -395,10 +395,10 @@ public class SyncClientAdaptorTest {
paths.subList(0, paths.size() / 2),
SyncClientAdaptor.getUnregisteredMeasurements(
dataClient, TestUtils.getRaftNode(0, 0), paths));
- List<String> result = new ArrayList<>();
- SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0), paths, false)
- .paths
- .forEach(p -> result.add(p.get(0)));
+ List<String> result =
+ new ArrayList<>(
+ SyncClientAdaptor.getAllPaths(dataClient, TestUtils.getRaftNode(0, 0), paths, false)
+ .paths);
assertEquals(paths, result);
assertEquals(
paths.size(),
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
index 51f76b2..9034121 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/common/IoTDBTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.executor.PlanExecutor;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
@@ -37,13 +38,13 @@ import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import org.junit.After;
import org.junit.Before;
@@ -163,16 +164,13 @@ public abstract class IoTDBTest {
queryPlan.setExpression(expression);
List<PartialPath> paths = new ArrayList<>();
for (String pathStr : pathStrs) {
- paths.add(new PartialPath(pathStr));
+ MeasurementPath path = new MeasurementPath(pathStr);
+ path.setMeasurementSchema(
+ new UnaryMeasurementSchema(path.getMeasurement(), TSDataType.DOUBLE));
+ paths.add(path);
}
queryPlan.setDeduplicatedPathsAndUpdate(paths);
queryPlan.setPaths(paths);
- List<TSDataType> dataTypes = new ArrayList<>();
- for (PartialPath path : paths) {
- dataTypes.add(IoTDB.metaManager.getSeriesType(path));
- }
- queryPlan.setDeduplicatedDataTypes(dataTypes);
- queryPlan.setDataTypes(dataTypes);
queryPlan.setExpression(expression);
return planExecutor.processQuery(queryPlan, context);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
index 38fc870..2ce9652 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/BaseQueryTest.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.cluster.common.TestUtils;
import org.apache.iotdb.cluster.server.member.BaseMember;
import org.apache.iotdb.cluster.server.monitor.NodeStatusManager;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -50,7 +51,6 @@ import static org.junit.Assert.assertNull;
public class BaseQueryTest extends BaseMember {
protected List<PartialPath> pathList;
- protected List<TSDataType> dataTypes;
protected int defaultCompactionThread =
IoTDBDescriptor.getInstance().getConfig().getConcurrentCompactionThread();
@@ -76,10 +76,9 @@ public class BaseQueryTest extends BaseMember {
IoTDBDescriptor.getInstance().getConfig().setConcurrentCompactionThread(0);
super.setUp();
pathList = new ArrayList<>();
- dataTypes = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
- dataTypes.add(TSDataType.DOUBLE);
+ MeasurementPath path = new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE);
+ pathList.add(path);
}
NodeStatusManager.getINSTANCE().setMetaGroupMember(testMetaMember);
TestUtils.prepareData();
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
index 602994f..9063699 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterAggregateExecutorTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.cluster.query.aggregate.ClusterAggregateExecutor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
@@ -41,6 +42,7 @@ import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -56,20 +58,13 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
public void testNoFilter()
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
AggregationPlan plan = new AggregationPlan();
- List<PartialPath> paths =
- Arrays.asList(
- new PartialPath(TestUtils.getTestSeries(0, 0)),
- new PartialPath(TestUtils.getTestSeries(0, 1)),
- new PartialPath(TestUtils.getTestSeries(0, 2)),
- new PartialPath(TestUtils.getTestSeries(0, 3)),
- new PartialPath(TestUtils.getTestSeries(0, 4)));
- List<TSDataType> dataTypes =
- Arrays.asList(
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE);
+ List<PartialPath> paths = new ArrayList<>();
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 1), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 2), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 3), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 4), TSDataType.DOUBLE));
+
List<String> aggregations =
Arrays.asList(
SQLConstant.MIN_TIME,
@@ -79,8 +74,6 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
SQLConstant.SUM);
plan.setPaths(paths);
plan.setDeduplicatedPathsAndUpdate(paths);
- plan.setDataTypes(dataTypes);
- plan.setDeduplicatedDataTypes(dataTypes);
plan.setAggregations(aggregations);
plan.setDeduplicatedAggregations(aggregations);
@@ -107,20 +100,12 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
public void testFilter()
throws StorageEngineException, IOException, QueryProcessException, IllegalPathException {
AggregationPlan plan = new AggregationPlan();
- List<PartialPath> paths =
- Arrays.asList(
- new PartialPath(TestUtils.getTestSeries(0, 0)),
- new PartialPath(TestUtils.getTestSeries(0, 1)),
- new PartialPath(TestUtils.getTestSeries(0, 2)),
- new PartialPath(TestUtils.getTestSeries(0, 3)),
- new PartialPath(TestUtils.getTestSeries(0, 4)));
- List<TSDataType> dataTypes =
- Arrays.asList(
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE);
+ List<PartialPath> paths = new ArrayList<>();
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 1), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 2), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 3), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 4), TSDataType.DOUBLE));
List<String> aggregations =
Arrays.asList(
SQLConstant.MIN_TIME,
@@ -130,8 +115,6 @@ public class ClusterAggregateExecutorTest extends BaseQueryTest {
SQLConstant.SUM);
plan.setPaths(paths);
plan.setDeduplicatedPathsAndUpdate(paths);
- plan.setDataTypes(dataTypes);
- plan.setDeduplicatedDataTypes(dataTypes);
plan.setAggregations(aggregations);
plan.setDeduplicatedAggregations(aggregations);
plan.setExpression(
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPhysicalGeneratorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPhysicalGeneratorTest.java
index d4fe0ba..fcea051 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPhysicalGeneratorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPhysicalGeneratorTest.java
@@ -68,6 +68,5 @@ public class ClusterPhysicalGeneratorTest extends BaseQueryTest {
RawDataQueryPlan plan = (RawDataQueryPlan) physicalGenerator.transformToPhysicalPlan(operator);
assertEquals(pathList, plan.getDeduplicatedPaths());
- assertEquals(dataTypes, plan.getDeduplicatedDataTypes());
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java
index 4da865e..fd2de79 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlanExecutorTest.java
@@ -58,9 +58,7 @@ public class ClusterPlanExecutorTest extends BaseQueryTest {
IOException, MetadataException, InterruptedException {
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
queryPlan.setDeduplicatedPathsAndUpdate(pathList);
- queryPlan.setDeduplicatedDataTypes(dataTypes);
queryPlan.setPaths(pathList);
- queryPlan.setDataTypes(dataTypes);
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlannerTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlannerTest.java
index 9fd1acd..1bd4dee 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlannerTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterPlannerTest.java
@@ -51,6 +51,5 @@ public class ClusterPlannerTest extends BaseQueryTest {
String sql = String.format("SELECT s0 FROM %s", String.join(",", sgs));
RawDataQueryPlan plan = (RawDataQueryPlan) parser.parseSQLToPhysicalPlan(sql);
assertEquals(pathList, plan.getDeduplicatedPaths());
- assertEquals(dataTypes, plan.getDeduplicatedDataTypes());
}
}
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
index b699737..c6dfaa4 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
@@ -76,7 +77,6 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
public void test() throws StorageEngineException, IOException, QueryProcessException {
RawDataQueryPlan queryPlan = new RawDataQueryPlan();
queryPlan.setDeduplicatedPathsAndUpdate(pathList);
- queryPlan.setDeduplicatedDataTypes(dataTypes);
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
@@ -93,20 +93,12 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
throws StorageEngineException, IOException, QueryProcessException,
QueryFilterOptimizationException, IllegalPathException {
AggregationPlan plan = new AggregationPlan();
- List<PartialPath> paths =
- Arrays.asList(
- new PartialPath(TestUtils.getTestSeries(0, 0)),
- new PartialPath(TestUtils.getTestSeries(0, 1)),
- new PartialPath(TestUtils.getTestSeries(0, 2)),
- new PartialPath(TestUtils.getTestSeries(0, 3)),
- new PartialPath(TestUtils.getTestSeries(0, 4)));
- List<TSDataType> dataTypes =
- Arrays.asList(
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE,
- TSDataType.DOUBLE);
+ List<PartialPath> paths = new ArrayList<>();
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 1), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 2), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 3), TSDataType.DOUBLE));
+ paths.add(new MeasurementPath(TestUtils.getTestSeries(0, 4), TSDataType.DOUBLE));
List<String> aggregations =
Arrays.asList(
SQLConstant.MIN_TIME,
@@ -116,8 +108,6 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
SQLConstant.SUM);
plan.setPaths(paths);
plan.setDeduplicatedPathsAndUpdate(paths);
- plan.setDataTypes(dataTypes);
- plan.setDeduplicatedDataTypes(dataTypes);
plan.setAggregations(aggregations);
plan.setDeduplicatedAggregations(aggregations);
@@ -137,10 +127,9 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
FillQueryPlan plan = new FillQueryPlan();
plan.setDeduplicatedPathsAndUpdate(
- Collections.singletonList(new PartialPath(TestUtils.getTestSeries(0, 10))));
- plan.setDeduplicatedDataTypes(Collections.singletonList(TSDataType.DOUBLE));
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
plan.setPaths(plan.getDeduplicatedPaths());
- plan.setDataTypes(plan.getDeduplicatedDataTypes());
long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
Map<TSDataType, IFill> tsDataTypeIFillMap =
Collections.singletonMap(
@@ -176,10 +165,9 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
FillQueryPlan plan = new FillQueryPlan();
plan.setDeduplicatedPathsAndUpdate(
- Collections.singletonList(new PartialPath(TestUtils.getTestSeries(0, 10))));
- plan.setDeduplicatedDataTypes(Collections.singletonList(TSDataType.DOUBLE));
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
plan.setPaths(plan.getDeduplicatedPaths());
- plan.setDataTypes(plan.getDeduplicatedDataTypes());
long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
Map<TSDataType, IFill> tsDataTypeIFillMap =
Collections.singletonMap(
@@ -221,17 +209,13 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
try {
GroupByTimePlan groupByPlan = new GroupByTimePlan();
List<PartialPath> pathList = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
List<String> aggregations = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
- dataTypes.add(TSDataType.DOUBLE);
+ pathList.add(new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE));
aggregations.add(SQLConstant.COUNT);
}
groupByPlan.setPaths(pathList);
groupByPlan.setDeduplicatedPathsAndUpdate(pathList);
- groupByPlan.setDataTypes(dataTypes);
- groupByPlan.setDeduplicatedDataTypes(dataTypes);
groupByPlan.setAggregations(aggregations);
groupByPlan.setDeduplicatedAggregations(aggregations);
@@ -243,9 +227,11 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
IExpression expression =
BinaryExpression.and(
new SingleSeriesExpression(
- new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)),
+ new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE),
+ ValueFilter.gtEq(5.0)),
new SingleSeriesExpression(
- new PartialPath(TestUtils.getTestSeries(5, 0)), TimeFilter.ltEq(15)));
+ new MeasurementPath(TestUtils.getTestSeries(5, 0), TSDataType.DOUBLE),
+ TimeFilter.ltEq(15)));
groupByPlan.setExpression(expression);
QueryDataSet queryDataSet = clusterQueryRouter.groupBy(groupByPlan, queryContext);
@@ -274,17 +260,13 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
try {
GroupByTimePlan groupByPlan = new GroupByTimePlan();
List<PartialPath> pathList = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
List<String> aggregations = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
- dataTypes.add(TSDataType.DOUBLE);
+ pathList.add(new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE));
aggregations.add(SQLConstant.COUNT);
}
groupByPlan.setPaths(pathList);
groupByPlan.setDeduplicatedPathsAndUpdate(pathList);
- groupByPlan.setDataTypes(dataTypes);
- groupByPlan.setDeduplicatedDataTypes(dataTypes);
groupByPlan.setAggregations(aggregations);
groupByPlan.setDeduplicatedAggregations(aggregations);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/fill/ClusterFillExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/fill/ClusterFillExecutorTest.java
index f81ea6d..93ae2fa 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/fill/ClusterFillExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/fill/ClusterFillExecutorTest.java
@@ -26,7 +26,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
@@ -52,10 +52,9 @@ public class ClusterFillExecutorTest extends BaseQueryTest {
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
FillQueryPlan plan = new FillQueryPlan();
plan.setDeduplicatedPathsAndUpdate(
- Collections.singletonList(new PartialPath(TestUtils.getTestSeries(0, 10))));
- plan.setDeduplicatedDataTypes(Collections.singletonList(TSDataType.DOUBLE));
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
plan.setPaths(plan.getDeduplicatedPaths());
- plan.setDataTypes(plan.getDeduplicatedDataTypes());
long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
Map<TSDataType, IFill> tsDataTypeIFillMap =
Collections.singletonMap(
@@ -93,10 +92,9 @@ public class ClusterFillExecutorTest extends BaseQueryTest {
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
FillQueryPlan plan = new FillQueryPlan();
plan.setDeduplicatedPathsAndUpdate(
- Collections.singletonList(new PartialPath(TestUtils.getTestSeries(0, 10))));
- plan.setDeduplicatedDataTypes(Collections.singletonList(TSDataType.DOUBLE));
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
plan.setPaths(plan.getDeduplicatedPaths());
- plan.setDataTypes(plan.getDeduplicatedDataTypes());
long defaultFillInterval = IoTDBDescriptor.getInstance().getConfig().getDefaultFillInterval();
Map<TSDataType, IFill> tsDataTypeIFillMap =
Collections.singletonMap(
@@ -135,10 +133,9 @@ public class ClusterFillExecutorTest extends BaseQueryTest {
throws QueryProcessException, StorageEngineException, IOException, IllegalPathException {
FillQueryPlan plan = new FillQueryPlan();
plan.setDeduplicatedPathsAndUpdate(
- Collections.singletonList(new PartialPath(TestUtils.getTestSeries(0, 10))));
- plan.setDeduplicatedDataTypes(Collections.singletonList(TSDataType.DOUBLE));
+ Collections.singletonList(
+ new MeasurementPath(TestUtils.getTestSeries(0, 10), TSDataType.DOUBLE)));
plan.setPaths(plan.getDeduplicatedPaths());
- plan.setDataTypes(plan.getDeduplicatedDataTypes());
double fillValue = 1.0D;
Map<TSDataType, IFill> tsDataTypeIFillMap =
Collections.singletonMap(
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java
index a4b0666..49ed188 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByNoVFilterDataSetTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
@@ -52,17 +53,13 @@ public class ClusterGroupByNoVFilterDataSetTest extends BaseQueryTest {
try {
GroupByTimePlan groupByPlan = new GroupByTimePlan();
List<PartialPath> pathList = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
List<String> aggregations = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
- dataTypes.add(TSDataType.DOUBLE);
+ pathList.add(new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE));
aggregations.add(SQLConstant.COUNT);
}
groupByPlan.setPaths(pathList);
groupByPlan.setDeduplicatedPathsAndUpdate(pathList);
- groupByPlan.setDataTypes(dataTypes);
- groupByPlan.setDeduplicatedDataTypes(dataTypes);
groupByPlan.setAggregations(aggregations);
groupByPlan.setDeduplicatedAggregations(aggregations);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java
index 740ae0a..e3bacab 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/ClusterGroupByVFilterDataSetTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
@@ -55,11 +56,9 @@ public class ClusterGroupByVFilterDataSetTest extends BaseQueryTest {
try {
GroupByTimePlan groupByPlan = new GroupByTimePlan();
List<PartialPath> pathList = new ArrayList<>();
- List<TSDataType> dataTypes = new ArrayList<>();
List<String> aggregations = new ArrayList<>();
for (int i = 0; i < 10; i++) {
- pathList.add(new PartialPath(TestUtils.getTestSeries(i, 0)));
- dataTypes.add(TSDataType.DOUBLE);
+ pathList.add(new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE));
aggregations.add(SQLConstant.COUNT);
}
groupByPlan.setPaths(pathList);
@@ -75,9 +74,11 @@ public class ClusterGroupByVFilterDataSetTest extends BaseQueryTest {
IExpression expression =
BinaryExpression.and(
new SingleSeriesExpression(
- new PartialPath(TestUtils.getTestSeries(0, 0)), ValueFilter.gtEq(5.0)),
+ new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE),
+ ValueFilter.gtEq(5.0)),
new SingleSeriesExpression(
- new PartialPath(TestUtils.getTestSeries(5, 0)), TimeFilter.ltEq(15)));
+ new MeasurementPath(TestUtils.getTestSeries(5, 0), TSDataType.DOUBLE),
+ TimeFilter.ltEq(15)));
groupByPlan.setExpression(expression);
ClusterGroupByVFilterDataSet dataSet =
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
index 8641106..5f13b6b 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/MergeGroupByExecutorTest.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.cluster.query.RemoteQueryContext;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -47,7 +48,7 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
@Test
public void testNoTimeFilter()
throws QueryProcessException, IOException, IllegalPathException, StorageEngineException {
- PartialPath path = new PartialPath(TestUtils.getTestSeries(0, 0));
+ PartialPath path = new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE);
TSDataType dataType = TSDataType.DOUBLE;
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
@@ -82,7 +83,7 @@ public class MergeGroupByExecutorTest extends BaseQueryTest {
@Test
public void testTimeFilter()
throws QueryProcessException, IOException, IllegalPathException, StorageEngineException {
- PartialPath path = new PartialPath(TestUtils.getTestSeries(0, 0));
+ PartialPath path = new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE);
TSDataType dataType = TSDataType.DOUBLE;
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
index 74376af..2ce2e5e 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/groupby/RemoteGroupByExecutorTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.cluster.query.reader.EmptyReader;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.aggregation.AggregationType;
@@ -52,7 +53,7 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
@Test
public void testNoTimeFilter()
throws QueryProcessException, IOException, StorageEngineException, IllegalPathException {
- PartialPath path = new PartialPath(TestUtils.getTestSeries(0, 0));
+ PartialPath path = new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE);
TSDataType dataType = TSDataType.DOUBLE;
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
@@ -110,7 +111,7 @@ public class RemoteGroupByExecutorTest extends BaseQueryTest {
@Test
public void testTimeFilter()
throws QueryProcessException, IOException, StorageEngineException, IllegalPathException {
- PartialPath path = new PartialPath(TestUtils.getTestSeries(0, 0));
+ PartialPath path = new MeasurementPath(TestUtils.getTestSeries(0, 0), TSDataType.DOUBLE);
TSDataType dataType = TSDataType.DOUBLE;
QueryContext context =
new RemoteQueryContext(QueryResourceManager.getInstance().assignQueryId(true));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
index 7eac181..85147bb 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/reader/ClusterReaderFactoryTest.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.query.reader.series.SeriesRawDataBatchReader;
import org.junit.Test;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
import static org.junit.Assert.assertNotNull;
@@ -53,8 +54,8 @@ public class ClusterReaderFactoryTest extends BaseQueryTest {
(SeriesRawDataBatchReader)
readerFactory.getSeriesBatchReader(
pathList.get(0),
- new HashSet<>(),
- dataTypes.get(0),
+ new HashSet<>(Collections.singletonList(pathList.get(0).getMeasurement())),
+ pathList.get(0).getSeriesType(),
null,
null,
context,
@@ -67,8 +68,8 @@ public class ClusterReaderFactoryTest extends BaseQueryTest {
(SeriesRawDataBatchReader)
readerFactory.getSeriesBatchReader(
pathList.get(0),
- new HashSet<>(),
- dataTypes.get(0),
+ new HashSet<>(Collections.singletonList(pathList.get(0).getMeasurement())),
+ pathList.get(0).getSeriesType(),
null,
null,
context,
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
index 61ece19..75f2912 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/DataGroupMemberTest.java
@@ -735,7 +735,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), raftId)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
+ request.setPath(TestUtils.getTestSeries(0, 0));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
@@ -803,7 +803,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), raftId)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
+ request.setPath(TestUtils.getTestSeries(0, 0));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
@@ -871,7 +871,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
+ request.setPath(TestUtils.getTestSeries(0, 0));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(1));
request.setQueryId(0);
@@ -939,7 +939,7 @@ public class DataGroupMemberTest extends BaseMember {
partitionTable.getHeaderGroup(new RaftNode(TestUtils.getNode(10), 0)));
dataGroupMember.setCharacter(NodeCharacter.LEADER);
SingleSeriesQueryRequest request = new SingleSeriesQueryRequest();
- request.setPath(Collections.singletonList(TestUtils.getTestSeries(0, 0)));
+ request.setPath(TestUtils.getTestSeries(0, 0));
request.setDataTypeOrdinal(TSDataType.DOUBLE.ordinal());
request.setRequester(TestUtils.getNode(10));
request.setQueryId(0);
@@ -991,8 +991,7 @@ public class DataGroupMemberTest extends BaseMember {
new DataAsyncService(dataGroupMember)
.getAllPaths(
TestUtils.getRaftNode(0, raftId), Collections.singletonList(path), false, handler);
- List<String> result = new ArrayList<>();
- pathResult.get().paths.forEach(p -> result.add(p.get(0)));
+ List<String> result = new ArrayList<>(pathResult.get().paths);
assertEquals(20, result.size());
for (int i = 0; i < 10; i++) {
assertTrue(result.contains(TestUtils.getTestSeries(0, i)));
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
index 8e5d845..a918a50 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
@@ -987,7 +987,7 @@ public class MetaGroupMemberTest extends BaseMember {
for (int i = 0; i < 10; i++) {
IReaderByTimestamp readerByTimestamp =
readerFactory.getReaderByTimestamp(
- new PartialPath(TestUtils.getTestSeries(i, 0)),
+ new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE),
deviceMeasurements,
TSDataType.DOUBLE,
context,
@@ -1049,7 +1049,7 @@ public class MetaGroupMemberTest extends BaseMember {
for (int i = 0; i < 10; i++) {
ManagedSeriesReader reader =
readerFactory.getSeriesReader(
- new PartialPath(TestUtils.getTestSeries(i, 0)),
+ new MeasurementPath(TestUtils.getTestSeries(i, 0), TSDataType.DOUBLE),
deviceMeasurements,
TSDataType.DOUBLE,
TimeFilter.gtEq(5),
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
index 1b9caeb..b8f83cd 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/mtree/MTree.java
@@ -35,7 +35,14 @@ import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter;
import org.apache.iotdb.db.metadata.MetadataConstant;
import org.apache.iotdb.db.metadata.logfile.MLogReader;
import org.apache.iotdb.db.metadata.logfile.MLogWriter;
-import org.apache.iotdb.db.metadata.mnode.*;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mnode.InternalMNode;
+import org.apache.iotdb.db.metadata.mnode.MNodeUtils;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.StorageGroupMNode;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.EntityCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MNodeCollector;
import org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
@@ -959,7 +966,7 @@ public class MTree implements Serializable {
MeasurementCollector<List<PartialPath>> collector =
new MeasurementCollector<List<PartialPath>>(root, pathPattern, limit, offset) {
@Override
- protected void collectMeasurement(IMeasurementMNode node) throws MetadataException {
+ protected void collectMeasurement(IMeasurementMNode node) {
MeasurementPath path = node.getMeasurementPath();
if (nodes[nodes.length - 1].equals(node.getAlias())) {
// only when user query with alias, the alias in path will be set
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
index cb3ebf2..d9853a4 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/path/MeasurementPath.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.tsfile.file.metadata.statistics.Statistics;
import org.apache.iotdb.tsfile.read.common.TimeRange;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.UnaryMeasurementSchema;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
import java.io.IOException;
@@ -65,6 +66,11 @@ public class MeasurementPath extends PartialPath {
super(measurementPath);
}
+ public MeasurementPath(String measurementPath, TSDataType type) throws IllegalPathException { // builds a measurement path from its string form plus an explicit data type
+ super(measurementPath); // the path string is handed to the superclass constructor unchanged
+ this.measurementSchema = new UnaryMeasurementSchema(getMeasurement(), type); // schema is created from getMeasurement() and the supplied type
+ }
+
public MeasurementPath(PartialPath measurementPath, IMeasurementSchema measurementSchema) {
super(measurementPath.getNodes());
this.measurementSchema = measurementSchema;
@@ -84,6 +90,10 @@ public class MeasurementPath extends PartialPath {
return getMeasurementSchema().getType();
}
+ public byte getSeriesTypeInByte() { // byte form of this path's series type
+ return getMeasurementSchema().getTypeInByte(); // pure delegation to the schema returned by getMeasurementSchema()
+ }
+
public void setMeasurementSchema(IMeasurementSchema measurementSchema) { // replaces the schema held by this path
this.measurementSchema = measurementSchema; // stored as given; no copy or null check happens here
}
diff --git a/thrift-cluster/src/main/thrift/cluster.thrift b/thrift-cluster/src/main/thrift/cluster.thrift
index 05459ae..1bbd9e3 100644
--- a/thrift-cluster/src/main/thrift/cluster.thrift
+++ b/thrift-cluster/src/main/thrift/cluster.thrift
@@ -184,7 +184,7 @@ struct PullSchemaResp {
}
struct SingleSeriesQueryRequest {
- 1: required list<string> path
+ 1: required string path
2: optional binary timeFilterBytes
3: optional binary valueFilterBytes
4: required long queryId
@@ -199,7 +199,7 @@ struct SingleSeriesQueryRequest {
}
struct MultSeriesQueryRequest {
- 1: required list<list<string>> path
+ 1: required list<string> path
2: optional binary timeFilterBytes
3: optional binary valueFilterBytes
4: required long queryId
@@ -263,8 +263,9 @@ struct LastQueryRequest {
}
struct GetAllPathsResult {
- 1: required list<list<string>> paths
- 2: optional list<string> aliasList
+ 1: required list<string> paths // each path is now a single string instead of a list of strings
+ 2: required list<byte> dataTypes // NOTE(review): presumably one type byte per entry of paths - confirm against the sender
+ 3: optional list<string> aliasList // field id moves from 2 to 3; id 2 now carries dataTypes
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
index 5680d0e..ba4bf84 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/IMeasurementSchema.java
@@ -39,6 +39,8 @@ public interface IMeasurementSchema {
TSDataType getType();
+ byte getTypeInByte();
+
void setType(TSDataType dataType);
TSEncoding getTimeTSEncoding();
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/UnaryMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/UnaryMeasurementSchema.java
index e2a867b..18f5348 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/UnaryMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/UnaryMeasurementSchema.java
@@ -204,6 +204,11 @@ public class UnaryMeasurementSchema
}
@Override
+ public byte getTypeInByte() { // data type of this schema in its byte form
+ return type; // returns the `type` field as-is
+ }
+
+ @Override
public TSEncoding getTimeTSEncoding() { // resolves the time encoding from the global TsFile configuration
return TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder()); // looked up from the config on every call; nothing is cached here
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
index 5b79bd0..19f13e1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/schema/VectorMeasurementSchema.java
@@ -134,6 +134,11 @@ public class VectorMeasurementSchema
}
@Override
+ public byte getTypeInByte() { // byte code of this schema's data type (VECTOR)
+ return TSDataType.VECTOR.serialize(); // take the code from the enum rather than repeating the literal 6
+ }
+
+ @Override
public void setType(TSDataType dataType) { // not supported on this schema: the argument is ignored
throw new UnsupportedOperationException("unsupported method for VectorMeasurementSchema"); // unconditional; callers always get this exception
}