You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/02/16 00:57:09 UTC
[iotdb] branch master updated: [IOTDB-2532] Query with align by device can't get value after clear cache (#5045)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9195d3e [IOTDB-2532] Query with align by device can't get value after clear cache (#5045)
9195d3e is described below
commit 9195d3edae4f41889ad693db311a5a78613cf8b9
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Feb 16 08:55:54 2022 +0800
[IOTDB-2532] Query with align by device can't get value after clear cache (#5045)
---
.../iotdb/cluster/query/ClusterQueryRouter.java | 20 +---
.../cluster/query/ClusterQueryRouterTest.java | 2 +
.../iotdb/db/integration/IoTDBAlignByDeviceIT.java | 96 +++++++++++++++++
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 43 +++++++-
.../qp/logical/crud/GroupByFillQueryOperator.java | 10 ++
.../db/qp/logical/crud/GroupByQueryOperator.java | 34 ++++++
.../iotdb/db/qp/logical/crud/QueryOperator.java | 36 +++++--
.../iotdb/db/qp/physical/crud/GroupByTimePlan.java | 27 +++++
.../db/qp/physical/crud/RawDataQueryPlan.java | 15 +++
.../db/query/dataset/AlignByDeviceDataSet.java | 23 ++++-
.../dataset/groupby/GroupByEngineDataSet.java | 10 +-
.../iotdb/db/query/executor/QueryRouter.java | 114 +++------------------
.../iotdb/db/qp/physical/PhysicalPlanTest.java | 19 ++--
13 files changed, 304 insertions(+), 145 deletions(-)
diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
index c1f5bbe..19dde2f 100644
--- a/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
+++ b/cluster/src/main/java/org/apache/iotdb/cluster/query/ClusterQueryRouter.java
@@ -41,14 +41,10 @@ import org.apache.iotdb.db.query.executor.FillQueryExecutor;
import org.apache.iotdb.db.query.executor.LastQueryExecutor;
import org.apache.iotdb.db.query.executor.QueryRouter;
import org.apache.iotdb.db.query.executor.RawDataQueryExecutor;
-import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import java.io.IOException;
-import java.util.ArrayList;
public class ClusterQueryRouter extends QueryRouter {
@@ -94,21 +90,9 @@ public class ClusterQueryRouter extends QueryRouter {
@Override
public QueryDataSet udtfQuery(UDTFPlan udtfPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException, InterruptedException {
- IExpression expression = udtfPlan.getExpression();
- IExpression optimizedExpression;
- try {
- optimizedExpression =
- expression == null
- ? null
- : ExpressionOptimizer.getInstance()
- .optimize(expression, new ArrayList<>(udtfPlan.getDeduplicatedPaths()));
- } catch (QueryFilterOptimizationException e) {
- throw new StorageEngineException(e.getMessage());
- }
- udtfPlan.setExpression(optimizedExpression);
-
boolean withValueFilter =
- optimizedExpression != null && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME;
+ udtfPlan.getExpression() != null
+ && udtfPlan.getExpression().getType() != ExpressionType.GLOBAL_TIME;
ClusterUDTFQueryExecutor clusterUDTFQueryExecutor =
new ClusterUDTFQueryExecutor(udtfPlan, metaGroupMember);
diff --git a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
index c6dfaa4..e1ed020 100644
--- a/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
+++ b/cluster/src/test/java/org/apache/iotdb/cluster/query/ClusterQueryRouterTest.java
@@ -60,6 +60,7 @@ import java.util.List;
import java.util.Map;
import static junit.framework.TestCase.assertEquals;
+import static org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan.getTimeExpression;
import static org.junit.Assert.assertFalse;
public class ClusterQueryRouterTest extends BaseQueryTest {
@@ -275,6 +276,7 @@ public class ClusterQueryRouterTest extends BaseQueryTest {
groupByPlan.setSlidingStep(5);
groupByPlan.setInterval(5);
+ groupByPlan.setExpression(getTimeExpression(groupByPlan));
QueryDataSet dataSet = clusterQueryRouter.groupBy(groupByPlan, queryContext);
Object[][] answers =
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
index 4e6c6b3..18ebf76 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBAlignByDeviceIT.java
@@ -467,6 +467,102 @@ public class IoTDBAlignByDeviceIT {
}
@Test
+ public void selectDifferentSeriesWithValueFilterWithoutCacheTest() {
+ String[] retArray =
+ new String[] {
+ "100,root.vehicle.d0,99,",
+ "101,root.vehicle.d0,99,",
+ "102,root.vehicle.d0,80,",
+ "103,root.vehicle.d0,99,",
+ "104,root.vehicle.d0,90,",
+ "105,root.vehicle.d0,99,",
+ };
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CLEAR CACHE");
+ // single device
+ boolean hasResultSet =
+ statement.execute("select s0 from root.vehicle.d0 where s1 < 200 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ List<Integer> actualIndexToExpectedIndexList =
+ checkHeader(
+ resultSetMetaData,
+ "Time,Device,s0",
+ new int[] {Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER});
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] expectedStrings = retArray[cnt].split(",");
+ StringBuilder expectedBuilder = new StringBuilder();
+ StringBuilder actualBuilder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ actualBuilder.append(resultSet.getString(i)).append(",");
+ expectedBuilder
+ .append(expectedStrings[actualIndexToExpectedIndexList.get(i - 1)])
+ .append(",");
+ }
+ Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void selectDifferentSeriesWithBinaryValueFilterWithoutCacheTest() {
+ String[] retArray =
+ new String[] {
+ "105,root.vehicle.d0,99,",
+ };
+
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+ statement.execute("CLEAR CACHE");
+ // single device
+ boolean hasResultSet =
+ statement.execute(
+ "select s0 from root.vehicle.d0 where s1 < 200 and s2 > 10 align by device");
+ Assert.assertTrue(hasResultSet);
+
+ try (ResultSet resultSet = statement.getResultSet()) {
+ ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+ List<Integer> actualIndexToExpectedIndexList =
+ checkHeader(
+ resultSetMetaData,
+ "Time,Device,s0",
+ new int[] {Types.TIMESTAMP, Types.VARCHAR, Types.INTEGER});
+
+ int cnt = 0;
+ while (resultSet.next()) {
+ String[] expectedStrings = retArray[cnt].split(",");
+ StringBuilder expectedBuilder = new StringBuilder();
+ StringBuilder actualBuilder = new StringBuilder();
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ actualBuilder.append(resultSet.getString(i)).append(",");
+ expectedBuilder
+ .append(expectedStrings[actualIndexToExpectedIndexList.get(i - 1)])
+ .append(",");
+ }
+ Assert.assertEquals(expectedBuilder.toString(), actualBuilder.toString());
+ cnt++;
+ }
+ Assert.assertEquals(retArray.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
public void aggregateTest() {
String[] retArray =
new String[] {"root.vehicle.d1,2,null,null,null,null,", "root.vehicle.d0,11,11,6,6,1,"};
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 9540444..ea825ca 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -79,7 +79,6 @@ import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
-import org.apache.iotdb.db.qp.physical.sys.*;
import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.AlterTimeSeriesPlan;
import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
@@ -114,6 +113,9 @@ import org.apache.iotdb.db.qp.physical.sys.ShowChildPathsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowFunctionsPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowLockInfoPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowNodesInTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPathsSetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPathsUsingTemplatePlan;
import org.apache.iotdb.db.qp.physical.sys.ShowPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowStorageGroupPlan;
import org.apache.iotdb.db.qp.physical.sys.ShowTTLPlan;
@@ -185,7 +187,41 @@ import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
-import static org.apache.iotdb.db.conf.IoTDBConstant.*;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_BOUNDARY;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_SCHEMA_TEMPLATE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_STORAGE_GROUP;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_COMPRESSION;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_DATATYPE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TIMESERIES_ENCODING;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_TTL;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_USER;
+import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_VALUE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_BUILTIN_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDAF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_EXTERNAL_UDTF;
+import static org.apache.iotdb.db.conf.IoTDBConstant.FUNCTION_TYPE_NATIVE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.QUERY_ID;
+import static org.apache.iotdb.db.conf.IoTDBConstant.STATEMENT;
import static org.apache.iotdb.rpc.TSStatusCode.INTERNAL_SERVER_ERROR;
import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
@@ -608,7 +644,8 @@ public class PlanExecutor implements IPlanExecutor {
}
protected AlignByDeviceDataSet getAlignByDeviceDataSet(
- AlignByDevicePlan plan, QueryContext context, IQueryRouter router) {
+ AlignByDevicePlan plan, QueryContext context, IQueryRouter router)
+ throws QueryProcessException {
return new AlignByDeviceDataSet(plan, context, router);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
index f6f71c6..517886d 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByFillQueryOperator.java
@@ -24,7 +24,9 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
public class GroupByFillQueryOperator extends GroupByQueryOperator {
@@ -58,4 +60,12 @@ public class GroupByFillQueryOperator extends GroupByQueryOperator {
return groupByTimeFillPlan;
}
+
+ @Override
+ protected IExpression optimizeExpression(IExpression expression, RawDataQueryPlan queryPlan)
+ throws QueryProcessException {
+ GroupByTimeFillPlan groupByFillPlan = (GroupByTimeFillPlan) queryPlan;
+ groupByFillPlan.initFillRange();
+ return super.optimizeExpression(expression, queryPlan);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByQueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByQueryOperator.java
index 3ed87ec..4301059 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByQueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/GroupByQueryOperator.java
@@ -20,11 +20,23 @@
package org.apache.iotdb.db.qp.logical.crud;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.qp.physical.crud.AlignByDevicePlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
+import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
+import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan.getTimeExpression;
public class GroupByQueryOperator extends AggregationQueryOperator {
@@ -65,4 +77,26 @@ public class GroupByQueryOperator extends AggregationQueryOperator {
return groupByTimePlan;
}
+
+ @Override
+ protected IExpression optimizeExpression(IExpression expression, RawDataQueryPlan queryPlan)
+ throws QueryProcessException {
+ GroupByTimePlan groupByTimePlan = (GroupByTimePlan) queryPlan;
+ List<PartialPath> selectedSeries = groupByTimePlan.getDeduplicatedPaths();
+ GlobalTimeExpression timeExpression = getTimeExpression(groupByTimePlan);
+
+ if (expression == null) {
+ expression = timeExpression;
+ } else {
+ expression = BinaryExpression.and(expression, timeExpression);
+ }
+
+ // optimize expression to an executable one
+ try {
+ return ExpressionOptimizer.getInstance()
+ .optimize(expression, new ArrayList<>(selectedSeries));
+ } catch (QueryFilterOptimizationException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
index 54290f1..fbefe7e 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/logical/crud/QueryOperator.java
@@ -39,8 +39,10 @@ import org.apache.iotdb.db.query.expression.ResultColumn;
import org.apache.iotdb.db.query.expression.unary.FunctionExpression;
import org.apache.iotdb.db.query.expression.unary.TimeSeriesOperand;
import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -200,11 +202,6 @@ public class QueryOperator extends Operator {
rawDataQueryPlan.setResultColumns(selectComponent.getResultColumns());
rawDataQueryPlan.setEnableTracing(enableTracing);
- // transform filter operator to expression
- if (whereComponent != null) {
- transformFilterOperatorToExpression(generator, rawDataQueryPlan);
- }
-
if (queryPlan instanceof QueryIndexPlan) {
((QueryIndexPlan) queryPlan).setIndexType(indexType);
((QueryIndexPlan) queryPlan).setProps(props);
@@ -219,22 +216,41 @@ public class QueryOperator extends Operator {
convertSpecialClauseValues(rawDataQueryPlan);
+ // transform filter operator to expression
+ IExpression expression = transformFilterOperatorToExpression();
+ expression = optimizeExpression(expression, (RawDataQueryPlan) queryPlan);
+ if (expression != null) {
+ ((RawDataQueryPlan) queryPlan).setExpression(expression);
+ }
+
return rawDataQueryPlan;
}
- protected void transformFilterOperatorToExpression(
- PhysicalGenerator generator, RawDataQueryPlan rawDataQueryPlan) throws QueryProcessException {
+ protected IExpression transformFilterOperatorToExpression() throws QueryProcessException {
+ if (whereComponent == null) {
+ return null;
+ }
FilterOperator filterOperator = whereComponent.getFilterOperator();
List<PartialPath> filterPaths = new ArrayList<>(filterOperator.getPathSet());
HashMap<PartialPath, TSDataType> pathTSDataTypeHashMap = new HashMap<>();
for (PartialPath filterPath : filterPaths) {
- rawDataQueryPlan.addFilterPathInDeviceToMeasurements(filterPath);
pathTSDataTypeHashMap.put(
filterPath,
SQLConstant.isReservedPath(filterPath) ? TSDataType.INT64 : filterPath.getSeriesType());
}
- IExpression expression = filterOperator.transformToExpression(pathTSDataTypeHashMap);
- rawDataQueryPlan.setExpression(expression);
+ return filterOperator.transformToExpression(pathTSDataTypeHashMap);
+ }
+
+ protected IExpression optimizeExpression(IExpression expression, RawDataQueryPlan queryPlan)
+ throws QueryProcessException {
+ try {
+ return expression == null
+ ? null
+ : ExpressionOptimizer.getInstance()
+ .optimize(expression, new ArrayList<>(queryPlan.getDeduplicatedPaths()));
+ } catch (QueryFilterOptimizationException e) {
+ throw new QueryProcessException(e.getMessage());
+ }
}
protected AlignByDevicePlan generateAlignByDevicePlan(PhysicalGenerator generator)
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
index dd61488..1fab54b 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/GroupByTimePlan.java
@@ -19,8 +19,13 @@
package org.apache.iotdb.db.qp.physical.crud;
import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.service.rpc.thrift.TSExecuteStatementResp;
+import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
+import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
+import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
import org.apache.thrift.TException;
@@ -108,4 +113,26 @@ public class GroupByTimePlan extends AggregationPlan {
public void setLeftCRightO(boolean leftCRightO) {
this.leftCRightO = leftCRightO;
}
+
+ public static GlobalTimeExpression getTimeExpression(GroupByTimePlan plan)
+ throws QueryProcessException {
+ if (plan.isSlidingStepByMonth() || plan.isIntervalByMonth()) {
+ if (!plan.isAscending()) {
+ throw new QueryProcessException("Group by month doesn't support order by time desc now.");
+ }
+ return new GlobalTimeExpression(
+ (new GroupByMonthFilter(
+ plan.getInterval(),
+ plan.getSlidingStep(),
+ plan.getStartTime(),
+ plan.getEndTime(),
+ plan.isSlidingStepByMonth(),
+ plan.isIntervalByMonth(),
+ SessionManager.getInstance().getCurrSessionTimeZone())));
+ } else {
+ return new GlobalTimeExpression(
+ new GroupByFilter(
+ plan.getInterval(), plan.getSlidingStep(), plan.getStartTime(), plan.getEndTime()));
+ }
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
index 2d1aa1d..5810551 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/physical/crud/RawDataQueryPlan.java
@@ -26,7 +26,9 @@ import org.apache.iotdb.db.qp.strategy.PhysicalGenerator;
import org.apache.iotdb.db.utils.SchemaUtils;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.expression.IBinaryExpression;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.utils.Pair;
import java.util.ArrayList;
@@ -96,6 +98,19 @@ public class RawDataQueryPlan extends QueryPlan {
public void setExpression(IExpression expression) throws QueryProcessException {
this.expression = expression;
+ updateDeviceMeasurementsUsingExpression(expression);
+ }
+
+ public void updateDeviceMeasurementsUsingExpression(IExpression expression) {
+ if (expression instanceof SingleSeriesExpression) {
+ Path path = ((SingleSeriesExpression) expression).getSeriesPath();
+ deviceToMeasurements
+ .computeIfAbsent(path.getDevice(), key -> new HashSet<>())
+ .add(path.getMeasurement());
+ } else if (expression instanceof IBinaryExpression) {
+ updateDeviceMeasurementsUsingExpression(((IBinaryExpression) expression).getLeft());
+ updateDeviceMeasurementsUsingExpression(((IBinaryExpression) expression).getRight());
+ }
}
public List<PartialPath> getDeduplicatedPaths() {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
index e138bb9..c26d335 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/AlignByDeviceDataSet.java
@@ -36,6 +36,8 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.expression.IExpression;
+import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
+import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -46,6 +48,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import static org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan.getTimeExpression;
+
/** This QueryDataSet is used for ALIGN_BY_DEVICE query result. */
public class AlignByDeviceDataSet extends QueryDataSet {
@@ -62,6 +66,7 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private GroupByTimePlan groupByTimePlan;
private GroupByTimeFillPlan groupByFillPlan;
+ private IExpression timeExpression;
private FillQueryPlan fillQueryPlan;
private AggregationPlan aggregationPlan;
private RawDataQueryPlan rawDataQueryPlan;
@@ -74,7 +79,8 @@ public class AlignByDeviceDataSet extends QueryDataSet {
private int pathsNum = 0;
public AlignByDeviceDataSet(
- AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter) {
+ AlignByDevicePlan alignByDevicePlan, QueryContext context, IQueryRouter queryRouter)
+ throws QueryProcessException {
super(null, null);
// align by device's column number is different from other datasets
// TODO I don't know whether it's right or not in AlignedPath, remember to check here while
@@ -94,11 +100,14 @@ public class AlignByDeviceDataSet extends QueryDataSet {
this.dataSetType = DataSetType.GROUP_BY_FILL;
this.groupByFillPlan = alignByDevicePlan.getGroupByFillPlan();
this.groupByFillPlan.setAscending(alignByDevicePlan.isAscending());
+ this.groupByFillPlan.initFillRange();
+ this.timeExpression = getTimeExpression(groupByFillPlan);
break;
case GROUP_BY_TIME:
this.dataSetType = DataSetType.GROUP_BY_TIME;
this.groupByTimePlan = alignByDevicePlan.getGroupByTimePlan();
this.groupByTimePlan.setAscending(alignByDevicePlan.isAscending());
+ this.timeExpression = getTimeExpression(groupByTimePlan);
break;
case AGGREGATION:
this.dataSetType = DataSetType.AGGREGATE;
@@ -153,6 +162,18 @@ public class AlignByDeviceDataSet extends QueryDataSet {
if (deviceToFilterMap != null) {
this.expression = deviceToFilterMap.get(currentDevice);
}
+ if (dataSetType == DataSetType.GROUP_BY_TIME || dataSetType == DataSetType.GROUP_BY_FILL) {
+ this.expression =
+ expression == null ? timeExpression : BinaryExpression.and(expression, timeExpression);
+ }
+ if (expression != null) {
+ try {
+ this.expression =
+ ExpressionOptimizer.getInstance().optimize(expression, new ArrayList<>(executePaths));
+ } catch (QueryFilterOptimizationException e) {
+ throw new IOException(e.getMessage());
+ }
+ }
// for tracing: try to calculate the number of series paths
if (context.isEnableTracing()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
index 8173504..ccd374b 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByEngineDataSet.java
@@ -18,6 +18,7 @@
*/
package org.apache.iotdb.db.query.dataset.groupby;
+import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -147,8 +148,13 @@ public abstract class GroupByEngineDataSet extends QueryDataSet {
this.queryId = context.getQueryId();
this.interval = groupByTimePlan.getInterval();
this.slidingStep = groupByTimePlan.getSlidingStep();
- this.startTime = groupByTimePlan.getStartTime();
- this.endTime = groupByTimePlan.getEndTime();
+ if (groupByTimePlan instanceof GroupByTimeFillPlan) {
+ this.startTime = ((GroupByTimeFillPlan) groupByTimePlan).getQueryStartTime();
+ this.endTime = ((GroupByTimeFillPlan) groupByTimePlan).getQueryEndTime();
+ } else {
+ this.startTime = groupByTimePlan.getStartTime();
+ this.endTime = groupByTimePlan.getEndTime();
+ }
this.leftCRightO = groupByTimePlan.isLeftCRightO();
this.ascending = groupByTimePlan.isAscending();
this.isIntervalByMonth = groupByTimePlan.isIntervalByMonth();
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
index f41558f..0d7fb49 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/QueryRouter.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.db.query.executor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
-import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.AggregationPlan;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimeFillPlan;
@@ -31,7 +30,6 @@ import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan;
import org.apache.iotdb.db.qp.physical.crud.UDAFPlan;
import org.apache.iotdb.db.qp.physical.crud.UDTFPlan;
import org.apache.iotdb.db.query.context.QueryContext;
-import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.query.dataset.groupby.GroupByEngineDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByFillDataSet;
import org.apache.iotdb.db.query.dataset.groupby.GroupByLevelDataSet;
@@ -42,12 +40,7 @@ import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.expression.ExpressionType;
-import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
-import org.apache.iotdb.tsfile.read.expression.util.ExpressionOptimizer;
-import org.apache.iotdb.tsfile.read.filter.GroupByFilter;
-import org.apache.iotdb.tsfile.read.filter.GroupByMonthFilter;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.query.dataset.EmptyDataSet;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
@@ -71,21 +64,6 @@ public class QueryRouter implements IQueryRouter {
@Override
public QueryDataSet rawDataQuery(RawDataQueryPlan queryPlan, QueryContext context)
throws StorageEngineException, QueryProcessException {
- IExpression expression = queryPlan.getExpression();
- List<PartialPath> deduplicatedPaths = queryPlan.getDeduplicatedPaths();
-
- IExpression optimizedExpression;
- try {
- optimizedExpression =
- expression == null
- ? null
- : ExpressionOptimizer.getInstance()
- .optimize(expression, new ArrayList<>(deduplicatedPaths));
- } catch (QueryFilterOptimizationException e) {
- throw new StorageEngineException(e.getMessage());
- }
- queryPlan.setExpression(optimizedExpression);
-
RawDataQueryExecutor rawDataQueryExecutor = getRawDataQueryExecutor(queryPlan);
if (!queryPlan.isAlignByTime()) {
@@ -95,11 +73,11 @@ public class QueryRouter implements IQueryRouter {
return rawDataQueryExecutor.executeNonAlign(context);
}
- if (optimizedExpression != null
- && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
+ if (queryPlan.getExpression() != null
+ && queryPlan.getExpression().getType() != ExpressionType.GLOBAL_TIME) {
return rawDataQueryExecutor.executeWithValueFilter(context);
- } else if (optimizedExpression != null
- && optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+ } else if (queryPlan.getExpression() != null
+ && queryPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
Filter timeFilter = ((GlobalTimeExpression) queryPlan.getExpression()).getFilter();
TimeValuePairUtils.Intervals intervals = TimeValuePairUtils.extractTimeInterval(timeFilter);
if (intervals.isEmpty()) {
@@ -120,8 +98,7 @@ public class QueryRouter implements IQueryRouter {
@Override
public QueryDataSet aggregate(AggregationPlan aggregationPlan, QueryContext context)
- throws QueryFilterOptimizationException, StorageEngineException, QueryProcessException,
- IOException {
+ throws StorageEngineException, QueryProcessException, IOException {
if (logger.isDebugEnabled()) {
logger.debug(
@@ -135,24 +112,12 @@ public class QueryRouter implements IQueryRouter {
+ aggregationPlan.getDeduplicatedAggregations());
}
- IExpression expression = aggregationPlan.getExpression();
- List<PartialPath> deduplicatedPaths = aggregationPlan.getDeduplicatedPaths();
-
- // optimize expression to an executable one
- IExpression optimizedExpression =
- expression == null
- ? null
- : ExpressionOptimizer.getInstance()
- .optimize(expression, new ArrayList<>(deduplicatedPaths));
-
- aggregationPlan.setExpression(optimizedExpression);
-
AggregationExecutor engineExecutor = getAggregationExecutor(context, aggregationPlan);
QueryDataSet dataSet;
- if (optimizedExpression != null
- && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME) {
+ if (aggregationPlan.getExpression() != null
+ && aggregationPlan.getExpression().getType() != ExpressionType.GLOBAL_TIME) {
dataSet = engineExecutor.executeWithValueFilter(aggregationPlan);
} else {
dataSet = engineExecutor.executeWithoutValueFilter(aggregationPlan);
@@ -199,22 +164,6 @@ public class QueryRouter implements IQueryRouter {
return new AggregationExecutor(context, aggregationPlan);
}
- private IExpression getOptimizeExpression(GroupByTimePlan groupByTimePlan)
- throws QueryFilterOptimizationException, QueryProcessException {
- IExpression expression = groupByTimePlan.getExpression();
- List<PartialPath> selectedSeries = groupByTimePlan.getDeduplicatedPaths();
- GlobalTimeExpression timeExpression = getTimeExpression(groupByTimePlan);
-
- if (expression == null) {
- expression = timeExpression;
- } else {
- expression = BinaryExpression.and(expression, timeExpression);
- }
-
- // optimize expression to an executable one
- return ExpressionOptimizer.getInstance().optimize(expression, new ArrayList<>(selectedSeries));
- }
-
@Override
public QueryDataSet groupBy(GroupByTimePlan groupByTimePlan, QueryContext context)
throws QueryFilterOptimizationException, StorageEngineException, QueryProcessException,
@@ -229,10 +178,8 @@ public class QueryRouter implements IQueryRouter {
}
GroupByEngineDataSet dataSet;
- IExpression optimizedExpression = getOptimizeExpression(groupByTimePlan);
- groupByTimePlan.setExpression(optimizedExpression);
- if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+ if (groupByTimePlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
dataSet = getGroupByWithoutValueFilterDataSet(context, groupByTimePlan);
((GroupByWithoutValueFilterDataSet) dataSet).initGroupBy(context, groupByTimePlan);
} else {
@@ -249,28 +196,6 @@ public class QueryRouter implements IQueryRouter {
return dataSet;
}
- private GlobalTimeExpression getTimeExpression(GroupByTimePlan plan)
- throws QueryProcessException {
- if (plan.isSlidingStepByMonth() || plan.isIntervalByMonth()) {
- if (!plan.isAscending()) {
- throw new QueryProcessException("Group by month doesn't support order by time desc now.");
- }
- return new GlobalTimeExpression(
- (new GroupByMonthFilter(
- plan.getInterval(),
- plan.getSlidingStep(),
- plan.getStartTime(),
- plan.getEndTime(),
- plan.isSlidingStepByMonth(),
- plan.isIntervalByMonth(),
- SessionManager.getInstance().getCurrSessionTimeZone())));
- } else {
- return new GlobalTimeExpression(
- new GroupByFilter(
- plan.getInterval(), plan.getSlidingStep(), plan.getStartTime(), plan.getEndTime()));
- }
- }
-
protected GroupByWithoutValueFilterDataSet getGroupByWithoutValueFilterDataSet(
QueryContext context, GroupByTimePlan plan) {
return new GroupByWithoutValueFilterDataSet(context, plan);
@@ -294,16 +219,13 @@ public class QueryRouter implements IQueryRouter {
@Override
public QueryDataSet groupByFill(GroupByTimeFillPlan groupByFillPlan, QueryContext context)
- throws QueryFilterOptimizationException, StorageEngineException, QueryProcessException {
+ throws StorageEngineException, QueryProcessException {
GroupByFillDataSet dataSet = new GroupByFillDataSet(context, groupByFillPlan);
-
groupByFillPlan.initFillRange();
- IExpression optimizedExpression = getOptimizeExpression(groupByFillPlan);
- groupByFillPlan.setExpression(optimizedExpression);
GroupByEngineDataSet engineDataSet;
- if (optimizedExpression.getType() == ExpressionType.GLOBAL_TIME) {
+ if (groupByFillPlan.getExpression().getType() == ExpressionType.GLOBAL_TIME) {
engineDataSet = getGroupByWithoutValueFilterDataSet(context, groupByFillPlan);
((GroupByWithoutValueFilterDataSet) engineDataSet).initGroupBy(context, groupByFillPlan);
} else {
@@ -331,21 +253,9 @@ public class QueryRouter implements IQueryRouter {
@Override
public QueryDataSet udtfQuery(UDTFPlan udtfPlan, QueryContext context)
throws StorageEngineException, QueryProcessException, IOException, InterruptedException {
- IExpression expression = udtfPlan.getExpression();
- IExpression optimizedExpression;
- try {
- optimizedExpression =
- expression == null
- ? null
- : ExpressionOptimizer.getInstance()
- .optimize(expression, new ArrayList<>(udtfPlan.getDeduplicatedPaths()));
- } catch (QueryFilterOptimizationException e) {
- throw new StorageEngineException(e.getMessage());
- }
- udtfPlan.setExpression(optimizedExpression);
-
boolean withValueFilter =
- optimizedExpression != null && optimizedExpression.getType() != ExpressionType.GLOBAL_TIME;
+ udtfPlan.getExpression() != null
+ && udtfPlan.getExpression().getType() != ExpressionType.GLOBAL_TIME;
UDFQueryExecutor udtfQueryExecutor = new UDFQueryExecutor(udtfPlan);
if (udtfPlan.isAlignByTime()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
index ea34f3b..76fe66a 100644
--- a/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/qp/physical/PhysicalPlanTest.java
@@ -66,12 +66,13 @@ import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.expression.IExpression;
-import org.apache.iotdb.tsfile.read.expression.impl.BinaryExpression;
import org.apache.iotdb.tsfile.read.expression.impl.GlobalTimeExpression;
import org.apache.iotdb.tsfile.read.expression.impl.SingleSeriesExpression;
import org.apache.iotdb.tsfile.read.filter.TimeFilter;
import org.apache.iotdb.tsfile.read.filter.ValueFilter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
+import org.apache.iotdb.tsfile.read.filter.operator.AndFilter;
+import org.apache.iotdb.tsfile.read.filter.operator.OrFilter;
import org.junit.After;
import org.junit.Assert;
@@ -655,11 +656,10 @@ public class PhysicalPlanTest {
PhysicalPlan plan = processor.parseSQLToPhysicalPlan(sqlStr);
IExpression queryFilter = ((RawDataQueryPlan) plan).getExpression();
IExpression expect =
- new GlobalTimeExpression(FilterFactory.and(TimeFilter.gt(50L), TimeFilter.ltEq(100L)));
- expect =
- BinaryExpression.or(
- expect,
- new SingleSeriesExpression(new Path("root.vehicle.d1", "s1"), ValueFilter.lt(10.0)));
+ new SingleSeriesExpression(
+ new Path("root.vehicle.d1", "s1"),
+ new OrFilter(
+ new AndFilter(TimeFilter.gt(50), TimeFilter.ltEq(100)), ValueFilter.lt(10.0)));
assertEquals(expect.toString(), queryFilter.toString());
}
@@ -670,9 +670,10 @@ public class PhysicalPlanTest {
IExpression queryFilter = ((RawDataQueryPlan) plan).getExpression();
IExpression expect =
- BinaryExpression.and(
- new SingleSeriesExpression(new Path("root.vehicle.d1", "s1"), ValueFilter.lt(10.0)),
- new GlobalTimeExpression(FilterFactory.and(TimeFilter.gt(50L), TimeFilter.ltEq(100L))));
+ new SingleSeriesExpression(
+ new Path("root.vehicle.d1", "s1"),
+ new AndFilter(
+ ValueFilter.lt(10.0), new AndFilter(TimeFilter.gt(50), TimeFilter.ltEq(100))));
assertEquals(expect.toString(), queryFilter.toString());