You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ro...@apache.org on 2022/01/15 08:53:55 UTC
[iotdb] 01/01: [IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set
This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch iotdb-2375
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 1b0b7574948b8c3960d49bb842594cbacbb77e84
Author: Steve Yurong Su <ro...@apache.org>
AuthorDate: Sat Jan 15 16:52:54 2022 +0800
[IOTDB-2375][IOTDB-2376] CQ: Get a wrong result data set
---
.../db/integration/IoTDBContinuousQueryIT.java | 191 ++++++++++++++++++---
.../cq/ContinuousQuerySchemaCheckTask.java | 9 +-
.../db/{ => engine}/cq/ContinuousQueryService.java | 2 +-
.../db/{ => engine}/cq/ContinuousQueryTask.java | 155 ++++-------------
.../cq/ContinuousQueryTaskPoolManager.java | 2 +-
.../selectinto/InsertTabletPlansIterator.java | 51 +++++-
.../org/apache/iotdb/db/metadata/MManager.java | 2 +-
.../main/java/org/apache/iotdb/db/qp/Planner.java | 9 +
.../apache/iotdb/db/qp/executor/PlanExecutor.java | 2 +-
.../iotdb/db/query/expression/ResultColumn.java | 5 +
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +-
12 files changed, 275 insertions(+), 157 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
index 8a875db..9dcc99e 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBContinuousQueryIT.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.integration;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.itbase.category.LocalStandaloneTest;
import org.apache.iotdb.jdbc.Config;
@@ -53,6 +55,8 @@ public class IoTDBContinuousQueryIT {
private Connection connection;
private volatile Exception exception = null;
+ private PartialPath[] partialPathArray;
+
private final Thread dataGenerator =
new Thread() {
@Override
@@ -62,11 +66,13 @@ public class IoTDBContinuousQueryIT {
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
do {
- for (String timeSeries : timeSeriesArray) {
+ for (PartialPath partialPath : partialPathArray) {
statement.execute(
String.format(
- "insert into %s(timestamp, temperature) values(now(), %.3f)",
- timeSeries, 200 * Math.random()));
+ "insert into %s(timestamp, %s) values(now(), %.3f)",
+ partialPath.getDevicePath(),
+ partialPath.getMeasurement(),
+ 200 * Math.random()));
}
} while (!isInterrupted());
} catch (Exception e) {
@@ -84,22 +90,23 @@ public class IoTDBContinuousQueryIT {
dataGenerator.join();
}
- private final String[] timeSeriesArray = {
- "root.ln.wf01.wt01.ws01",
- "root.ln.wf01.wt01.ws02",
- "root.ln.wf01.wt02.ws01",
- "root.ln.wf01.wt02.ws02",
- "root.ln.wf02.wt01.ws01",
- "root.ln.wf02.wt01.ws02",
- "root.ln.wf02.wt02.ws01",
- "root.ln.wf02.wt02.ws02"
- };
-
- private void createTimeSeries() throws SQLException {
- for (String timeSeries : timeSeriesArray) {
+ private void createTimeSeries(String[] timeSeriesArray) throws SQLException {
+ initPartialPaths(timeSeriesArray);
+ for (PartialPath partialPath : partialPathArray) {
statement.execute(
String.format(
- "create timeseries %s.temperature with datatype=FLOAT,encoding=RLE", timeSeries));
+ "create timeseries %s with datatype=FLOAT,encoding=RLE", partialPath.getFullPath()));
+ }
+ }
+
+ private void initPartialPaths(String[] timeSeriesArray) {
+ partialPathArray = new PartialPath[timeSeriesArray.length];
+ for (int i = 0; i < timeSeriesArray.length; ++i) {
+ try {
+ partialPathArray[i] = new PartialPath(timeSeriesArray[i]);
+ } catch (IllegalPathException e) {
+ fail(e.getMessage());
+ }
}
}
@@ -120,7 +127,17 @@ public class IoTDBContinuousQueryIT {
@Test
public void testCreateAndDropContinuousQuery() throws Exception {
- createTimeSeries();
+ createTimeSeries(
+ new String[] {
+ "root.ln.wf01.wt01.ws01.temperature",
+ "root.ln.wf01.wt01.ws02.temperature",
+ "root.ln.wf01.wt02.ws01.temperature",
+ "root.ln.wf01.wt02.ws02.temperature",
+ "root.ln.wf02.wt01.ws01.temperature",
+ "root.ln.wf02.wt01.ws02.temperature",
+ "root.ln.wf02.wt02.ws01.temperature",
+ "root.ln.wf02.wt02.ws02.temperature"
+ });
statement.execute(
"CREATE CONTINUOUS QUERY cq1 "
@@ -182,8 +199,18 @@ public class IoTDBContinuousQueryIT {
}
@Test
- public void testContinuousQueryResultSeries() throws Exception {
- createTimeSeries();
+ public void testContinuousQueryResultSeriesWithLevels() throws Exception {
+ createTimeSeries(
+ new String[] {
+ "root.ln.wf01.wt01.ws01.temperature",
+ "root.ln.wf01.wt01.ws02.temperature",
+ "root.ln.wf01.wt02.ws01.temperature",
+ "root.ln.wf01.wt02.ws02.temperature",
+ "root.ln.wf02.wt01.ws01.temperature",
+ "root.ln.wf02.wt01.ws02.temperature",
+ "root.ln.wf02.wt02.ws01.temperature",
+ "root.ln.wf02.wt02.ws02.temperature"
+ });
startDataGenerator();
Thread.sleep(500);
@@ -215,8 +242,95 @@ public class IoTDBContinuousQueryIT {
}
@Test
+ public void testContinuousQueryResultSeriesWithDuplicatedTargetPaths() throws Exception {
+ createTimeSeries(
+ new String[] {
+ "root.ln.wf01.ws02.temperature",
+ "root.ln.wf01.ws01.temperature",
+ "root.ln.wf02.wt01.temperature",
+ "root.ln.wf02.wt02.temperature",
+ });
+ startDataGenerator();
+
+ Thread.sleep(500);
+
+ try {
+ statement.execute(
+ "CREATE CONTINUOUS QUERY cq1 "
+ + "BEGIN SELECT avg(temperature) INTO root.target.{2}.{3}.avg FROM root.ln.*.* "
+ + "GROUP BY time(1s) END");
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("duplicated"));
+ }
+ }
+
+ @Test
+ public void testContinuousQueryResultSeriesWithoutLevels1() throws Exception {
+ String[] timeSeriesArray = new String[30];
+ int wsIndex = 1;
+ for (int i = 1; i <= 30; ++i) {
+ timeSeriesArray[i - 1] =
+ "root.ln.wf0" + (i < 15 ? 1 : 2) + ".ws" + wsIndex++ + ".temperature";
+ }
+ createTimeSeries(timeSeriesArray);
+ startDataGenerator();
+
+ Thread.sleep(500);
+
+ statement.execute(
+ "CREATE CONTINUOUS QUERY cq1 "
+ + "BEGIN SELECT avg(temperature) INTO root.target.${2}.${3}_avg FROM root.ln.*.* "
+ + "GROUP BY time(1s) END");
+
+ Thread.sleep(5500);
+
+ checkShowTimeSeriesCount(2 * timeSeriesArray.length);
+
+ statement.execute("DROP CONTINUOUS QUERY cq1");
+
+ stopDataGenerator();
+ }
+
+ @Test
+ public void testContinuousQueryResultSeriesWithoutLevels2() throws Exception {
+ String[] timeSeriesArray = new String[30];
+ int wsIndex = 1;
+ for (int i = 1; i <= 30; ++i) {
+ timeSeriesArray[i - 1] =
+ "root.ln.wf0" + (i < 15 ? 1 : 2) + ".ws" + wsIndex++ + ".temperature";
+ }
+ createTimeSeries(timeSeriesArray);
+ startDataGenerator();
+
+ Thread.sleep(500);
+
+ statement.execute(
+ "CREATE CONTINUOUS QUERY cq1 "
+ + "BEGIN SELECT avg(temperature) INTO root.target.${2}.${3}.avg FROM root.ln.*.* "
+ + "GROUP BY time(1s) END");
+
+ Thread.sleep(5500);
+
+ checkShowTimeSeriesCount(2 * timeSeriesArray.length);
+
+ statement.execute("DROP CONTINUOUS QUERY cq1");
+
+ stopDataGenerator();
+ }
+
+ @Test
public void testInterval1000() throws Exception {
- createTimeSeries();
+ createTimeSeries(
+ new String[] {
+ "root.ln.wf01.wt01.ws01.temperature",
+ "root.ln.wf01.wt01.ws02.temperature",
+ "root.ln.wf01.wt02.ws01.temperature",
+ "root.ln.wf01.wt02.ws02.temperature",
+ "root.ln.wf02.wt01.ws01.temperature",
+ "root.ln.wf02.wt01.ws02.temperature",
+ "root.ln.wf02.wt02.ws01.temperature",
+ "root.ln.wf02.wt02.ws02.temperature"
+ });
startDataGenerator();
statement.execute(
@@ -232,7 +346,17 @@ public class IoTDBContinuousQueryIT {
@Test
public void testInterval2000() throws Exception {
- createTimeSeries();
+ createTimeSeries(
+ new String[] {
+ "root.ln.wf01.wt01.ws01.temperature",
+ "root.ln.wf01.wt01.ws02.temperature",
+ "root.ln.wf01.wt02.ws01.temperature",
+ "root.ln.wf01.wt02.ws02.temperature",
+ "root.ln.wf02.wt01.ws01.temperature",
+ "root.ln.wf02.wt01.ws02.temperature",
+ "root.ln.wf02.wt02.ws01.temperature",
+ "root.ln.wf02.wt02.ws02.temperature"
+ });
startDataGenerator();
statement.execute(
@@ -248,7 +372,17 @@ public class IoTDBContinuousQueryIT {
@Test
public void testInterval3000() throws Exception {
- createTimeSeries();
+ createTimeSeries(
+ new String[] {
+ "root.ln.wf01.wt01.ws01.temperature",
+ "root.ln.wf01.wt01.ws02.temperature",
+ "root.ln.wf01.wt02.ws01.temperature",
+ "root.ln.wf01.wt02.ws02.temperature",
+ "root.ln.wf02.wt01.ws01.temperature",
+ "root.ln.wf02.wt01.ws02.temperature",
+ "root.ln.wf02.wt02.ws01.temperature",
+ "root.ln.wf02.wt02.ws02.temperature"
+ });
startDataGenerator();
statement.execute(
@@ -345,4 +479,15 @@ public class IoTDBContinuousQueryIT {
Assert.assertTrue(collect.contains(s));
}
}
+
+ private void checkShowTimeSeriesCount(int expected) throws SQLException {
+ Assert.assertTrue(statement.execute("show timeseries"));
+ int autual = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ ++autual;
+ }
+ }
+ Assert.assertEquals(expected, autual);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
similarity index 91%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
index 73c0481..0f5dbbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQuerySchemaCheckTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQuerySchemaCheckTask.java
@@ -17,14 +17,16 @@
* under the License.
*/
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.ContinuousQueryException;
+import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.metadata.PathNotExistException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
import org.apache.iotdb.db.service.IoTDB;
@@ -47,8 +49,9 @@ public class ContinuousQuerySchemaCheckTask extends ContinuousQueryTask {
/** we only do some checks here. we don't write any data. */
@Override
- protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
- throws MetadataException, IOException, ContinuousQueryException {
+ protected void doInsert(
+ String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
+ throws MetadataException, StorageEngineException, IOException {
Set<PartialPath> targetPaths = new HashSet<>(generateTargetPaths(queryDataSet.getPaths()));
checkTargetPathNumber(queryDataSet, targetPaths);
checkTargetPathDataType(queryPlan, targetPaths);
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
similarity index 99%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
index 58e11f2..0b44b8c 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryService.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryService.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
similarity index 59%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
index f05c977..23bb596 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTask.java
@@ -16,28 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
import org.apache.iotdb.db.concurrent.WrappedRunnable;
import org.apache.iotdb.db.conf.IoTDBConstant;
+import org.apache.iotdb.db.engine.selectinto.InsertTabletPlansIterator;
+import org.apache.iotdb.db.exception.ContinuousQueryException;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.metadata.IllegalPathException;
import org.apache.iotdb.db.exception.metadata.MetadataException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.logical.Operator;
+import org.apache.iotdb.db.qp.logical.crud.QueryOperator;
import org.apache.iotdb.db.qp.physical.crud.GroupByTimePlan;
-import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertMultiTabletPlan;
import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.strategy.LogicalGenerator;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.control.QueryResourceManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.db.service.basic.ServiceProvider;
-import org.apache.iotdb.db.utils.TypeInferenceUtils;
import org.apache.iotdb.tsfile.exception.filter.QueryFilterOptimizationException;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
import org.apache.iotdb.tsfile.read.common.Path;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.thrift.TException;
@@ -46,8 +47,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
+import java.time.ZoneId;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -77,11 +78,18 @@ public class ContinuousQueryTask extends WrappedRunnable {
public void runMayThrow()
throws QueryProcessException, StorageEngineException, IOException, InterruptedException,
QueryFilterOptimizationException, MetadataException, TException, SQLException {
+ // construct logical operator
final String sql = generateSQL();
+ Operator operator = LogicalGenerator.generate(sql, ZoneId.systemDefault());
+ if (!operator.isQuery()) {
+ throw new ContinuousQueryException(
+ String.format("unsupported operation in cq task: %s", operator.getType().name()));
+ }
+ QueryOperator queryOperator = (QueryOperator) operator;
// construct query plan
final GroupByTimePlan queryPlan =
- (GroupByTimePlan) serviceProvider.getPlanner().parseSQLToPhysicalPlan(sql);
+ (GroupByTimePlan) serviceProvider.getPlanner().operatorToPhysicalPlan(queryOperator);
if (queryPlan.getDeduplicatedPaths().isEmpty()) {
if (continuousQueryPlan.isDebug()) {
LOGGER.info(continuousQueryPlan.getContinuousQueryName() + ": deduplicated paths empty.");
@@ -109,7 +117,7 @@ public class ContinuousQueryTask extends WrappedRunnable {
}
// insert data into target timeseries
- doInsert(queryDataSet, queryPlan);
+ doInsert(sql, queryOperator, queryPlan, queryDataSet);
} finally {
ServiceProvider.SESSION_MANAGER.releaseQueryResourceNoExceptions(queryId);
}
@@ -128,118 +136,23 @@ public class ContinuousQueryTask extends WrappedRunnable {
+ continuousQueryPlan.getQuerySqlAfterGroupByClause();
}
- protected void doInsert(QueryDataSet queryDataSet, GroupByTimePlan queryPlan)
- throws IOException, MetadataException, QueryProcessException, StorageEngineException {
- int columnSize = queryDataSet.getDataTypes().size();
- TSDataType dataType =
- TypeInferenceUtils.getAggrDataType(
- queryPlan.getAggregations().get(0), queryPlan.getDataTypes().get(0));
- InsertTabletPlan[] insertTabletPlans =
- generateInsertTabletPlans(columnSize, queryDataSet, dataType);
-
- int batchSize =
- (int)
- Math.min(
- EXECUTION_BATCH_SIZE,
- Math.ceil(
- (float) continuousQueryPlan.getForInterval()
- / (continuousQueryPlan.getGroupByTimeInterval())));
- Object[][] columns = constructColumns(columnSize, batchSize, dataType);
- long[][] timestamps = new long[columnSize][batchSize];
- int[] rowNums = new int[columnSize];
-
- boolean hasNext = true;
- while (hasNext) {
- int rowNum = 0;
-
- while (++rowNum <= batchSize) {
- if (!queryDataSet.hasNext()) {
- hasNext = false;
- break;
- }
- fillColumns(columns, dataType, queryDataSet.next(), rowNums, timestamps);
- }
-
- for (int i = 0; i < columnSize; i++) {
- if (rowNums[i] > 0) {
- insertTabletPlans[i].setTimes(timestamps[i]);
- insertTabletPlans[i].setColumns(columns[i]);
- insertTabletPlans[i].setRowCount(rowNums[i]);
- serviceProvider.executeNonQuery(insertTabletPlans[i]);
- }
- }
- }
- }
-
- protected InsertTabletPlan[] generateInsertTabletPlans(
- int columnSize, QueryDataSet result, TSDataType dataType) throws IllegalPathException {
- List<PartialPath> targetPaths = generateTargetPaths(result.getPaths());
- InsertTabletPlan[] insertTabletPlans = new InsertTabletPlan[columnSize];
- String[] measurements = new String[] {targetPaths.get(0).getMeasurement()};
- List<Integer> dataTypes = Collections.singletonList(dataType.ordinal());
-
- for (int i = 0; i < columnSize; i++) {
- insertTabletPlans[i] =
- new InsertTabletPlan(
- new PartialPath(targetPaths.get(i).getDevice()), measurements, dataTypes);
- }
-
- return insertTabletPlans;
- }
-
- protected Object[][] constructColumns(int columnSize, int fetchSize, TSDataType dataType) {
- Object[][] columns = new Object[columnSize][1];
- for (int i = 0; i < columnSize; i++) {
- switch (dataType) {
- case DOUBLE:
- columns[i][0] = new double[fetchSize];
- break;
- case INT64:
- columns[i][0] = new long[fetchSize];
- break;
- case INT32:
- columns[i][0] = new int[fetchSize];
- break;
- case FLOAT:
- columns[i][0] = new float[fetchSize];
- break;
- default:
- break;
- }
- }
- return columns;
- }
-
- protected void fillColumns(
- Object[][] columns,
- TSDataType dataType,
- RowRecord record,
- int[] rowNums,
- long[][] timestamps) {
- List<Field> fields = record.getFields();
- long ts = record.getTimestamp();
-
- for (int i = 0; i < columns.length; i++) {
- Field field = fields.get(i);
- if (field != null) {
- timestamps[i][rowNums[i]] = ts;
- switch (dataType) {
- case DOUBLE:
- ((double[]) columns[i][0])[rowNums[i]] = field.getDoubleV();
- break;
- case INT64:
- ((long[]) columns[i][0])[rowNums[i]] = field.getLongV();
- break;
- case INT32:
- ((int[]) columns[i][0])[rowNums[i]] = field.getIntV();
- break;
- case FLOAT:
- ((float[]) columns[i][0])[rowNums[i]] = field.getFloatV();
- break;
- default:
- }
-
- rowNums[i]++;
+ protected void doInsert(
+ String sql, QueryOperator queryOperator, GroupByTimePlan queryPlan, QueryDataSet queryDataSet)
+ throws MetadataException, QueryProcessException, StorageEngineException, IOException {
+ InsertTabletPlansIterator insertTabletPlansIterator =
+ new InsertTabletPlansIterator(
+ queryPlan,
+ queryDataSet,
+ queryOperator.getFromComponent().getPrefixPaths().get(0),
+ generateTargetPaths(queryDataSet.getPaths()),
+ false);
+ while (insertTabletPlansIterator.hasNext()) {
+ if (!serviceProvider.executeNonQuery(
+ new InsertMultiTabletPlan(insertTabletPlansIterator.next()))) {
+ throw new ContinuousQueryException(
+ String.format(
+ "failed to execute cq task %s, sql: %s",
+ continuousQueryPlan.getContinuousQueryName(), sql));
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
similarity index 98%
rename from server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
rename to server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
index 5b112d7..000c713 100644
--- a/server/src/main/java/org/apache/iotdb/db/cq/ContinuousQueryTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/cq/ContinuousQueryTaskPoolManager.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iotdb.db.cq;
+package org.apache.iotdb.db.engine.cq;
import org.apache.iotdb.db.concurrent.IoTThreadFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
index a7b0738..a83c23b 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/selectinto/InsertTabletPlansIterator.java
@@ -25,9 +25,13 @@ import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
import org.apache.iotdb.db.qp.physical.crud.QueryPlan;
import org.apache.iotdb.db.query.expression.ResultColumn;
+import org.apache.iotdb.tsfile.read.common.Path;
import org.apache.iotdb.tsfile.read.common.RowRecord;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -38,7 +42,9 @@ import java.util.regex.Pattern;
public class InsertTabletPlansIterator {
- private static final Pattern leveledPathNodePattern = Pattern.compile("\\$\\{\\w+}");
+ private static final Logger LOGGER = LoggerFactory.getLogger(InsertTabletPlansIterator.class);
+
+ private static final Pattern LEVELED_PATH_NODE_PATTERN = Pattern.compile("\\$\\{\\w+}");
private final QueryPlan queryPlan;
private final QueryDataSet queryDataSet;
@@ -80,7 +86,7 @@ public class InsertTabletPlansIterator {
private PartialPath generateActualIntoPath(int index) throws IllegalPathException {
String[] nodes = fromPath.getNodes();
StringBuffer sb = new StringBuffer();
- Matcher m = leveledPathNodePattern.matcher(intoPaths.get(index).getFullPath());
+ Matcher m = LEVELED_PATH_NODE_PATTERN.matcher(intoPaths.get(index).getFullPath());
while (m.find()) {
String param = m.group();
String value = nodes[Integer.parseInt(param.substring(2, param.length() - 1).trim())];
@@ -92,7 +98,7 @@ public class InsertTabletPlansIterator {
private void constructInsertTabletPlanGenerators() {
final Map<String, Integer> sourcePathToQueryDataSetIndex = queryPlan.getPathToIndex();
- final List<ResultColumn> resultColumns = queryPlan.getResultColumns();
+ final List<String> sourcePaths = findSourcePaths();
Map<String, InsertTabletPlanGenerator> deviceToPlanGeneratorMap = new HashMap<>();
for (int i = 0, intoPathsSize = intoPaths.size(); i < intoPathsSize; i++) {
@@ -105,13 +111,50 @@ public class InsertTabletPlansIterator {
.get(device)
.collectTargetPathInformation(
intoPaths.get(i).getMeasurement(),
- sourcePathToQueryDataSetIndex.get(resultColumns.get(i).getResultColumnName()));
+ sourcePathToQueryDataSetIndex.get(sourcePaths.get(i)));
}
insertTabletPlanGenerators =
deviceToPlanGeneratorMap.values().toArray(new InsertTabletPlanGenerator[0]);
}
+ private List<String> findSourcePaths() {
+ // sourcePaths can be in queryPlanColumns or in queryDataSetPaths
+ final List<ResultColumn> queryPlanColumns = queryPlan.getResultColumns();
+ final List<Path> queryDataSetPaths = queryDataSet.getPaths();
+
+ final Map<String, Integer> sourcePathToQueryDataSetIndex = queryPlan.getPathToIndex();
+ final List<String> sourcePaths =
+ new ArrayList<>(Math.max(queryPlanColumns.size(), queryDataSetPaths.size()));
+
+ if (queryPlanColumns.size() == intoPaths.size()) {
+ for (ResultColumn resultColumn : queryPlanColumns) {
+ String path = resultColumn.getResultColumnName();
+ if (!sourcePathToQueryDataSetIndex.containsKey(path)) {
+ sourcePaths.clear();
+ break;
+ }
+ sourcePaths.add(path);
+ }
+ }
+
+ if (sourcePaths.isEmpty() && queryDataSetPaths.size() == intoPaths.size()) {
+ for (Path path : queryDataSetPaths) {
+ if (!sourcePathToQueryDataSetIndex.containsKey(path.getFullPath())) {
+ sourcePaths.clear();
+ break;
+ }
+ sourcePaths.add(path.getFullPath());
+ }
+ }
+
+ if (sourcePaths.isEmpty()) {
+ LOGGER.warn("select into: sourcePaths.isEmpty()");
+ }
+
+ return sourcePaths;
+ }
+
public boolean hasNext() throws IOException {
return queryDataSet.hasNext();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
index f6eb834..6ec6eee 100644
--- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.metadata;
import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine;
import org.apache.iotdb.db.exception.ContinuousQueryException;
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
index 61c1e40..02258df 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/Planner.java
@@ -141,4 +141,13 @@ public class Planner {
public PhysicalPlan parseSQLToPhysicalPlan(String sqlStr) throws QueryProcessException {
return parseSQLToPhysicalPlan(sqlStr, ZoneId.systemDefault());
}
+
+ public PhysicalPlan operatorToPhysicalPlan(Operator operator) throws QueryProcessException {
+ // check if there are logical errors
+ LogicalChecker.check(operator);
+ // optimize the logical operator
+ operator = logicalOptimize(operator);
+ // from logical operator to physical plan
+ return new PhysicalGenerator().transformToPhysicalPlan(operator);
+ }
}
diff --git a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
index 0966e6b..3e05f50 100644
--- a/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
@@ -29,12 +29,12 @@ import org.apache.iotdb.db.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.db.concurrent.ThreadName;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager.TaskStatus;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.flush.pool.FlushTaskPoolManager;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor.TimePartitionFilter;
diff --git a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
index 8be34eb..51195d7 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/expression/ResultColumn.java
@@ -154,6 +154,11 @@ public class ResultColumn {
}
@Override
+ public String toString() {
+ return "ResultColumn{" + "expression=" + expression + ", alias='" + alias + '\'' + '}';
+ }
+
+ @Override
public final int hashCode() {
return alias == null ? getResultColumnName().hashCode() : alias.hashCode();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index 876272f..7e90950 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@ -24,11 +24,11 @@ import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceCheck;
import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
import org.apache.iotdb.db.engine.compaction.cross.inplace.manage.MergeManager;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.flush.FlushManager;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.ConfigurationException;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 2db631c..5256df3 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -24,12 +24,12 @@ import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.constant.TestConstant;
-import org.apache.iotdb.db.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.StorageEngine;
import org.apache.iotdb.db.engine.cache.BloomFilterCache;
import org.apache.iotdb.db.engine.cache.ChunkCache;
import org.apache.iotdb.db.engine.cache.TimeSeriesMetadataCache;
import org.apache.iotdb.db.engine.compaction.CompactionTaskManager;
+import org.apache.iotdb.db.engine.cq.ContinuousQueryService;
import org.apache.iotdb.db.engine.trigger.service.TriggerRegistrationService;
import org.apache.iotdb.db.exception.ContinuousQueryException;
import org.apache.iotdb.db.exception.StorageEngineException;