You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/11/19 10:32:17 UTC
[iotdb] 01/01: recomplete align by device
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch alignbydevice
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b297a99f8de18712470552e32ddcd568be36bc7a
Author: Alima777 <wx...@gmail.com>
AuthorDate: Fri Nov 19 18:27:24 2021 +0800
recomplete align by device
---
.../iotdb/db/qp/logical/crud/QueryOperator.java | 127 +++++++++++----------
.../db/qp/physical/crud/AlignByDevicePlan.java | 86 ++++++++------
.../iotdb/db/qp/physical/crud/MeasurementInfo.java | 37 ++----
.../iotdb/db/qp/physical/crud/QueryPlan.java | 11 +-
.../db/query/dataset/AlignByDeviceDataSet.java | 98 ++++------------
.../org/apache/iotdb/db/service/TSServiceImpl.java | 29 ++---
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 5 +-
7 files changed, 165 insertions(+), 228 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 0d2fb81..c4b5040 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -30,7 +30,6 @@ import org.apache.iotdb.db.qp.constant.SQLConstant;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
import org.apache.iotdb.db.qp.physical.crud.QueryIndexPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
@@ -173,8 +172,18 @@ public class QueryOperator extends Operator {
}
public void check() throws LogicalOperatorException {
- if (isAlignByDevice() && selectComponent.hasTimeSeriesGeneratingFunction()) {
- throw new LogicalOperatorException("ALIGN BY DEVICE clause is not supported in UDF queries.");
+ if (isAlignByDevice()) {
+ if (selectComponent.hasTimeSeriesGeneratingFunction()) {
+ throw new LogicalOperatorException(
+ "ALIGN BY DEVICE clause is not supported in UDF queries.");
+ }
+
+ for (PartialPath path : selectComponent.getPaths()) {
+ String device = path.getDevice();
+ if (!device.isEmpty()) {
+ throw new LogicalOperatorException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
+ }
+ }
}
}
@@ -230,74 +239,54 @@ public class QueryOperator extends Operator {
throws QueryProcessException {
AlignByDevicePlan alignByDevicePlan = new AlignByDevicePlan();
- List<PartialPath> prefixPaths = fromComponent.getPrefixPaths();
// remove stars in fromPaths and get deviceId with deduplication
- List<PartialPath> devices = removeStarsInDeviceWithUnique(prefixPaths);
- List<ResultColumn> resultColumns = selectComponent.getResultColumns();
+ List<PartialPath> devices = removeStarsInDeviceWithUnique(fromComponent.getPrefixPaths());
+ List<ResultColumn> resultColumns =
+ convertSpecialClauseValues(alignByDevicePlan, selectComponent.getResultColumns());
List<String> aggregationFuncs = selectComponent.getAggregationFunctions();
// to record result measurement columns
List<String> measurements = new ArrayList<>();
Map<String, MeasurementInfo> measurementInfoMap = new HashMap<>();
List<PartialPath> paths = new ArrayList<>();
+ List<String> aggregations = new ArrayList<>();
- for (int i = 0; i < resultColumns.size(); i++) { // per suffix in SELECT
+ // per suffix in SELECT
+ for (int i = 0; i < resultColumns.size(); i++) {
ResultColumn resultColumn = resultColumns.get(i);
- Expression suffixExpression = resultColumn.getExpression();
- PartialPath suffixPath = getSuffixPathFromExpression(suffixExpression);
+ PartialPath suffixPath = getSuffixPathFromExpression(resultColumn.getExpression());
String aggregation = aggregationFuncs != null ? aggregationFuncs.get(i) : null;
-
// to record measurements in the loop of a suffix path
Set<String> measurementSetOfGivenSuffix = new LinkedHashSet<>();
+
+ // concatenate the suffix path with each device
for (PartialPath device : devices) {
PartialPath fullPath = device.concatPath(suffixPath);
try {
// remove stars in SELECT to get actual paths
List<MeasurementPath> actualPaths = getMatchedTimeseries(fullPath);
- if (suffixPath.getNodes().length > 1) {
- throw new QueryProcessException(AlignByDevicePlan.MEASUREMENT_ERROR_MESSAGE);
- }
if (resultColumn.hasAlias() && actualPaths.size() >= 2) {
throw new QueryProcessException(
String.format(AlignByDevicePlan.ALIAS_ERROR_MESSAGE, resultColumn.getAlias()));
}
- if (actualPaths.isEmpty()) {
- String nonExistMeasurement = getMeasurementName(fullPath, aggregation);
- if (measurementSetOfGivenSuffix.add(nonExistMeasurement)) {
- measurementInfoMap.putIfAbsent(
- nonExistMeasurement, new MeasurementInfo(MeasurementType.NonExist));
+ for (MeasurementPath path : actualPaths) {
+ MeasurementInfo measurementInfo =
+ new MeasurementInfo(getMeasurementName(path, aggregation));
+ TSDataType columnDataType = path.getSeriesType();
+ if (aggregation != null) {
+ columnDataType = getAggregationType(aggregation);
+ aggregations.add(aggregation);
}
- } else {
- for (PartialPath path : actualPaths) {
- String measurementName = getMeasurementName(path, aggregation);
- TSDataType measurementDataType = path.getSeriesType();
- TSDataType columnDataType = getAggregationType(aggregation);
- columnDataType = columnDataType == null ? measurementDataType : columnDataType;
- MeasurementInfo measurementInfo =
- measurementInfoMap.getOrDefault(measurementName, new MeasurementInfo());
-
- if (resultColumn.hasAlias()) {
- measurementInfo.setMeasurementAlias(resultColumn.getAlias());
- }
-
- // check datatype consistency
- // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
- // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
- if (measurementInfo.getColumnDataType() != null) {
- if (!columnDataType.equals(measurementInfo.getColumnDataType())) {
- throw new QueryProcessException(
- "The data types of the same measurement column should be the same across devices.");
- }
- } else {
- measurementInfo.setColumnDataType(columnDataType);
- measurementInfo.setMeasurementDataType(measurementDataType);
- }
-
- measurementSetOfGivenSuffix.add(measurementName);
- measurementInfo.setMeasurementType(MeasurementType.Exist);
- measurementInfoMap.put(measurementName, measurementInfo);
- // update paths
- paths.add(path);
+ checkDataTypeConsistency(
+ columnDataType, measurementInfoMap.get(measurementInfo.getMeasurement()));
+
+ if (!measurementInfoMap.containsKey(measurementInfo.getMeasurement())) {
+ measurementInfo.setMeasurementAlias(
+ resultColumn.hasAlias() ? resultColumn.getAlias() : null);
+ measurementInfo.setColumnDataType(columnDataType);
+ measurementInfoMap.put(measurementInfo.getMeasurement(), measurementInfo);
}
+ measurementSetOfGivenSuffix.add(measurementInfo.getMeasurement());
+ paths.add(path);
}
} catch (MetadataException | QueryProcessException e) {
throw new QueryProcessException(e.getMessage());
@@ -313,22 +302,33 @@ public class QueryOperator extends Operator {
measurements.addAll(measurementSetOfGivenSuffix);
}
- List<String> trimMeasurements = convertSpecialClauseValues(alignByDevicePlan, measurements);
// assigns to alignByDevicePlan
- alignByDevicePlan.setMeasurements(trimMeasurements);
- alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
- alignByDevicePlan.setDevices(devices);
+ alignByDevicePlan.setMeasurements(measurements);
alignByDevicePlan.setPaths(paths);
+ alignByDevicePlan.setAggregations(aggregations);
+ alignByDevicePlan.setMeasurementInfoMap(measurementInfoMap);
alignByDevicePlan.setEnableTracing(enableTracing);
+ alignByDevicePlan.deduplicate(generator);
+
if (whereComponent != null) {
alignByDevicePlan.setDeviceToFilterMap(
- concatFilterByDevice(devices, whereComponent.getFilterOperator()));
+ concatFilterByDevice(alignByDevicePlan, devices, whereComponent.getFilterOperator()));
}
return alignByDevicePlan;
}
+ private void checkDataTypeConsistency(TSDataType checkedDataType, MeasurementInfo measurementInfo)
+ throws QueryProcessException {
+ // check datatype consistency
+ // an inconsistent example: select s0 from root.sg1.d1, root.sg1.d2 align by device
+ // while root.sg1.d1.s0 is INT32 and root.sg1.d2.s0 is FLOAT.
+ if (measurementInfo != null && !checkedDataType.equals(measurementInfo.getColumnDataType())) {
+ throw new QueryProcessException(AlignByDevicePlan.DATATYPE_ERROR_MESSAGE);
+ }
+ }
+
private void convertSpecialClauseValues(QueryPlan queryPlan) {
if (specialClauseComponent != null) {
queryPlan.setWithoutAllNull(specialClauseComponent.isWithoutAllNull());
@@ -340,16 +340,16 @@ public class QueryOperator extends Operator {
}
}
- private List<String> convertSpecialClauseValues(QueryPlan queryPlan, List<String> measurements)
- throws QueryProcessException {
+ private List<ResultColumn> convertSpecialClauseValues(
+ QueryPlan queryPlan, List<ResultColumn> resultColumns) throws QueryProcessException {
convertSpecialClauseValues(queryPlan);
// sLimit trim on the measurementColumnList
if (specialClauseComponent.hasSlimit()) {
int seriesSLimit = specialClauseComponent.getSeriesLimit();
int seriesOffset = specialClauseComponent.getSeriesOffset();
- return slimitTrimColumn(measurements, seriesSLimit, seriesOffset);
+ return slimitTrimColumn(resultColumns, seriesSLimit, seriesOffset);
}
- return measurements;
+ return resultColumns;
}
private List<PartialPath> removeStarsInDeviceWithUnique(List<PartialPath> paths)
@@ -390,9 +390,10 @@ public class QueryOperator extends Operator {
return initialMeasurement;
}
- private List<String> slimitTrimColumn(List<String> columnList, int seriesLimit, int seriesOffset)
+ private List<ResultColumn> slimitTrimColumn(
+ List<ResultColumn> resultColumns, int seriesLimit, int seriesOffset)
throws QueryProcessException {
- int size = columnList.size();
+ int size = resultColumns.size();
// check parameter range
if (seriesOffset >= size) {
@@ -406,14 +407,15 @@ public class QueryOperator extends Operator {
}
// trim seriesPath list
- return new ArrayList<>(columnList.subList(seriesOffset, endPosition));
+ return new ArrayList<>(resultColumns.subList(seriesOffset, endPosition));
}
// e.g. translate "select * from root.ln.d1, root.ln.d2 where s1 < 20 AND s2 > 10" to
// [root.ln.d1 -> root.ln.d1.s1 < 20 AND root.ln.d1.s2 > 10,
// root.ln.d2 -> root.ln.d2.s1 < 20 AND root.ln.d2.s2 > 10)]
private Map<String, IExpression> concatFilterByDevice(
- List<PartialPath> devices, FilterOperator operator) throws QueryProcessException {
+ AlignByDevicePlan alignByDevicePlan, List<PartialPath> devices, FilterOperator operator)
+ throws QueryProcessException {
Map<String, IExpression> deviceToFilterMap = new HashMap<>();
Set<PartialPath> filterPaths = new HashSet<>();
Iterator<PartialPath> deviceIterator = devices.iterator();
@@ -425,6 +427,7 @@ public class QueryOperator extends Operator {
concatFilterPath(device, newOperator, filterPaths);
} catch (LogicalOptimizeException | MetadataException e) {
deviceIterator.remove();
+ alignByDevicePlan.removeDevice(device.getFullPath());
continue;
}
// transform to a list so it can be indexed
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
index a4079a1..c1c0897 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/AlignByDevicePlan.java
@@ -22,11 +22,14 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.logical.Operator;
import org.apache.iotdb.db.qp.logical.Operator.OperatorType;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
public class AlignByDevicePlan extends QueryPlan {
@@ -34,14 +37,17 @@ public class AlignByDevicePlan extends QueryPlan {
"The paths of the SELECT clause can only be measurements or STAR.";
public static final String ALIAS_ERROR_MESSAGE =
"alias %s can only be matched with one time series";
+ public static final String DATATYPE_ERROR_MESSAGE =
+ "The data types of the same measurement column should be the same across devices.";
// to record result measurement columns, e.g. temperature, status, speed
private List<String> measurements;
- private List<TSDataType> dataTypes;
private Map<String, MeasurementInfo> measurementInfoMap;
+ private List<PartialPath> deduplicatePaths;
+ private List<String> aggregations;
- // to check data type consistency for the same name sensor of different devices
- private List<PartialPath> devices;
+ // for each device, the indexes (into deduplicatePaths) of the paths that need to be executed
+ private Map<String, List<Integer>> deviceToPathIndex = new LinkedHashMap<>();
private Map<String, IExpression> deviceToFilterMap;
private GroupByTimePlan groupByTimePlan;
@@ -54,7 +60,40 @@ public class AlignByDevicePlan extends QueryPlan {
@Override
public void deduplicate(PhysicalGenerator physicalGenerator) {
- // do nothing
+ Set<PartialPath> deduplicatePaths = new LinkedHashSet<>();
+ List<String> deduplicatedAggregations = new ArrayList<>();
+ for (int i = 0; i < paths.size(); i++) {
+ PartialPath path = paths.get(i);
+ if (!deduplicatePaths.contains(path)) {
+ deduplicatePaths.add(path);
+ if (this.aggregations != null) {
+ deduplicatedAggregations.add(this.aggregations.get(i));
+ }
+ deviceToPathIndex
+ .computeIfAbsent(path.getDevice(), k -> new ArrayList<>())
+ .add(deduplicatePaths.size() - 1);
+ }
+ }
+ // paths are deduplicated from here
+ this.deduplicatePaths = new ArrayList<>(deduplicatePaths);
+ setAggregations(deduplicatedAggregations);
+ this.paths = null;
+ }
+
+ public List<PartialPath> getDeduplicatePaths() {
+ return deduplicatePaths;
+ }
+
+ public void removeDevice(String device) {
+ deviceToPathIndex.remove(device);
+ }
+
+ public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
+ this.measurementInfoMap = measurementInfoMap;
+ }
+
+ public Map<String, MeasurementInfo> getMeasurementInfoMap() {
+ return measurementInfoMap;
}
public void setMeasurements(List<String> measurements) {
@@ -65,21 +104,20 @@ public class AlignByDevicePlan extends QueryPlan {
return measurements;
}
- @Override
- public List<TSDataType> getDataTypes() {
- return dataTypes;
+ public List<String> getAggregations() {
+ return aggregations;
}
- public void setDataTypes(List<TSDataType> dataTypes) {
- this.dataTypes = dataTypes;
+ public void setAggregations(List<String> aggregations) {
+ this.aggregations = aggregations.isEmpty() ? null : aggregations;
}
- public void setDevices(List<PartialPath> devices) {
- this.devices = devices;
+ public Map<String, List<Integer>> getDeviceToPathIndex() {
+ return deviceToPathIndex;
}
- public List<PartialPath> getDevices() {
- return devices;
+ public void setDeviceToPathIndex(Map<String, List<Integer>> deviceToPathIndex) {
+ this.deviceToPathIndex = deviceToPathIndex;
}
public Map<String, IExpression> getDeviceToFilterMap() {
@@ -116,24 +154,4 @@ public class AlignByDevicePlan extends QueryPlan {
this.aggregationPlan = aggregationPlan;
this.setOperatorType(Operator.OperatorType.AGGREGATION);
}
-
- public void setMeasurementInfoMap(Map<String, MeasurementInfo> measurementInfoMap) {
- this.measurementInfoMap = measurementInfoMap;
- }
-
- public Map<String, MeasurementInfo> getMeasurementInfoMap() {
- return measurementInfoMap;
- }
-
- /**
- * Exist: the measurements which don't belong to NonExist and Constant. NonExist: the measurements
- * that do not exist in any device, data type is considered as String. The value is considered as
- * null. Constant: the measurements that have quotation mark. e.g. "abc",'11'. The data type is
- * considered as String and the value is the measurement name.
- */
- public enum MeasurementType {
- Exist,
- NonExist,
- Constant
- }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
index 7c6dd17..c91493c 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/MeasurementInfo.java
@@ -18,51 +18,38 @@
*/
package org.apache.iotdb.db.qp.physical.crud;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
public class MeasurementInfo {
public MeasurementInfo() {}
- public MeasurementInfo(MeasurementType measurementType) {
- this.measurementType = measurementType;
+ public MeasurementInfo(String measurement) {
+ this.measurement = measurement;
}
+ private String measurement;
+
// select s1, s2 as speed from root, then s2 -> speed
private String measurementAlias;
- // to record different kinds of measurement
- private MeasurementType measurementType;
-
- // to record the real type of the measurement, used for actual query
- private TSDataType measurementDataType;
-
// to record the datatype of the column in the result set
private TSDataType columnDataType;
- public void setMeasurementAlias(String measurementAlias) {
- this.measurementAlias = measurementAlias;
+ public void setMeasurement(String measurement) {
+ this.measurement = measurement;
}
- public String getMeasurementAlias() {
- return measurementAlias;
+ public String getMeasurement() {
+ return measurement;
}
- public void setMeasurementType(MeasurementType measurementType) {
- this.measurementType = measurementType;
- }
-
- public MeasurementType getMeasurementType() {
- return measurementType;
- }
-
- public void setMeasurementDataType(TSDataType measurementDataType) {
- this.measurementDataType = measurementDataType;
+ public void setMeasurementAlias(String measurementAlias) {
+ this.measurementAlias = measurementAlias;
}
- public TSDataType getMeasurementDataType() {
- return measurementDataType;
+ public String getMeasurementAlias() {
+ return measurementAlias;
}
public void setColumnDataType(TSDataType columnDataType) {
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
index e947144..83da2c0 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/QueryPlan.java
@@ -74,14 +74,11 @@ public abstract class QueryPlan extends PhysicalPlan {
@Override
public void setPaths(List<PartialPath> paths) {
- if (paths == null) this.paths = null; // align by device
- else {
- List<MeasurementPath> measurementPaths = new ArrayList<>();
- for (PartialPath path : paths) {
- measurementPaths.add((MeasurementPath) path);
- }
- this.paths = measurementPaths;
+ List<MeasurementPath> measurementPaths = new ArrayList<>();
+ for (PartialPath path : paths) {
+ measurementPaths.add((MeasurementPath) path);
}
+ this.paths = measurementPaths;
}
public List<TSDataType> getDataTypes() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index 620bfae..ce957cb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -19,20 +19,15 @@
package org.apache.iotdb.db.query.dataset;
import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
-import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan.MeasurementType;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.MeasurementInfo;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.executor.IQueryRouter;
-import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.rpc.RedirectException;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -48,8 +43,6 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
/** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
public class AlignByDeviceDataSet extends QueryDataSet {
@@ -60,9 +53,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private IExpression expression;
private List<String> measurements;
- private List<PartialPath> devices;
+ private List<PartialPath> paths;
+ private List<String> aggregations;
+ private Map<String, List<Integer>> deviceToPathIndex;
private Map<String, IExpression> deviceToFilterMap;
- private Map<String, MeasurementInfo> measurementInfoMap;
private GroupByTimePlan groupByTimePlan;
private FillQueryPlan fillQueryPlan;
@@ -70,24 +64,26 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private RawDataQueryPlan rawDataQueryPlan;
private boolean curDataSetInitialized;
- private PartialPath currentDevice;
private QueryDataSet currentDataSet;
- private Iterator<PartialPath> deviceIterator;
+ private Iterator<String> deviceIterator;
+ private String currentDevice;
private List<String> executeColumns;
private int pathsNum = 0;
public AlignByDeviceDataSet(
AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) {
- super(null, alignByDevicePlan.getDataTypes());
+ super(null, null);
// align by device's column number is different from other datasets
// TODO I don't know whether it's right or not in AlignedPath, remember to check here while
// adapting AlignByDevice query for new vector
- super.columnNum = alignByDevicePlan.getDataTypes().size();
+ super.columnNum = alignByDevicePlan.getMeasurements().size() + 1; // + 1 for 'device'
this.measurements = alignByDevicePlan.getMeasurements();
- this.devices = alignByDevicePlan.getDevices();
- this.measurementInfoMap = alignByDevicePlan.getMeasurementInfoMap();
+ this.paths = alignByDevicePlan.getDeduplicatePaths();
+ this.aggregations = alignByDevicePlan.getAggregations();
this.queryRouter = queryRouter;
this.context = context;
+ this.deviceIterator = alignByDevicePlan.getDeviceToPathIndex().keySet().iterator();
+ this.deviceToPathIndex = alignByDevicePlan.getDeviceToPathIndex();
this.deviceToFilterMap = alignByDevicePlan.getDeviceToFilterMap();
switch (alignByDevicePlan.getOperatorType()) {
@@ -115,7 +111,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
this.curDataSetInitialized = false;
- this.deviceIterator = devices.iterator();
}
public int getPathsNum() {
@@ -133,37 +128,22 @@ public class AlignByDeviceDataSet extends QueryDataSet {
while (deviceIterator.hasNext()) {
currentDevice = deviceIterator.next();
- // get all measurements of current device
- Map<String, MeasurementPath> measurementToPathMap =
- getMeasurementsUnderGivenDevice(currentDevice);
- Set<String> measurementOfGivenDevice = measurementToPathMap.keySet();
-
- // extract paths and aggregations queried from all measurements
- // executeColumns is for calculating rowRecord
executeColumns = new ArrayList<>();
List<PartialPath> executePaths = new ArrayList<>();
List<String> executeAggregations = new ArrayList<>();
- for (Entry<String, MeasurementInfo> entry : measurementInfoMap.entrySet()) {
- if (entry.getValue().getMeasurementType() != MeasurementType.Exist) {
- continue;
- }
- String column = entry.getKey();
- String measurement = column;
- if (dataSetType == DataSetType.GROUPBYTIME || dataSetType == DataSetType.AGGREGATE) {
- measurement = column.substring(column.indexOf('(') + 1, column.indexOf(')'));
- if (measurementOfGivenDevice.contains(measurement)) {
- executeAggregations.add(column.substring(0, column.indexOf('(')));
- }
- }
- if (measurementOfGivenDevice.contains(measurement)) {
- executeColumns.add(column);
- executePaths.add(measurementToPathMap.get(measurement));
+ for (int i : deviceToPathIndex.get(currentDevice)) {
+ executePaths.add(paths.get(i));
+ String executeColumn = paths.get(i).getMeasurement();
+ if (aggregations != null) {
+ executeAggregations.add(aggregations.get(i));
+ executeColumn = String.format("%s(%s)", aggregations.get(i), executeColumn);
}
+ executeColumns.add(executeColumn);
}
// get filter to execute for the current device
if (deviceToFilterMap != null) {
- this.expression = deviceToFilterMap.get(currentDevice.getFullPath());
+ this.expression = deviceToFilterMap.get(currentDevice);
}
// for tracing: try to calculate the number of series paths
@@ -219,23 +199,6 @@ public class AlignByDeviceDataSet extends QueryDataSet {
return false;
}
- /** Get all measurements under given device. */
- protected Map<String, MeasurementPath> getMeasurementsUnderGivenDevice(PartialPath device)
- throws IOException {
- try {
- // TODO: Implement this method in Cluster MManager
- Map<String, MeasurementPath> measurementToPathMap = new HashMap<>();
- List<MeasurementPath> measurementPaths =
- IoTDB.metaManager.getAllMeasurementByDevicePath(device);
- for (MeasurementPath measurementPath : measurementPaths) {
- measurementToPathMap.put(measurementPath.getMeasurement(), measurementPath);
- }
- return measurementToPathMap;
- } catch (MetadataException e) {
- throw new IOException("Cannot get node from " + device, e);
- }
- }
-
@Override
public RowRecord nextWithoutConstraint() throws IOException {
RowRecord originRowRecord = currentDataSet.next();
@@ -243,7 +206,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
RowRecord rowRecord = new RowRecord(originRowRecord.getTimestamp());
Field deviceField = new Field(TSDataType.TEXT);
- deviceField.setBinaryV(new Binary(currentDevice.getFullPath()));
+ deviceField.setBinaryV(new Binary(currentDevice));
rowRecord.addField(deviceField);
// device field should not be considered as a value field it should affect the WITHOUT NULL
// judgement
@@ -256,23 +219,10 @@ public class AlignByDeviceDataSet extends QueryDataSet {
}
for (String measurement : measurements) {
- MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
- switch (measurementInfo.getMeasurementType()) {
- case Exist:
- if (currentColumnMap.get(measurement) != null) {
- rowRecord.addField(currentColumnMap.get(measurement));
- } else {
- rowRecord.addField(new Field(null));
- }
- break;
- case NonExist:
- rowRecord.addField(new Field(null));
- break;
- case Constant:
- Field res = new Field(TSDataType.TEXT);
- res.setBinaryV(Binary.valueOf(measurement));
- rowRecord.addField(res);
- break;
+ if (currentColumnMap.get(measurement) != null) {
+ rowRecord.addField(currentColumnMap.get(measurement));
+ } else {
+ rowRecord.addField(new Field(null));
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 505c5a9..53fcb0f 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -883,42 +883,27 @@ public class TSServiceImpl extends BasicServiceProvider implements TSIService.If
// get column types and do deduplication
columnTypes.add(TSDataType.TEXT.toString()); // the DEVICE column of ALIGN_BY_DEVICE result
- List<TSDataType> deduplicatedColumnsType = new ArrayList<>();
- deduplicatedColumnsType.add(TSDataType.TEXT); // the DEVICE column of ALIGN_BY_DEVICE result
Set<String> deduplicatedMeasurements = new LinkedHashSet<>();
- Map<String, MeasurementInfo> measurementInfoMap = plan.getMeasurementInfoMap();
-
// build column header with constant and non exist column and deduplication
List<String> measurements = plan.getMeasurements();
for (String measurement : measurements) {
- MeasurementInfo measurementInfo = measurementInfoMap.get(measurement);
+ MeasurementInfo measurementInfo = plan.getMeasurementInfoMap().get(measurement);
TSDataType type = TSDataType.TEXT;
- switch (measurementInfo.getMeasurementType()) {
- case Exist:
- type = measurementInfo.getColumnDataType();
- break;
- case NonExist:
- case Constant:
- type = TSDataType.TEXT;
+ String measurementName = measurement;
+ if (measurementInfo != null) {
+ type = measurementInfo.getColumnDataType();
+ measurementName = measurementInfo.getMeasurementAlias();
}
- String measurementAlias = measurementInfo.getMeasurementAlias();
- respColumns.add(measurementAlias != null ? measurementAlias : measurement);
+ respColumns.add(measurementName != null ? measurementName : measurement);
columnTypes.add(type.toString());
- if (!deduplicatedMeasurements.contains(measurement)) {
- deduplicatedMeasurements.add(measurement);
- deduplicatedColumnsType.add(type);
- }
+ deduplicatedMeasurements.add(measurement);
}
// save deduplicated measurementColumn names and types in QueryPlan for the next stage to use.
// i.e., used by AlignByDeviceDataSet constructor in `fetchResults` stage.
plan.setMeasurements(new ArrayList<>(deduplicatedMeasurements));
- plan.setDataTypes(deduplicatedColumnsType);
-
- // set these null since they are never used henceforth in ALIGN_BY_DEVICE query processing.
- plan.setPaths(null);
}
private TSExecuteStatementResp executeSelectIntoStatement(
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 7467407..4e99c9f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -330,11 +330,8 @@ public class IoTDBAlignByDeviceIT {
"103,root.vehicle.d0,199,null,",
"104,root.vehicle.d0,190,null,",
"105,root.vehicle.d0,199,11.11,",
- "106,root.vehicle.d0,null,null,",
"1000,root.vehicle.d0,55555,1000.11,",
"946684800000,root.vehicle.d0,100,null,",
- "1,root.vehicle.d1,null,null,",
- "1000,root.vehicle.d1,null,null,",
};
Class.forName(Config.JDBC_DRIVER_NAME);
@@ -371,7 +368,7 @@ public class IoTDBAlignByDeviceIT {
Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
cnt++;
}
- Assert.assertEquals(16, cnt);
+ Assert.assertEquals(13, cnt);
}
} catch (Exception e) {
e.printStackTrace();