You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/12/09 13:32:58 UTC
[iotdb] branch master updated: [IOTDB-4572] [IOTDB-3580] support order by in align by device
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 150a5dc7ab [IOTDB-4572] [IOTDB-3580] support order by in align by device
150a5dc7ab is described below
commit 150a5dc7ab2666291bc4088549b12990dc755ad1
Author: YangCaiyin <yc...@gmail.com>
AuthorDate: Fri Dec 9 21:32:50 2022 +0800
[IOTDB-4572] [IOTDB-3580] support order by in align by device
---
.../IoTDBOrderByWithAlignByDeviceIT.java | 1178 ++++++++++++++++
.../iotdb/db/it/query/IoTDBNullValueFillIT.java | 2 +-
.../operator/process/MergeSortOperator.java | 227 +++
.../operator/process/SingleDeviceViewOperator.java | 137 ++
.../process/join/merge/MergeSortComparator.java | 145 ++
.../plan/planner/LocalExecutionPlanContext.java | 11 +
.../db/mpp/plan/planner/LogicalPlanBuilder.java | 77 +-
.../db/mpp/plan/planner/LogicalPlanVisitor.java | 9 +-
.../db/mpp/plan/planner/OperatorTreeGenerator.java | 48 +
.../planner/distribution/DistributionPlanner.java | 8 +-
.../planner/distribution/ExchangeNodeAdder.java | 114 +-
.../plan/planner/distribution/SourceRewriter.java | 186 ++-
.../plan/planner/plan/node/PlanGraphPrinter.java | 25 +
.../mpp/plan/planner/plan/node/PlanNodeType.java | 10 +-
.../db/mpp/plan/planner/plan/node/PlanVisitor.java | 10 +
.../planner/plan/node/SimplePlanNodeRewriter.java | 20 +-
.../planner/plan/node/process/MergeSortNode.java | 123 ++
.../plan/node/process/SingleDeviceViewNode.java | 145 ++
.../plan/statement/component/OrderByComponent.java | 3 +-
.../db/mpp/plan/statement/crud/QueryStatement.java | 11 +-
.../db/utils/datastructure/MergeSortHeap.java | 113 ++
.../iotdb/db/utils/datastructure/MergeSortKey.java | 41 +
.../execution/operator/MergeSortOperatorTest.java | 1475 ++++++++++++++++++++
.../operator/SingleDeviceViewOperatorTest.java | 207 +++
.../db/mpp/plan/plan/QueryLogicalPlanUtil.java | 10 +-
.../distribution/AggregationDistributionTest.java | 4 +-
26 files changed, 4209 insertions(+), 130 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
new file mode 100644
index 0000000000..e6bbd3633a
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/alignbydevice/IoTDBOrderByWithAlignByDeviceIT.java
@@ -0,0 +1,1178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.it.alignbydevice;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBOrderByWithAlignByDeviceIT {
+ private static final String[] places = // devices under test; insertData() creates a precipitation and a temperature series for each
+ new String[] {
+ "root.weather.London",
+ "root.weather.Edinburgh",
+ "root.weather.Belfast",
+ "root.weather.Birmingham",
+ "root.weather.Liverpool",
+ "root.weather.Derby",
+ "root.weather.Durham",
+ "root.weather.Hereford",
+ "root.weather.Manchester",
+ "root.weather.Oxford"
+ };
+ private static final long startPrecipitation = 200; // constant offset added to every generated precipitation value
+ private static final double startTemperature = 20.0; // constant offset added to every generated temperature value
+ private static final long startTime = 1668960000000L; // timestamp (ms) of the first point of the first device
+ private static final int numOfPointsInDevice = 20; // number of points inserted per device
+ private static final long timeGap = 100L; // ms between consecutive points of a device and between the start times of consecutive devices
+ private static final Map<String, Long> deviceToStartTimestamp = new HashMap<>(); // device -> timestamp of its first point, filled by insertData()
+ public static final Map<String, double[]> deviceToMaxTemperature = new HashMap<>(); // device -> expected max temperature per aggregation window, filled by insertData()
+ public static final Map<String, double[]> deviceToAvgTemperature = new HashMap<>(); // device -> expected avg temperature per aggregation window, filled by insertData()
+ public static final Map<String, long[]> deviceToMaxPrecipitation = new HashMap<>(); // device -> expected max precipitation per aggregation window, filled by insertData()
+ public static final Map<String, double[]> deviceToAvgPrecipitation = new HashMap<>(); // device -> expected avg precipitation per aggregation window, filled by insertData()
+
+ @BeforeClass
+ public static void setUp() throws Exception { // class-level fixture: prepare the environment, then load the shared test data
+ EnvFactory.getEnv().initBeforeClass(); // must come first: insertData() obtains its connection from EnvFactory
+ insertData(); // creates the time series, inserts the points and fills the expected-value maps
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception { // class-level fixture teardown
+ EnvFactory.getEnv().cleanAfterClass(); // counterpart of the initBeforeClass() call made in setUp()
+ }
+
+  /**
+   * Generates the test data with overlapping time ranges: every device gets {@code
+   * numOfPointsInDevice} points spaced {@code timeGap} ms apart, and each device starts {@code
+   * timeGap} ms later than the previous one.
+   *
+   * <p>It also fills the expected-value maps (max and avg of both series) with one entry per
+   * group of ten consecutive time slots, which the aggregation tests read.
+   *
+   * <p>Any failure aborts the test class through {@code fail}, so tests never run against
+   * missing or partial data.
+   *
+   * <p>The data can be viewed in the online doc:
+   *
+   * <p>https://docs.google.com/spreadsheets/d/18XlOIi27ZIIdRnar2WNXVMxkZwjgwlPZmzJLVpZRpAA/edit#gid=0
+   */
+  private static void insertData() {
+    try (Connection iotDBConnection = EnvFactory.getEnv().getConnection();
+        Statement statement = iotDBConnection.createStatement()) {
+      // create one INT64 and one DOUBLE time series per device
+      for (String place : places) {
+        String precipitationPath = place + ".precipitation";
+        String temperaturePath = place + ".temperature";
+        String createPrecipitationSql =
+            "CREATE TIMESERIES " + precipitationPath + " WITH DATATYPE=INT64, ENCODING=RLE";
+        String createTemperatureSql =
+            "CREATE TIMESERIES " + temperaturePath + " WITH DATATYPE=DOUBLE, ENCODING=RLE";
+        statement.execute(createPrecipitationSql);
+        statement.execute(createTemperatureSql);
+      }
+
+      // insert data; slot j of a device's array holds its value at startTime + j * timeGap,
+      // slots without a point keep the default 0. 29 = places.length - 1 + numOfPointsInDevice.
+      long start = startTime;
+      double[][] temperatures = new double[places.length][29];
+      long[][] precipitations = new long[places.length][29];
+      for (int index = 0; index < places.length; index++) {
+        String place = places[index];
+        int firstSlot = (int) ((start - startTime) / timeGap);
+
+        for (int i = 0; i < numOfPointsInDevice; i++) {
+          long timestamp = start + i * timeGap;
+          long precipitation = startPrecipitation + place.hashCode() + timestamp;
+          double temperature = startTemperature + place.hashCode() + timestamp;
+          precipitations[index][firstSlot + i] = precipitation;
+          temperatures[index][firstSlot + i] = temperature;
+          String insertUniqueTime =
+              "INSERT INTO "
+                  + place
+                  + "(timestamp,precipitation,temperature) VALUES("
+                  + timestamp
+                  + ","
+                  + precipitation
+                  + ","
+                  + temperature
+                  + ")";
+          statement.execute(insertUniqueTime);
+          if (i == 0) {
+            deviceToStartTimestamp.put(place, start);
+          }
+        }
+        // the next device starts one gap later, so the devices' time ranges overlap
+        start += timeGap;
+      }
+
+      // expected aggregation results: one entry per group of ten slots (the last group is shorter)
+      for (int i = 0; i < places.length; i++) {
+        double[] avgTemperatures = new double[3];
+        double[] avgPrecipitations = new double[3];
+        double[] maxTemperatures = new double[3];
+        long[] maxPrecipitations = new long[3];
+        double totalTemperature = 0;
+        long totalPrecipitation = 0;
+        double maxTemperature = -1;
+        long maxPrecipitation = -1;
+        int cnt = 0;
+        for (int j = 0; j < precipitations[i].length; j++) {
+          totalTemperature += temperatures[i][j];
+          totalPrecipitation += precipitations[i][j];
+          maxPrecipitation = Math.max(maxPrecipitation, precipitations[i][j]);
+          maxTemperature = Math.max(maxTemperature, temperatures[i][j]);
+          if ((j + 1) % 10 == 0 || j == precipitations[i].length - 1) {
+            // NOTE(review): the divisor is always 10 even when the group holds fewer points, so
+            // these averages are lower bounds rather than exact values; the aggregation tests
+            // compare them one-sidedly. Confirm whether exact averages are intended.
+            avgTemperatures[cnt] = totalTemperature / 10;
+            avgPrecipitations[cnt] = (double) totalPrecipitation / 10;
+            maxPrecipitations[cnt] = maxPrecipitation;
+            maxTemperatures[cnt] = maxTemperature;
+            cnt++;
+            totalTemperature = 0;
+            totalPrecipitation = 0;
+            maxTemperature = -1;
+            maxPrecipitation = -1;
+          }
+        }
+        deviceToMaxTemperature.put(places[i], maxTemperatures);
+        deviceToMaxPrecipitation.put(places[i], maxPrecipitations);
+        deviceToAvgTemperature.put(places[i], avgTemperatures);
+        deviceToAvgPrecipitation.put(places[i], avgPrecipitations);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      // Do not swallow setup errors: otherwise every test would run against missing data.
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Asserts that the result set exposes exactly the expected columns, in order.
+   *
+   * @param resultSetMetaData metadata of the result set under test
+   * @param title expected column names, comma separated
+   * @throws SQLException if the metadata cannot be read
+   */
+  private void checkHeader(ResultSetMetaData resultSetMetaData, String title) throws SQLException {
+    String[] headers = title.split(",");
+    // Check the count first: otherwise a result with missing columns passes silently and one
+    // with extra columns ends in ArrayIndexOutOfBoundsException instead of an assertion failure.
+    assertEquals(headers.length, resultSetMetaData.getColumnCount());
+    for (int i = 1; i <= headers.length; i++) {
+      assertEquals(headers[i - 1], resultSetMetaData.getColumnName(i));
+    }
+  }
+
+ // ORDER BY DEVICE
+  /**
+   * {@code ORDER BY DEVICE} without a direction: devices are expected in ascending order and the
+   * rows of each device in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places.clone()).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE ASC}: devices are expected in ascending order and the rows of each
+   * device in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ASC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places.clone()).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE DESC}: devices are expected in descending order while the rows of each
+   * device stay in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE DESC ALIGN BY DEVICE";
+    Object[] expectedDevice =
+        Arrays.stream(places.clone()).sorted(Comparator.reverseOrder()).toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+ // ORDER BY TIME
+
+  /**
+   * {@code ORDER BY TIME} without a direction: timestamps must not decrease, and rows sharing a
+   * timestamp must come in ascending device order.
+   */
+  @Test
+  public void orderByTimeTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME ASC}: timestamps must not decrease, and rows sharing a timestamp must
+   * come in ascending device order.
+   */
+  @Test
+  public void orderByTimeTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ASC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME DESC}: timestamps must not increase, and rows sharing a timestamp must
+   * come in ascending device order.
+   */
+  @Test
+  public void orderByTimeTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME DESC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = Long.MAX_VALUE;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp <= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+ // ORDER BY DEVICE,TIME
+
+  /**
+   * {@code ORDER BY DEVICE ASC,TIME DESC}: devices are expected in ascending order and the rows of
+   * each device from its latest timestamp down to its first.
+   */
+  @Test
+  public void orderByDeviceTimeTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ASC,TIME DESC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places.clone()).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        // the cnt-th row of a device is its (numOfPointsInDevice - cnt - 1)-th point in time order
+        assertEquals(
+            deviceToStartTimestamp.get(actualDevice) + timeGap * (numOfPointsInDevice - cnt - 1),
+            actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE ASC,TIME ASC}: devices are expected in ascending order and the rows of
+   * each device in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTimeTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE ASC,TIME ASC ALIGN BY DEVICE";
+    Object[] expectedDevice = Arrays.stream(places.clone()).sorted().toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE DESC,TIME DESC}: devices are expected in descending order and the rows
+   * of each device from its latest timestamp down to its first.
+   */
+  @Test
+  public void orderByDeviceTimeTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE DESC,TIME DESC ALIGN BY DEVICE";
+    Object[] expectedDevice =
+        Arrays.stream(places.clone()).sorted(Comparator.reverseOrder()).toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        // the cnt-th row of a device is its (numOfPointsInDevice - cnt - 1)-th point in time order
+        assertEquals(
+            deviceToStartTimestamp.get(actualDevice) + timeGap * (numOfPointsInDevice - cnt - 1),
+            actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY DEVICE DESC,TIME ASC}: devices are expected in descending order and the rows
+   * of each device in ascending time order.
+   */
+  @Test
+  public void orderByDeviceTimeTest4() {
+    String sql = "SELECT * FROM root.** ORDER BY DEVICE DESC,TIME ASC ALIGN BY DEVICE";
+    Object[] expectedDevice =
+        Arrays.stream(places.clone()).sorted(Comparator.reverseOrder()).toArray();
+    int index = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+
+      int cnt = 0;
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedDevice[index], actualDevice);
+        assertEquals(deviceToStartTimestamp.get(actualDevice) + cnt * timeGap, actualTimeStamp);
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+
+        cnt++;
+        if (cnt % numOfPointsInDevice == 0) {
+          // all points of the current device consumed, move on to the next expected device
+          index++;
+          cnt = 0;
+        }
+      }
+      assertEquals(10, index);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+ // ORDER BY TIME,DEVICE
+
+  /**
+   * {@code ORDER BY TIME ASC,DEVICE DESC}: timestamps must not decrease, and rows sharing a
+   * timestamp must come in descending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest1() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ASC,DEVICE DESC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME ASC,DEVICE ASC}: timestamps must not decrease, and rows sharing a
+   * timestamp must come in ascending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest2() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME ASC,DEVICE ASC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = -1;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp >= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME DESC,DEVICE DESC}: timestamps must not increase, and rows sharing a
+   * timestamp must come in descending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest3() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME DESC,DEVICE DESC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = Long.MAX_VALUE;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp <= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * {@code ORDER BY TIME DESC,DEVICE ASC}: timestamps must not increase, and rows sharing a
+   * timestamp must come in ascending device order.
+   */
+  @Test
+  public void orderByTimeDeviceTest4() {
+    String sql = "SELECT * FROM root.** ORDER BY TIME DESC,DEVICE ASC ALIGN BY DEVICE";
+    int total = 0;
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(resultSet.getMetaData(), "Time,Device,precipitation,temperature");
+      long lastTimeStamp = Long.MAX_VALUE;
+      String lastDevice = "";
+      while (resultSet.next()) {
+        long actualTimeStamp = resultSet.getLong(1);
+        assertTrue(actualTimeStamp <= lastTimeStamp);
+        String actualDevice = resultSet.getString(2);
+        // the device order is only checked between rows with the same timestamp
+        if (!lastDevice.isEmpty() && actualTimeStamp == lastTimeStamp) {
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastDevice = actualDevice;
+        lastTimeStamp = actualTimeStamp;
+
+        assertEquals(
+            startPrecipitation + actualDevice.hashCode() + actualTimeStamp, resultSet.getLong(3));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            startTemperature + actualDevice.hashCode() + actualTimeStamp,
+            resultSet.getDouble(4),
+            0.00001);
+        total++;
+      }
+      assertEquals(numOfPointsInDevice * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+ // aggregation query: expected COUNT values, one row per device (same order as places), one column per result row of that device
+ private static final int[][] countIn1000MSFiledWith100MSTimeGap =
+ new int[][] {
+ {10, 10, 0},
+ {9, 10, 1},
+ {8, 10, 2},
+ {7, 10, 3},
+ {6, 10, 4},
+ {5, 10, 5},
+ {4, 10, 6},
+ {3, 10, 7},
+ {2, 10, 8},
+ {1, 10, 9}
+ };
+
+  /**
+   * Returns the expected COUNT for {@code device} in its {@code cnt}-th aggregation row.
+   *
+   * <p>The row of the lookup table is the position of the device in {@code places}; a device
+   * that is not listed there falls back to the first row.
+   */
+  private static int getCountNum(String device, int cnt) {
+    int position = Arrays.asList(places).indexOf(device);
+    int row = Math.max(position, 0);
+    return countIn1000MSFiledWith100MSTimeGap[row][cnt];
+  }
+
+  /**
+   * GROUP BY time with {@code ALIGN BY DEVICE} and no ORDER BY: every device must yield its three
+   * windows in ascending time order, and devices must appear in ascending order.
+   */
+  @Test
+  public void groupByTimeOrderByDeviceTest1() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ALIGN BY DEVICE";
+    long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(
+          resultSet.getMetaData(),
+          "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+      long lastTime = -1;
+      String lastDevice = "";
+      int cnt = 0;
+      int total = 0;
+      while (resultSet.next()) {
+        long actualTime = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedTime[cnt], actualTime);
+        if (!Objects.equals(lastDevice, "")) {
+          if (Objects.equals(actualDevice, lastDevice)) {
+            assertTrue(actualTime >= lastTime);
+          }
+          assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+        }
+        lastTime = actualTime;
+        lastDevice = actualDevice;
+
+        // NOTE(review): the AVG checks stay one-sided on purpose. insertData() always divides
+        // the window sum by 10, so the expected value can be lower than the real average.
+        double avgPrecipitation = resultSet.getDouble(3);
+        assertTrue(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation < 0.00001);
+        double avgTemperature = resultSet.getDouble(4);
+        assertTrue(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature < 0.00001);
+
+        assertEquals(getCountNum(actualDevice, cnt), resultSet.getInt(5));
+        assertEquals(getCountNum(actualDevice, cnt), resultSet.getInt(6));
+
+        assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], resultSet.getLong(7));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            deviceToMaxTemperature.get(actualDevice)[cnt], resultSet.getDouble(8), 0.00001);
+
+        cnt++;
+        if (cnt % 3 == 0) {
+          cnt = 0;
+        }
+        total++;
+      }
+      // an empty or truncated result must not pass silently
+      assertEquals(expectedTime.length * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * GROUP BY time with {@code ORDER BY DEVICE DESC}: every device must yield its three windows in
+   * ascending time order, and devices must appear in descending order.
+   */
+  @Test
+  public void groupByTimeOrderByDeviceTest2() {
+    String sql =
+        "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY DEVICE DESC ALIGN BY DEVICE";
+    long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement();
+        ResultSet resultSet = statement.executeQuery(sql)) {
+      checkHeader(
+          resultSet.getMetaData(),
+          "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+      long lastTime = -1;
+      String lastDevice = "";
+      int cnt = 0;
+      int total = 0;
+      while (resultSet.next()) {
+        long actualTime = resultSet.getLong(1);
+        String actualDevice = resultSet.getString(2);
+        assertEquals(expectedTime[cnt], actualTime);
+        if (!Objects.equals(lastDevice, "")) {
+          if (Objects.equals(actualDevice, lastDevice)) {
+            assertTrue(actualTime >= lastTime);
+          }
+          assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+        }
+        lastTime = actualTime;
+        lastDevice = actualDevice;
+
+        // NOTE(review): the AVG checks stay one-sided on purpose. insertData() always divides
+        // the window sum by 10, so the expected value can be lower than the real average.
+        double avgPrecipitation = resultSet.getDouble(3);
+        assertTrue(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation < 0.00001);
+        double avgTemperature = resultSet.getDouble(4);
+        assertTrue(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature < 0.00001);
+
+        assertEquals(getCountNum(actualDevice, cnt), resultSet.getInt(5));
+        assertEquals(getCountNum(actualDevice, cnt), resultSet.getInt(6));
+
+        assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], resultSet.getLong(7));
+        // symmetric tolerance: "expected - actual < eps" would accept any value that is too large
+        assertEquals(
+            deviceToMaxTemperature.get(actualDevice)[cnt], resultSet.getDouble(8), 0.00001);
+
+        cnt++;
+        if (cnt % 3 == 0) {
+          cnt = 0;
+        }
+        total++;
+      }
+      // an empty or truncated result must not pass silently
+      assertEquals(expectedTime.length * places.length, total);
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+ /**
+  * GROUP BY time with {@code ORDER BY DEVICE DESC,TIME DESC ALIGN BY DEVICE}: devices must arrive
+  * in non-increasing order and, within one device, the three time windows must descend. Every
+  * row's aggregates are compared with the expected per-device, per-window values.
+  */
+ @Test
+ public void groupByTimeOrderByDeviceTest3() {
+   String sql =
+       "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY DEVICE DESC,TIME DESC ALIGN BY DEVICE";
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     long lastTime = -1;
+     String lastDevice = "";
+     long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+     try (ResultSet resultSet = statement.executeQuery(sql)) {
+       ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+       checkHeader(
+           resultSetMetaData,
+           "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+       // cnt is the index of the expected time window of the current row (cycles 2, 1, 0).
+       int cnt = 2;
+       while (resultSet.next()) {
+         long actualTime = resultSet.getLong(1);
+         String actualDevice = resultSet.getString(2);
+         assertEquals(expectedTime[cnt], actualTime);
+         if (!Objects.equals(lastDevice, "") && Objects.equals(actualDevice, lastDevice)) {
+           assertTrue(actualTime <= lastTime);
+         }
+         if (!Objects.equals(lastDevice, "")) {
+           assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+         }
+         lastTime = actualTime;
+         lastDevice = actualDevice;
+
+         // Math.abs makes the tolerance two-sided; a bare difference would also pass when the
+         // actual value is far larger than the expected one.
+         double avgPrecipitation = resultSet.getDouble(3);
+         assertTrue(
+             Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                 < 0.00001);
+
+         double avgTemperature = resultSet.getDouble(4);
+         assertTrue(
+             Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+         int countPrecipitation = resultSet.getInt(5);
+         assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+         int countTemperature = resultSet.getInt(6);
+         assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+         long maxPrecipitation = resultSet.getLong(7);
+         assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+         double maxTemperature = resultSet.getDouble(8);
+         assertTrue(
+             Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+         cnt--;
+         if (cnt < 0) {
+           cnt = 2;
+         }
+       }
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * GROUP BY time with {@code ORDER BY DEVICE ASC,TIME DESC ALIGN BY DEVICE}: devices must arrive
+  * in non-decreasing order and, within one device, the three time windows must descend. Every
+  * row's aggregates are compared with the expected per-device, per-window values.
+  */
+ @Test
+ public void groupByTimeOrderByDeviceTest4() {
+   String sql =
+       "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY DEVICE ASC,TIME DESC ALIGN BY DEVICE";
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     long lastTime = -1;
+     String lastDevice = "";
+     long[] expectedTime = new long[] {startTime, startTime + 1000, startTime + 1000 * 2};
+     try (ResultSet resultSet = statement.executeQuery(sql)) {
+       ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+       checkHeader(
+           resultSetMetaData,
+           "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+       // cnt is the index of the expected time window of the current row (cycles 2, 1, 0).
+       int cnt = 2;
+       while (resultSet.next()) {
+         long actualTime = resultSet.getLong(1);
+         String actualDevice = resultSet.getString(2);
+         assertEquals(expectedTime[cnt], actualTime);
+         if (!Objects.equals(lastDevice, "") && Objects.equals(actualDevice, lastDevice)) {
+           assertTrue(actualTime <= lastTime);
+         }
+         if (!Objects.equals(lastDevice, "")) {
+           assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+         }
+         lastTime = actualTime;
+         lastDevice = actualDevice;
+
+         // Math.abs makes the tolerance two-sided; a bare difference would also pass when the
+         // actual value is far larger than the expected one.
+         double avgPrecipitation = resultSet.getDouble(3);
+         assertTrue(
+             Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                 < 0.00001);
+
+         double avgTemperature = resultSet.getDouble(4);
+         assertTrue(
+             Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+         int countPrecipitation = resultSet.getInt(5);
+         assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+         int countTemperature = resultSet.getInt(6);
+         assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+         long maxPrecipitation = resultSet.getLong(7);
+         assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+         double maxTemperature = resultSet.getDouble(8);
+         assertTrue(
+             Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+         cnt--;
+         if (cnt < 0) {
+           cnt = 2;
+         }
+       }
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * GROUP BY time with {@code ORDER BY TIME ALIGN BY DEVICE}: timestamps must be non-decreasing
+  * and rows sharing a timestamp must have non-decreasing device names. Exactly 30 rows are
+  * expected, and the expected window index advances after every 10 rows.
+  */
+ @Test
+ public void groupByTimeOrderByTimeTest1() {
+   String sql =
+       "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME ALIGN BY DEVICE";
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     long lastTime = -1;
+     String lastDevice = "";
+     int index = 0;
+     try (ResultSet resultSet = statement.executeQuery(sql)) {
+       ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+       checkHeader(
+           resultSetMetaData,
+           "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+       // cnt is the index of the expected time window; index counts the rows seen so far.
+       int cnt = 0;
+       while (resultSet.next()) {
+         long actualTime = resultSet.getLong(1);
+         String actualDevice = resultSet.getString(2);
+         assertTrue(actualTime >= lastTime);
+         if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+           assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+         }
+         lastTime = actualTime;
+         lastDevice = actualDevice;
+
+         // Math.abs makes the tolerance two-sided; a bare difference would also pass when the
+         // actual value is far larger than the expected one.
+         double avgPrecipitation = resultSet.getDouble(3);
+         assertTrue(
+             Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                 < 0.00001);
+
+         double avgTemperature = resultSet.getDouble(4);
+         assertTrue(
+             Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+         int countPrecipitation = resultSet.getInt(5);
+         assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+         int countTemperature = resultSet.getInt(6);
+         assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+         long maxPrecipitation = resultSet.getLong(7);
+         assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+         double maxTemperature = resultSet.getDouble(8);
+         assertTrue(
+             Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+         index++;
+         if (index % 10 == 0) {
+           cnt++;
+         }
+       }
+       assertEquals(30, index);
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * GROUP BY time with {@code ORDER BY TIME DESC ALIGN BY DEVICE}: timestamps must be
+  * non-increasing and rows sharing a timestamp must have non-decreasing device names. Exactly 30
+  * rows are expected, and the expected window index steps down after every 10 rows.
+  */
+ @Test
+ public void groupByTimeOrderByTimeTest2() {
+   String sql =
+       "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME DESC ALIGN BY DEVICE";
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     long lastTime = Long.MAX_VALUE;
+     String lastDevice = "";
+     int index = 0;
+     try (ResultSet resultSet = statement.executeQuery(sql)) {
+       ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+       checkHeader(
+           resultSetMetaData,
+           "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+       // cnt is the index of the expected time window; index counts the rows seen so far.
+       int cnt = 2;
+       while (resultSet.next()) {
+         long actualTime = resultSet.getLong(1);
+         String actualDevice = resultSet.getString(2);
+         assertTrue(actualTime <= lastTime);
+         if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+           assertTrue(actualDevice.compareTo(lastDevice) >= 0);
+         }
+         lastTime = actualTime;
+         lastDevice = actualDevice;
+
+         // Math.abs makes the tolerance two-sided; a bare difference would also pass when the
+         // actual value is far larger than the expected one.
+         double avgPrecipitation = resultSet.getDouble(3);
+         assertTrue(
+             Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                 < 0.00001);
+
+         double avgTemperature = resultSet.getDouble(4);
+         assertTrue(
+             Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+         int countPrecipitation = resultSet.getInt(5);
+         assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+         int countTemperature = resultSet.getInt(6);
+         assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+         long maxPrecipitation = resultSet.getLong(7);
+         assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+         double maxTemperature = resultSet.getDouble(8);
+         assertTrue(
+             Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+         index++;
+         if (index % 10 == 0) {
+           cnt--;
+         }
+       }
+       assertEquals(30, index);
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * GROUP BY time with {@code ORDER BY TIME ASC,DEVICE DESC ALIGN BY DEVICE}: timestamps must be
+  * non-decreasing and rows sharing a timestamp must have non-increasing device names. Exactly 30
+  * rows are expected, and the expected window index advances after every 10 rows.
+  */
+ @Test
+ public void groupByTimeOrderByTimeTest3() {
+   String sql =
+       "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME ASC,DEVICE DESC ALIGN BY DEVICE";
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     long lastTime = -1;
+     String lastDevice = "";
+     int index = 0;
+     try (ResultSet resultSet = statement.executeQuery(sql)) {
+       ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+       checkHeader(
+           resultSetMetaData,
+           "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+       // cnt is the index of the expected time window; index counts the rows seen so far.
+       int cnt = 0;
+       while (resultSet.next()) {
+         long actualTime = resultSet.getLong(1);
+         String actualDevice = resultSet.getString(2);
+         assertTrue(actualTime >= lastTime);
+         if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+           assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+         }
+         lastTime = actualTime;
+         lastDevice = actualDevice;
+
+         // Math.abs makes the tolerance two-sided; a bare difference would also pass when the
+         // actual value is far larger than the expected one.
+         double avgPrecipitation = resultSet.getDouble(3);
+         assertTrue(
+             Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                 < 0.00001);
+
+         double avgTemperature = resultSet.getDouble(4);
+         assertTrue(
+             Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+         int countPrecipitation = resultSet.getInt(5);
+         assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+         int countTemperature = resultSet.getInt(6);
+         assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+         long maxPrecipitation = resultSet.getLong(7);
+         assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+         double maxTemperature = resultSet.getDouble(8);
+         assertTrue(
+             Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+         index++;
+         if (index % 10 == 0) {
+           cnt++;
+         }
+       }
+       assertEquals(30, index);
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+
+ /**
+  * GROUP BY time with {@code ORDER BY TIME DESC,DEVICE DESC ALIGN BY DEVICE}: timestamps must be
+  * non-increasing and rows sharing a timestamp must have non-increasing device names. Exactly 30
+  * rows are expected, and the expected window index steps down after every 10 rows.
+  */
+ @Test
+ public void groupByTimeOrderByTimeTest4() {
+   String sql =
+       "SELECT AVG(*),COUNT(*),MAX_VALUE(*) FROM root.weather.** GROUP BY([2022-11-21T00:00:00.000+08:00,2022-11-21T00:00:02.801+08:00),1000ms) ORDER BY TIME DESC,DEVICE DESC ALIGN BY DEVICE";
+   try (Connection connection = EnvFactory.getEnv().getConnection();
+       Statement statement = connection.createStatement()) {
+     long lastTime = Long.MAX_VALUE;
+     String lastDevice = "";
+     int index = 0;
+     try (ResultSet resultSet = statement.executeQuery(sql)) {
+       ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
+       checkHeader(
+           resultSetMetaData,
+           "Time,Device,AVG(precipitation),AVG(temperature),COUNT(precipitation),COUNT(temperature),MAX_VALUE(precipitation),MAX_VALUE(temperature)");
+       // cnt is the index of the expected time window; index counts the rows seen so far.
+       int cnt = 2;
+       while (resultSet.next()) {
+         long actualTime = resultSet.getLong(1);
+         String actualDevice = resultSet.getString(2);
+         assertTrue(actualTime <= lastTime);
+         if (!Objects.equals(lastDevice, "") && actualTime == lastTime) {
+           assertTrue(actualDevice.compareTo(lastDevice) <= 0);
+         }
+         lastTime = actualTime;
+         lastDevice = actualDevice;
+
+         // Math.abs makes the tolerance two-sided; a bare difference would also pass when the
+         // actual value is far larger than the expected one.
+         double avgPrecipitation = resultSet.getDouble(3);
+         assertTrue(
+             Math.abs(deviceToAvgPrecipitation.get(actualDevice)[cnt] - avgPrecipitation)
+                 < 0.00001);
+
+         double avgTemperature = resultSet.getDouble(4);
+         assertTrue(
+             Math.abs(deviceToAvgTemperature.get(actualDevice)[cnt] - avgTemperature) < 0.00001);
+
+         int countPrecipitation = resultSet.getInt(5);
+         assertEquals(getCountNum(actualDevice, cnt), countPrecipitation);
+         int countTemperature = resultSet.getInt(6);
+         assertEquals(getCountNum(actualDevice, cnt), countTemperature);
+
+         long maxPrecipitation = resultSet.getLong(7);
+         assertEquals(deviceToMaxPrecipitation.get(actualDevice)[cnt], maxPrecipitation);
+
+         double maxTemperature = resultSet.getDouble(8);
+         assertTrue(
+             Math.abs(deviceToMaxTemperature.get(actualDevice)[cnt] - maxTemperature) < 0.00001);
+         index++;
+         if (index % 10 == 0) {
+           cnt--;
+         }
+       }
+       assertEquals(30, index);
+     }
+   } catch (Exception e) {
+     e.printStackTrace();
+     fail(e.getMessage());
+   }
+ }
+}
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
index 6b066a5905..0346dcb0f8 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/query/IoTDBNullValueFillIT.java
@@ -203,7 +203,7 @@ public class IoTDBNullValueFillIT {
"1,root.sg1.d2,1,2,1.0,2.0,true,t2,"
};
resultSetEqualTest(
- "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) order by time desc align by device",
+ "select s1, s2, s3, s4, s5, s6 from root.sg1.* fill(previous) order by device,time desc align by device",
expectedAlignByDeviceHeader,
retArray);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
new file mode 100644
index 0000000000..372d01cb23
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/MergeSortOperator.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.db.utils.datastructure.MergeSortHeap;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.ColumnBuilder;
+import org.apache.iotdb.tsfile.read.common.block.column.TimeColumnBuilder;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+
+import static com.google.common.util.concurrent.Futures.successfulAsList;
+
+ /**
+ * Process operator that merges the TsBlocks produced by several child operators into a single
+ * output stream, ordering rows with the supplied {@link MergeSortKey} comparator through a
+ * {@link MergeSortHeap}.
+ *
+ * <p>NOTE(review): the merge presumably relies on every child already emitting rows in the order
+ * defined by the same comparator - confirm against the planner.
+ */
+ public class MergeSortOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ // child operators whose outputs are merged
+ private final List<Operator> inputOperators;
+ // column types used to create the TsBlockBuilder for merged output
+ private final List<TSDataType> dataTypes;
+ private final TsBlockBuilder tsBlockBuilder;
+ private final int inputOperatorsCount;
+ // the TsBlock currently held for each child; null once it has been handed out or fully merged
+ private final TsBlock[] inputTsBlocks;
+ // noMoreTsBlocks[i] is set by hasNext() once child i reports that it has no more data
+ private final boolean[] noMoreTsBlocks;
+ private final MergeSortHeap mergeSortHeap;
+ private final Comparator<MergeSortKey> comparator;
+
+ // only assigned in isFinished()
+ private boolean finished;
+
+ /**
+ * @param operatorContext context returned by {@link #getOperatorContext()}
+ * @param inputOperators child operators to merge; their count sizes the heap and the per-child
+ *     arrays
+ * @param dataTypes column types of the merged output
+ * @param comparator ordering applied to rows of different children
+ */
+ public MergeSortOperator(
+ OperatorContext operatorContext,
+ List<Operator> inputOperators,
+ List<TSDataType> dataTypes,
+ Comparator<MergeSortKey> comparator) {
+ this.operatorContext = operatorContext;
+ this.inputOperators = inputOperators;
+ this.dataTypes = dataTypes;
+ this.inputOperatorsCount = inputOperators.size();
+ this.mergeSortHeap = new MergeSortHeap(inputOperatorsCount, comparator);
+ this.comparator = comparator;
+ this.inputTsBlocks = new TsBlock[inputOperatorsCount];
+ this.noMoreTsBlocks = new boolean[inputOperatorsCount];
+ this.tsBlockBuilder = new TsBlockBuilder(dataTypes);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ /**
+ * Collects the unfinished blocked-futures of every child that is not exhausted and currently has
+ * no buffered TsBlock. Returns NOT_BLOCKED when there is none, otherwise a future combining them
+ * through {@code successfulAsList}.
+ */
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ List<ListenableFuture<?>> listenableFutures = new ArrayList<>();
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!noMoreTsBlocks[i] && isTsBlockEmpty(i)) {
+ ListenableFuture<?> blocked = inputOperators.get(i).isBlocked();
+ if (!blocked.isDone()) {
+ listenableFutures.add(blocked);
+ }
+ }
+ }
+ return listenableFutures.isEmpty() ? NOT_BLOCKED : successfulAsList(listenableFutures);
+ }
+
+ /** If the tsBlock is null or has no more data in the tsBlock, return true; else return false; */
+ private boolean isTsBlockEmpty(int tsBlockIndex) {
+ return inputTsBlocks[tsBlockIndex] == null
+ || inputTsBlocks[tsBlockIndex].getPositionCount() == 0;
+ }
+
+ /**
+ * Produces the next merged TsBlock, or null when a child that was asked for data returned a null
+ * or empty block.
+ *
+ * <p>The statement order below matters: the heap is refilled first, then the whole-block shortcut
+ * is tried, and only then rows are merged one by one.
+ */
+ @Override
+ public TsBlock next() {
+ // 1. fill consumed up TsBlock
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!noMoreTsBlocks[i] && isTsBlockEmpty(i) && inputOperators.get(i).hasNext()) {
+ inputTsBlocks[i] = inputOperators.get(i).next();
+ // a child that yields nothing yet ends this call; blocks pushed earlier stay in the heap
+ if (inputTsBlocks[i] == null || inputTsBlocks[i].isEmpty()) {
+ return null;
+ }
+ // new block enters the heap at row 0, tagged with the index of its child
+ mergeSortHeap.push(new MergeSortKey(inputTsBlocks[i], 0, i));
+ }
+ }
+
+ // 2. check if we can directly return the original TsBlock instead of merging way
+ // NOTE(review): poll() is dereferenced without a null check - assumes the heap is non-empty
+ // whenever next() is called after hasNext() returned true; confirm against MergeSortHeap.
+ MergeSortKey minMergeSortKey = mergeSortHeap.poll();
+ // shortcut applies when no other input is in the heap, or when the LAST row of the smallest
+ // block still sorts strictly before the head of the remaining heap
+ if (mergeSortHeap.isEmpty()
+ || comparator.compare(
+ new MergeSortKey(
+ minMergeSortKey.tsBlock, minMergeSortKey.tsBlock.getPositionCount() - 1),
+ mergeSortHeap.peek())
+ < 0) {
+ inputTsBlocks[minMergeSortKey.columnIndex] = null;
+ // rows before rowIndex were already emitted by an earlier merge, so only the tail is returned
+ return minMergeSortKey.rowIndex == 0
+ ? minMergeSortKey.tsBlock
+ : minMergeSortKey.tsBlock.subTsBlock(minMergeSortKey.rowIndex);
+ }
+ // shortcut not possible: put the key back and merge row by row
+ mergeSortHeap.push(minMergeSortKey);
+
+ // 3. do merge sort until one TsBlock is consumed up
+ tsBlockBuilder.reset();
+ TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
+ ColumnBuilder[] valueColumnBuilders = tsBlockBuilder.getValueColumnBuilders();
+ while (!mergeSortHeap.isEmpty()) {
+ MergeSortKey mergeSortKey = mergeSortHeap.poll();
+ TsBlock targetBlock = mergeSortKey.tsBlock;
+ int rowIndex = mergeSortKey.rowIndex;
+ // copy the current smallest row (time plus every value column) into the output builder
+ timeBuilder.writeLong(targetBlock.getTimeByIndex(rowIndex));
+ for (int i = 0; i < valueColumnBuilders.length; i++) {
+ if (targetBlock.getColumn(i).isNull(rowIndex)) {
+ valueColumnBuilders[i].appendNull();
+ continue;
+ }
+ valueColumnBuilders[i].write(targetBlock.getColumn(i), rowIndex);
+ }
+ tsBlockBuilder.declarePosition();
+ if (mergeSortKey.rowIndex == mergeSortKey.tsBlock.getPositionCount() - 1) {
+ // that was the last row of its block: release the slot so step 1 refills it next call
+ inputTsBlocks[mergeSortKey.columnIndex] = null;
+ // stop only if the next heap head sorts strictly after the row just written; otherwise
+ // the loop keeps draining the heap
+ if (!mergeSortHeap.isEmpty()
+ && comparator.compare(mergeSortHeap.peek(), mergeSortKey) > 0) {
+ break;
+ }
+ } else {
+ // advance within the same block and re-insert it into the heap
+ mergeSortKey.rowIndex++;
+ mergeSortHeap.push(mergeSortKey);
+ }
+ }
+ return tsBlockBuilder.build();
+ }
+
+ /**
+ * Returns true if any child still has a buffered block or reports more data. As a side effect,
+ * children found exhausted are marked in noMoreTsBlocks and their buffered block is cleared.
+ */
+ @Override
+ public boolean hasNext() {
+ if (finished) {
+ return false;
+ }
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!isTsBlockEmpty(i)) {
+ return true;
+ } else if (!noMoreTsBlocks[i]) {
+ if (inputOperators.get(i).hasNext()) {
+ return true;
+ } else {
+ noMoreTsBlocks[i] = true;
+ inputTsBlocks[i] = null;
+ }
+ }
+ }
+ return false;
+ }
+
+ /** Closes every child operator in list order. */
+ @Override
+ public void close() throws Exception {
+ for (Operator operator : inputOperators) {
+ operator.close();
+ }
+ }
+
+ /**
+ * Finished once every child is marked exhausted and holds no buffered block; the positive result
+ * is remembered in {@code finished}.
+ */
+ @Override
+ public boolean isFinished() {
+ if (finished) {
+ return true;
+ }
+ // assume finished, then look for any child that disproves it
+ finished = true;
+
+ for (int i = 0; i < inputOperatorsCount; i++) {
+ if (!noMoreTsBlocks[i] || !isTsBlockEmpty(i)) {
+ finished = false;
+ break;
+ }
+ }
+ return finished;
+ }
+
+ /**
+ * Estimate: one page size, plus each child's max return size and retained size; the result is
+ * never smaller than any child's own peak nor than this operator's max return size.
+ */
+ @Override
+ public long calculateMaxPeekMemory() {
+ // NOTE(review): the original comment here mentioned "MergeToolKit" caching startKey and endKey,
+ // which this class does not use - confirm what this one-page base reservation covers.
+ long maxPeekMemory = TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ // inputTsBlocks will cache all the tsBlocks returned by inputOperators
+ for (Operator operator : inputOperators) {
+ maxPeekMemory += operator.calculateMaxReturnSize();
+ maxPeekMemory += operator.calculateRetainedSizeAfterCallingNext();
+ }
+ for (Operator operator : inputOperators) {
+ maxPeekMemory = Math.max(maxPeekMemory, operator.calculateMaxPeekMemory());
+ }
+ return Math.max(maxPeekMemory, calculateMaxReturnSize());
+ }
+
+ /** One page size per value column plus one more (the 1L term, presumably the time column). */
+ @Override
+ public long calculateMaxReturnSize() {
+ return (1L + dataTypes.size()) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ /**
+ * Sum over all children of (max return size + retained size), minus the smallest child max
+ * return size.
+ */
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ long currentRetainedSize = 0, minChildReturnSize = Long.MAX_VALUE;
+ for (Operator child : inputOperators) {
+ long maxReturnSize = child.calculateMaxReturnSize();
+ minChildReturnSize = Math.min(minChildReturnSize, maxReturnSize);
+ currentRetainedSize += (maxReturnSize + child.calculateRetainedSizeAfterCallingNext());
+ }
+ return currentRetainedSize - minChildReturnSize;
+ }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
new file mode 100644
index 0000000000..0107af8121
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/SingleDeviceViewOperator.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process;
+
+import org.apache.iotdb.db.mpp.execution.operator.Operator;
+import org.apache.iotdb.db.mpp.execution.operator.OperatorContext;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.read.common.block.column.BinaryColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.Column;
+import org.apache.iotdb.tsfile.read.common.block.column.NullColumn;
+import org.apache.iotdb.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.iotdb.tsfile.utils.Binary;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * The SingleDeviceViewOperator plays a similar role with DeviceViewOperator of adding a device
+ * column to current resultSet.
+ *
+ * <p>Different from DeviceViewOperator which merge the resultSet from different devices,
+ * SingleDeviceViewOperator only focuses on one single device, the goal of it is to add a device
+ * view. It's just a transition and won't change the way data flows.
+ */
+public class SingleDeviceViewOperator implements ProcessOperator {
+
+ private final OperatorContext operatorContext;
+ private final Operator deviceOperator;
+ // Used to fill columns and leave null columns which doesn't exist in some devices.
+ private final List<Integer> deviceColumnIndex;
+ // Column dataTypes that includes device column
+ private final List<TSDataType> dataTypes;
+ private final BinaryColumn binaryColumn;
+
+ public SingleDeviceViewOperator(
+ OperatorContext operatorContext,
+ String device,
+ Operator deviceOperator,
+ List<Integer> deviceColumnIndex,
+ List<TSDataType> dataTypes) {
+ this.operatorContext = operatorContext;
+ this.deviceOperator = deviceOperator;
+ this.deviceColumnIndex = deviceColumnIndex;
+ this.dataTypes = dataTypes;
+ this.binaryColumn = new BinaryColumn(1, Optional.empty(), new Binary[] {new Binary(device)});
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public ListenableFuture<?> isBlocked() {
+ ListenableFuture<?> blocked = deviceOperator.isBlocked();
+ if (!blocked.isDone()) {
+ return blocked;
+ }
+ return NOT_BLOCKED;
+ }
+
+ @Override
+ public TsBlock next() {
+ TsBlock tsBlock = deviceOperator.next();
+ if (tsBlock == null) {
+ return null;
+ }
+ // fill existing columns
+ Column[] newValueColumns = new Column[dataTypes.size()];
+ for (int i = 0; i < deviceColumnIndex.size(); i++) {
+ newValueColumns[deviceColumnIndex.get(i)] = tsBlock.getColumn(i);
+ }
+ // construct device column
+ newValueColumns[0] = new RunLengthEncodedColumn(binaryColumn, tsBlock.getPositionCount());
+ // construct other null columns
+ for (int i = 0; i < dataTypes.size(); i++) {
+ if (newValueColumns[i] == null) {
+ newValueColumns[i] = NullColumn.create(dataTypes.get(i), tsBlock.getPositionCount());
+ }
+ }
+ return new TsBlock(tsBlock.getPositionCount(), tsBlock.getTimeColumn(), newValueColumns);
+ }
+
+ @Override
+ public boolean hasNext() {
+ return deviceOperator.hasNext();
+ }
+
+ @Override
+ public void close() throws Exception {
+ deviceOperator.close();
+ }
+
+ @Override
+ public boolean isFinished() {
+ return !this.hasNext();
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ long maxPeekMemory = calculateMaxReturnSize() + calculateRetainedSizeAfterCallingNext();
+ maxPeekMemory = Math.max(maxPeekMemory, deviceOperator.calculateMaxPeekMemory());
+ return maxPeekMemory;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return (long) (dataTypes.size())
+ * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte();
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return deviceOperator.calculateRetainedSizeAfterCallingNext();
+ }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
new file mode 100644
index 0000000000..da17deed4e
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/operator/process/join/merge/MergeSortComparator.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.mpp.execution.operator.process.join.merge;
+
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.utils.datastructure.MergeSortKey;
+
+import java.util.Comparator;
+import java.util.List;
+
+ /**
+  * Comparators over {@link MergeSortKey} rows for the eight combinations of (TIME | DEVICE) as
+  * primary sort key with ascending or descending order on each key.
+  *
+  * <p>The time of a row is read with {@code tsBlock.getTimeByIndex(rowIndex)}; the device is the
+  * binary value of column 0 at {@code rowIndex}.
+  *
+  * <p>Timestamps are compared with {@link Long#compare(long, long)}. Narrowing the difference of
+  * two longs to int silently truncates or overflows, which can flip the sign or report equality
+  * for rows that are far apart in time.
+  */
+ public class MergeSortComparator {
+
+   public static final Comparator<MergeSortKey> ASC_TIME_ASC_DEVICE =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int timeComparing = compareTime(o1, o2);
+         return timeComparing == 0 ? compareDevice(o1, o2) : timeComparing;
+       };
+   public static final Comparator<MergeSortKey> ASC_TIME_DESC_DEVICE =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int timeComparing = compareTime(o1, o2);
+         return timeComparing == 0 ? compareDevice(o2, o1) : timeComparing;
+       };
+   public static final Comparator<MergeSortKey> DESC_TIME_ASC_DEVICE =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int timeComparing = compareTime(o2, o1);
+         return timeComparing == 0 ? compareDevice(o1, o2) : timeComparing;
+       };
+   public static final Comparator<MergeSortKey> DESC_TIME_DESC_DEVICE =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int timeComparing = compareTime(o2, o1);
+         return timeComparing == 0 ? compareDevice(o2, o1) : timeComparing;
+       };
+
+   public static final Comparator<MergeSortKey> ASC_DEVICE_ASC_TIME =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int deviceComparing = compareDevice(o1, o2);
+         return deviceComparing == 0 ? compareTime(o1, o2) : deviceComparing;
+       };
+   public static final Comparator<MergeSortKey> ASC_DEVICE_DESC_TIME =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int deviceComparing = compareDevice(o1, o2);
+         return deviceComparing == 0 ? compareTime(o2, o1) : deviceComparing;
+       };
+   public static final Comparator<MergeSortKey> DESC_DEVICE_ASC_TIME =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int deviceComparing = compareDevice(o2, o1);
+         return deviceComparing == 0 ? compareTime(o1, o2) : deviceComparing;
+       };
+   public static final Comparator<MergeSortKey> DESC_DEVICE_DESC_TIME =
+       (MergeSortKey o1, MergeSortKey o2) -> {
+         int deviceComparing = compareDevice(o2, o1);
+         return deviceComparing == 0 ? compareTime(o2, o1) : deviceComparing;
+       };
+
+   /** Ascending comparison of the two rows' timestamps; swap the arguments for descending. */
+   private static int compareTime(MergeSortKey left, MergeSortKey right) {
+     return Long.compare(
+         left.tsBlock.getTimeByIndex(left.rowIndex), right.tsBlock.getTimeByIndex(right.rowIndex));
+   }
+
+   /** Ascending comparison of the two rows' device values; swap the arguments for descending. */
+   private static int compareDevice(MergeSortKey left, MergeSortKey right) {
+     return left.tsBlock
+         .getColumn(0)
+         .getBinary(left.rowIndex)
+         .compareTo(right.tsBlock.getColumn(0).getBinary(right.rowIndex));
+   }
+
+   /**
+    * Selects the predefined comparator matching a two-element sort specification.
+    *
+    * @param sortItemList sort items; element 0 is the primary key, element 1 the secondary key.
+    *     Both elements are read, so the list must contain at least two items.
+    * @return the constant whose name spells out (primary ordering, primary key, secondary
+    *     ordering, secondary key); any primary key other than TIME is treated as DEVICE
+    */
+   public static Comparator<MergeSortKey> getComparator(List<SortItem> sortItemList) {
+     boolean timeFirst = sortItemList.get(0).getSortKey() == SortKey.TIME;
+     if (sortItemList.get(0).getOrdering() == Ordering.ASC) {
+       if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+         return timeFirst ? ASC_TIME_ASC_DEVICE : ASC_DEVICE_ASC_TIME;
+       } else {
+         return timeFirst ? ASC_TIME_DESC_DEVICE : ASC_DEVICE_DESC_TIME;
+       }
+     } else {
+       if (sortItemList.get(1).getOrdering() == Ordering.ASC) {
+         return timeFirst ? DESC_TIME_ASC_DEVICE : DESC_DEVICE_ASC_TIME;
+       } else {
+         return timeFirst ? DESC_TIME_DESC_DEVICE : DESC_DEVICE_DESC_TIME;
+       }
+     }
+   }
+ }
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
index e2497d44c9..351303134f 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LocalExecutionPlanContext.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
import org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
import org.apache.iotdb.db.mpp.execution.timer.RuleBasedTimeSliceAllocator;
import org.apache.iotdb.db.mpp.plan.analyze.TypeProvider;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Binary;
@@ -55,6 +56,8 @@ public class LocalExecutionPlanContext {
private final TypeProvider typeProvider;
+ private List<TSDataType> cachedDataTypes;
+
// left is cached last value in last query
// right is full path for each cached last value
private List<Pair<TimeValuePair, Binary>> cachedLastValueAndPathList;
@@ -147,6 +150,14 @@ public class LocalExecutionPlanContext {
this.sinkHandle = sinkHandle;
}
+ /**
+  * Stores the given list of data types on this context so that it can be read back later
+  * through {@link #getCachedDataTypes()}.
+  *
+  * @param cachedDataTypes the data types to cache; the reference is stored as-is, not copied
+  */
+ public void setCachedDataTypes(List<TSDataType> cachedDataTypes) {
+ this.cachedDataTypes = cachedDataTypes;
+ }
+
+ /**
+  * Returns the data type list currently held in the {@code cachedDataTypes} field.
+  *
+  * @return the cached list; NOTE(review): presumably null until setCachedDataTypes has been
+  *     called - confirm against the constructors of this class
+  */
+ public List<TSDataType> getCachedDataTypes() {
+ return cachedDataTypes;
+ }
+
public TypeProvider getTypeProvider() {
return typeProvider;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
index a04f877064..3a7f1eeb6b 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanBuilder.java
@@ -60,7 +60,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -516,32 +518,73 @@ public class LogicalPlanBuilder {
Map<String, PlanNode> deviceNameToSourceNodesMap,
Set<Expression> deviceViewOutputExpressions,
Map<String, List<Integer>> deviceToMeasurementIndexesMap,
- Ordering mergeOrder) {
+ List<SortItem> sortItemList) {
List<String> outputColumnNames =
deviceViewOutputExpressions.stream()
.map(Expression::getExpressionString)
.collect(Collectors.toList());
- DeviceViewNode deviceViewNode =
- new DeviceViewNode(
- context.getQueryId().genPlanNodeId(),
- new OrderByParameter(
- Arrays.asList(
- new SortItem(SortKey.DEVICE, Ordering.ASC),
- new SortItem(SortKey.TIME, mergeOrder))),
- outputColumnNames,
- deviceToMeasurementIndexesMap);
-
- for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
- String deviceName = entry.getKey();
- PlanNode subPlan = entry.getValue();
- deviceViewNode.addChildDeviceNode(deviceName, subPlan);
+ int timePriority = -1, devicePriority = -1;
+ for (int i = 0; i < sortItemList.size(); i++) {
+ SortKey sortKey = sortItemList.get(i).getSortKey();
+ if (sortKey == SortKey.TIME) {
+ timePriority = sortItemList.size() - i;
+ } else if (sortKey == SortKey.DEVICE) {
+ devicePriority = sortItemList.size() - i;
+ }
+ }
+ Ordering deviceOrdering =
+ devicePriority == -1
+ ? Ordering.ASC
+ : sortItemList.get(sortItemList.size() - devicePriority).getOrdering();
+ Ordering timeOrdering =
+ timePriority == -1
+ ? Ordering.ASC
+ : sortItemList.get(sortItemList.size() - timePriority).getOrdering();
+
+ if ((timePriority == -1 && devicePriority == -1) || devicePriority > timePriority) {
+ DeviceViewNode deviceViewNode =
+ new DeviceViewNode(
+ context.getQueryId().genPlanNodeId(),
+ new OrderByParameter(
+ Arrays.asList(
+ new SortItem(SortKey.DEVICE, deviceOrdering),
+ new SortItem(SortKey.TIME, timeOrdering))),
+ outputColumnNames,
+ deviceToMeasurementIndexesMap);
+
+ for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
+ String deviceName = entry.getKey();
+ PlanNode subPlan = entry.getValue();
+ deviceViewNode.addChildDeviceNode(deviceName, subPlan);
+ }
+ this.root = deviceViewNode;
+ } else {
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ context.getQueryId().genPlanNodeId(),
+ new OrderByParameter(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering))),
+ outputColumnNames);
+ for (Map.Entry<String, PlanNode> entry : deviceNameToSourceNodesMap.entrySet()) {
+ String deviceName = entry.getKey();
+ PlanNode subPlan = entry.getValue();
+ SingleDeviceViewNode singleDeviceViewNode =
+ new SingleDeviceViewNode(
+ context.getQueryId().genPlanNodeId(),
+ outputColumnNames,
+ deviceName,
+ deviceToMeasurementIndexesMap.get(deviceName));
+ singleDeviceViewNode.addChild(subPlan);
+ mergeSortNode.addChild(singleDeviceViewNode);
+ }
+ this.root = mergeSortNode;
}
context.getTypeProvider().setType(DEVICE, TSDataType.TEXT);
updateTypeProvider(deviceViewOutputExpressions);
-
- this.root = deviceViewNode;
return this;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
index cbbc9d4131..d9a458b4c4 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/LogicalPlanVisitor.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.StatementNode;
import org.apache.iotdb.db.mpp.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
import org.apache.iotdb.db.mpp.plan.statement.crud.DeleteDataStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.mpp.plan.statement.crud.InsertRowStatement;
@@ -67,6 +68,7 @@ import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ActivateTemplate
import org.apache.iotdb.db.mpp.plan.statement.metadata.template.ShowPathsUsingTemplateStatement;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -104,7 +106,10 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
}
if (queryStatement.isAlignByDevice()) {
- Map<String, PlanNode> deviceToSubPlanMap = new TreeMap<>();
+ Map<String, PlanNode> deviceToSubPlanMap =
+ queryStatement.getResultDeviceOrder() == Ordering.ASC
+ ? new TreeMap<>()
+ : new TreeMap<>(Collections.reverseOrder());
for (String deviceName : analysis.getDeviceToSourceExpressions().keySet()) {
LogicalPlanBuilder subPlanBuilder = new LogicalPlanBuilder(analysis, context);
subPlanBuilder =
@@ -127,7 +132,7 @@ public class LogicalPlanVisitor extends StatementVisitor<PlanNode, MPPQueryConte
deviceToSubPlanMap,
analysis.getDeviceViewOutputExpressions(),
analysis.getDeviceViewInputIndexesMap(),
- queryStatement.getResultTimeOrder());
+ queryStatement.getSortItemList());
} else {
planBuilder =
planBuilder.withNewRoot(
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
index 1a22949343..2af0bd0708 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/OperatorTreeGenerator.java
@@ -48,9 +48,11 @@ import org.apache.iotdb.db.mpp.execution.operator.process.FilterAndProjectOperat
import org.apache.iotdb.db.mpp.execution.operator.process.IntoOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LimitOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.LinearFillOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.MergeSortOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.OffsetOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.ProcessOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.RawDataAggregationOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.SlidingWindowAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TagAggregationOperator;
import org.apache.iotdb.db.mpp.execution.operator.process.TransformOperator;
@@ -80,6 +82,7 @@ import org.apache.iotdb.db.mpp.execution.operator.process.join.VerticallyConcatO
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.ColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MergeSortComparator;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MultiColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.NonOverlappedMultiColumnMerger;
import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
@@ -146,8 +149,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -646,6 +651,27 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
return new NodePathsCountOperator(operatorContext, child);
}
+ /**
+  * Builds a {@link SingleDeviceViewOperator} for the given node.
+  *
+  * <p>The output column types are not derived here; they are taken from the data types cached
+  * on {@code context}, so those must have been set before this node is visited.
+  *
+  * @throws IllegalStateException if the cached data types are null or empty
+  */
+ @Override
+ public Operator visitSingleDeviceView(
+     SingleDeviceViewNode node, LocalExecutionPlanContext context) {
+   String operatorName = SingleDeviceViewOperator.class.getSimpleName();
+   OperatorContext operatorContext =
+       context
+           .getInstanceContext()
+           .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorName);
+
+   // The child operator is generated before the cached types are validated.
+   Operator childOperator = node.getChild().accept(this, context);
+   List<Integer> measurementIndexes = node.getDeviceToMeasurementIndexes();
+
+   List<TSDataType> cachedTypes = context.getCachedDataTypes();
+   boolean typesMissing = cachedTypes == null || cachedTypes.isEmpty();
+   if (typesMissing) {
+     throw new IllegalStateException("OutputColumTypes should not be null/empty");
+   }
+
+   context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+   return new SingleDeviceViewOperator(
+       operatorContext, node.getDevice(), childOperator, measurementIndexes, cachedTypes);
+ }
+
@Override
public Operator visitDeviceView(DeviceViewNode node, LocalExecutionPlanContext context) {
OperatorContext operatorContext =
@@ -705,6 +731,28 @@ public class OperatorTreeGenerator extends PlanVisitor<Operator, LocalExecutionP
operatorContext, node.getDevices(), children, dataTypes, selector, timeComparator);
}
+ /**
+  * Builds a {@link MergeSortOperator} over the operators generated for all children of
+  * {@code node}.
+  *
+  * <p>The node's output column types are cached on {@code context} before the children are
+  * visited, so that child visitors can read them from the context.
+  */
+ @Override
+ public Operator visitMergeSort(MergeSortNode node, LocalExecutionPlanContext context) {
+   String operatorName = MergeSortOperator.class.getSimpleName();
+   OperatorContext operatorContext =
+       context
+           .getInstanceContext()
+           .addOperatorContext(context.getNextOperatorId(), node.getPlanNodeId(), operatorName);
+
+   // Must happen before visiting children: they read the types back from the context.
+   List<TSDataType> outputTypes = getOutputColumnTypes(node, context.getTypeProvider());
+   context.setCachedDataTypes(outputTypes);
+
+   List<Operator> childOperators =
+       node.getChildren().stream()
+           .map(childNode -> childNode.accept(this, context))
+           .collect(Collectors.toList());
+
+   List<SortItem> sortItems = node.getMergeOrderParameter().getSortItemList();
+   context.getTimeSliceAllocator().recordExecutionWeight(operatorContext, 1);
+   Comparator<MergeSortKey> comparator = MergeSortComparator.getComparator(sortItems);
+   return new MergeSortOperator(operatorContext, childOperators, outputTypes, comparator);
+ }
+
@Override
public Operator visitFill(FillNode node, LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
index 7aafb9198c..965dcb1b56 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/DistributionPlanner.java
@@ -51,7 +51,13 @@ public class DistributionPlanner {
public PlanNode rewriteSource() {
SourceRewriter rewriter = new SourceRewriter(this.analysis);
- return rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
+ List<PlanNode> planNodeList =
+ rewriter.visit(logicalPlan.getRootNode(), new DistributionPlanContext(context));
+ if (planNodeList.size() != 1) {
+ throw new IllegalStateException("root node must return only one");
+ } else {
+ return planNodeList.get(0);
+ }
}
public PlanNode addExchangeNode(PlanNode root) {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
index a9590d995a..82b56565bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/ExchangeNodeAdder.java
@@ -39,7 +39,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ExchangeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.FilterNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TransformNode;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.LastQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesAggregationScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SeriesScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.source.SourceNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
import org.apache.iotdb.db.mpp.plan.statement.crud.QueryStatement;
import java.util.ArrayList;
@@ -207,6 +210,83 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
return processMultiChildNode(node, context);
}
+ /** Handles a SingleDeviceViewNode by delegating to {@code processOneChildNode}. */
+ @Override
+ public PlanNode visitSingleDeviceView(SingleDeviceViewNode node, NodeGroupContext context) {
+ return processOneChildNode(node, context);
+ }
+
+ /**
+  * Visits every child of the MergeSortNode, groups the visited children by the region recorded
+  * in their node distribution, wraps each multi-child group in its own MergeSortNode, and then
+  * joins the per-region results through {@code groupPlanNodeByMergeSortNode}.
+  */
+ @Override
+ public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context) {
+   // Step 1: bucket the visited children by the region they are distributed to.
+   Map<TRegionReplicaSet, List<PlanNode>> regionToChildren = new HashMap<>();
+   for (PlanNode rawChild : node.getChildren()) {
+     PlanNode visited = visit(rawChild, context);
+     TRegionReplicaSet region = context.getNodeDistribution(visited.getPlanNodeId()).region;
+     regionToChildren.computeIfAbsent(region, k -> new ArrayList<>()).add(visited);
+   }
+
+   // Step 2: one entry per region; a lone child is used directly, several children are put
+   // under a new MergeSortNode placed in the region of its first child.
+   List<PlanNode> perRegionRoots = new ArrayList<>();
+   for (List<PlanNode> regionChildren : regionToChildren.values()) {
+     if (regionChildren.size() == 1) {
+       perRegionRoots.add(regionChildren.get(0));
+     } else {
+       MergeSortNode regionMergeSort =
+           new MergeSortNode(
+               context.queryContext.getQueryId().genPlanNodeId(),
+               node.getMergeOrderParameter(),
+               node.getOutputColumnNames());
+       for (PlanNode regionChild : regionChildren) {
+         regionMergeSort.addChild(regionChild);
+       }
+       PlanNode firstChild = regionMergeSort.getChildren().get(0);
+       context.putNodeDistribution(
+           regionMergeSort.getPlanNodeId(),
+           new NodeDistribution(
+               NodeDistributionType.SAME_WITH_ALL_CHILDREN,
+               context.getNodeDistribution(firstChild.getPlanNodeId()).region));
+       perRegionRoots.add(regionMergeSort);
+     }
+   }
+
+   return groupPlanNodeByMergeSortNode(
+       perRegionRoots, node.getOutputColumnNames(), node.getMergeOrderParameter(), context);
+ }
+
+ /**
+  * Combines the given plan nodes under a single root.
+  *
+  * <p>A list with exactly one element is returned as that element. Otherwise a new
+  * MergeSortNode is created: the first list element becomes its direct child and determines
+  * its region, and every other element is attached behind a new ExchangeNode.
+  *
+  * @param mergeSortNodeList nodes to combine; expected to be non-empty
+  * @param outputColumns output column names for the new MergeSortNode
+  * @param orderByParameter merge order for the new MergeSortNode
+  * @param context node group context used for id generation and distribution bookkeeping
+  * @return the single input node, or the newly created MergeSortNode
+  */
+ private PlanNode groupPlanNodeByMergeSortNode(
+     List<PlanNode> mergeSortNodeList,
+     List<String> outputColumns,
+     OrderByParameter orderByParameter,
+     NodeGroupContext context) {
+   int nodeCount = mergeSortNodeList.size();
+   if (nodeCount == 1) {
+     return mergeSortNodeList.get(0);
+   }
+
+   MergeSortNode mergeRoot =
+       new MergeSortNode(
+           context.queryContext.getQueryId().genPlanNodeId(), orderByParameter, outputColumns);
+
+   // Every entry lives in a different TRegionReplicaSet, so any of them may host the root;
+   // the first one is chosen and attached without an exchange.
+   PlanNode localChild = mergeSortNodeList.get(0);
+   mergeRoot.addChild(localChild);
+   TRegionReplicaSet rootRegion = context.getNodeDistribution(localChild.getPlanNodeId()).region;
+   context.putNodeDistribution(
+       mergeRoot.getPlanNodeId(),
+       new NodeDistribution(NodeDistributionType.SAME_WITH_SOME_CHILD, rootRegion));
+
+   // The remaining entries are reached through an ExchangeNode each.
+   for (PlanNode remoteChild : mergeSortNodeList.subList(1, nodeCount)) {
+     ExchangeNode exchange = new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
+     exchange.setChild(remoteChild);
+     exchange.setOutputColumnNames(remoteChild.getOutputColumnNames());
+     mergeRoot.addChild(exchange);
+   }
+
+   return mergeRoot;
+ }
+
@Override
public PlanNode visitLastQueryMerge(LastQueryMergeNode node, NodeGroupContext context) {
return processMultiChildNode(node, context);
@@ -274,9 +354,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
DeviceViewGroup group = deviceViewGroupMap.computeIfAbsent(region, DeviceViewGroup::new);
group.addChild(device, visitedChild);
}
-
// Generate DeviceViewNode for each group
- List<DeviceViewNode> deviceViewNodeList = new ArrayList<>();
+ List<PlanNode> deviceViewNodeList = new ArrayList<>();
for (DeviceViewGroup group : deviceViewGroupMap.values()) {
DeviceViewNode deviceViewNode =
new DeviceViewNode(
@@ -296,35 +375,8 @@ public class ExchangeNodeAdder extends PlanVisitor<PlanNode, NodeGroupContext> {
deviceViewNodeList.add(deviceViewNode);
}
- if (deviceViewNodeList.size() == 1) {
- return deviceViewNodeList.get(0);
- }
-
- DeviceMergeNode deviceMergeNode =
- new DeviceMergeNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
- node.getDevices());
-
- // Each child of deviceMergeNode has different TRegionReplicaSet, so we can select any one from
- // its child
- deviceMergeNode.addChild(deviceViewNodeList.get(0));
- context.putNodeDistribution(
- deviceMergeNode.getPlanNodeId(),
- new NodeDistribution(
- NodeDistributionType.SAME_WITH_SOME_CHILD,
- context.getNodeDistribution(deviceViewNodeList.get(0).getPlanNodeId()).region));
-
- // Add ExchangeNode for any other child except first one
- for (int i = 1; i < deviceViewNodeList.size(); i++) {
- PlanNode child = deviceViewNodeList.get(i);
- ExchangeNode exchangeNode =
- new ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
- exchangeNode.setChild(child);
- exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
- deviceMergeNode.addChild(exchangeNode);
- }
- return deviceMergeNode;
+ return groupPlanNodeByMergeSortNode(
+ deviceViewNodeList, node.getOutputColumnNames(), node.getMergeOrderParameter(), context);
}
private static class DeviceViewGroup {
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
index 42ace5e178..20d703a945 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/distribution/SourceRewriter.java
@@ -36,11 +36,12 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaFetchS
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.metedata.read.SchemaQueryScanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
@@ -85,7 +86,65 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
@Override
- public PlanNode visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
+ public List<PlanNode> visitMergeSort(MergeSortNode node, DistributionPlanContext context) {
+   // Start from a childless copy, then attach every node produced by rewriting each original
+   // child; one original child may expand into several rewritten nodes.
+   MergeSortNode rewrittenRoot = cloneMergeSortNodeWithoutChild(node, context);
+   for (PlanNode originalChild : node.getChildren()) {
+     for (PlanNode rewrittenChild : rewrite(originalChild, context)) {
+       rewrittenRoot.addChild(rewrittenChild);
+     }
+   }
+   return Collections.singletonList(rewrittenRoot);
+ }
+
+ /**
+  * Creates a new MergeSortNode that reuses the merge order parameter and output column names
+  * of {@code node} but receives a freshly generated plan node id. No children are added here.
+  */
+ private MergeSortNode cloneMergeSortNodeWithoutChild(
+ MergeSortNode node, DistributionPlanContext context) {
+ return new MergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames());
+ }
+
+ /**
+  * Rewrites a SingleDeviceViewNode for distribution.
+  *
+  * <p>For aggregation queries the child is rewritten and the given node is updated in place
+  * and returned as the only element. Otherwise one copy of the node's subtree is built for
+  * each region replica set returned by the partition lookup for the node's device.
+  *
+  * @throws IllegalStateException if, for an aggregation query, rewriting the child does not
+  *     yield exactly one node
+  */
+ @Override
+ public List<PlanNode> visitSingleDeviceView(
+ SingleDeviceViewNode node, DistributionPlanContext context) {
+
+ if (isAggregationQuery()) {
+ List<PlanNode> rewroteChildren = rewrite(node.getChild(), context);
+ if (rewroteChildren.size() != 1) {
+ throw new IllegalStateException("SingleDeviceViewNode have only one child");
+ }
+ // Note: mutates the incoming node instead of cloning it.
+ node.setChild(rewroteChildren.get(0));
+ return Collections.singletonList(node);
+ }
+
+ // Look up the regions for this device under the query's global time filter.
+ String device = node.getDevice();
+ List<TRegionReplicaSet> regionReplicaSets =
+ analysis.getPartitionInfo(device, analysis.getGlobalTimeFilter());
+
+ // One subtree copy per region replica set; the result may hold more than one node.
+ List<PlanNode> singleDeviceViewList = new ArrayList<>();
+ for (TRegionReplicaSet tRegionReplicaSet : regionReplicaSets) {
+ singleDeviceViewList.add(
+ buildSingleDeviceViewNodeInRegion(node, tRegionReplicaSet, context.queryContext));
+ }
+
+ return singleDeviceViewList;
+ }
+
+ /**
+  * Recursively copies the subtree rooted at {@code root} for one region.
+  *
+  * <p>Children are copied first, then the node itself is cloned with those copies and given
+  * a new plan node id. Any copy that is a SourceNode is bound to {@code regionReplicaSet}.
+  *
+  * @param root root of the subtree to copy
+  * @param regionReplicaSet region assigned to every SourceNode in the copy
+  * @param context query context used to generate new plan node ids
+  * @return the root of the copied subtree
+  */
+ private PlanNode buildSingleDeviceViewNodeInRegion(
+     PlanNode root, TRegionReplicaSet regionReplicaSet, MPPQueryContext context) {
+   List<PlanNode> copiedChildren = new ArrayList<>();
+   for (PlanNode child : root.getChildren()) {
+     copiedChildren.add(buildSingleDeviceViewNodeInRegion(child, regionReplicaSet, context));
+   }
+
+   PlanNode copy = root.cloneWithChildren(copiedChildren);
+   copy.setPlanNodeId(context.getQueryId().genPlanNodeId());
+   if (copy instanceof SourceNode) {
+     SourceNode sourceCopy = (SourceNode) copy;
+     sourceCopy.setRegionReplicaSet(regionReplicaSet);
+   }
+   return copy;
+ }
+
+ @Override
+ public List<PlanNode> visitDeviceView(DeviceViewNode node, DistributionPlanContext context) {
checkArgument(
node.getDevices().size() == node.getChildren().size(),
"size of devices and its children in DeviceViewNode should be same");
@@ -109,13 +168,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
relatedDataRegions.addAll(regionReplicaSets);
}
- DeviceMergeNode deviceMergeNode =
- new DeviceMergeNode(
- context.queryContext.getQueryId().genPlanNodeId(),
- node.getMergeOrderParameter(),
- node.getDevices());
-
// Step 2: Iterate all partition and create DeviceViewNode for each region
+ List<PlanNode> deviceViewNodeList = new ArrayList<>();
for (TRegionReplicaSet regionReplicaSet : relatedDataRegions) {
List<String> devices = new ArrayList<>();
List<PlanNode> children = new ArrayList<>();
@@ -129,20 +183,34 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
for (int i = 0; i < devices.size(); i++) {
regionDeviceViewNode.addChildDeviceNode(devices.get(i), children.get(i));
}
- deviceMergeNode.addChild(regionDeviceViewNode);
+ deviceViewNodeList.add(regionDeviceViewNode);
}
- return deviceMergeNode;
+ if (deviceViewNodeList.size() == 1) {
+ return deviceViewNodeList;
+ }
+
+ MergeSortNode mergeSortNode =
+ new MergeSortNode(
+ context.queryContext.getQueryId().genPlanNodeId(),
+ node.getMergeOrderParameter(),
+ node.getOutputColumnNames());
+ for (PlanNode deviceViewNode : deviceViewNodeList) {
+ mergeSortNode.addChild(deviceViewNode);
+ }
+ return Collections.singletonList(mergeSortNode);
}
- private PlanNode processDeviceViewWithAggregation(
+ private List<PlanNode> processDeviceViewWithAggregation(
DeviceViewNode node, DistributionPlanContext context) {
DeviceViewNode newRoot = cloneDeviceViewNodeWithoutChild(node, context);
for (int i = 0; i < node.getDevices().size(); i++) {
- newRoot.addChildDeviceNode(
- node.getDevices().get(i), rewrite(node.getChildren().get(i), context));
+ List<PlanNode> rewroteNode = rewrite(node.getChildren().get(i), context);
+ for (PlanNode planNode : rewroteNode) {
+ newRoot.addChildDeviceNode(node.getDevices().get(i), planNode);
+ }
}
- return newRoot;
+ return Collections.singletonList(newRoot);
}
private DeviceViewNode cloneDeviceViewNodeWithoutChild(
@@ -192,7 +260,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
@Override
- public PlanNode visitSchemaQueryMerge(
+ public List<PlanNode> visitSchemaQueryMerge(
SchemaQueryMergeNode node, DistributionPlanContext context) {
SchemaQueryMergeNode root = (SchemaQueryMergeNode) node.clone();
SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
@@ -263,11 +331,12 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
});
});
}
- return root;
+ return Collections.singletonList(root);
}
@Override
- public PlanNode visitCountMerge(CountSchemaMergeNode node, DistributionPlanContext context) {
+ public List<PlanNode> visitCountMerge(
+ CountSchemaMergeNode node, DistributionPlanContext context) {
CountSchemaMergeNode root = (CountSchemaMergeNode) node.clone();
SchemaQueryScanNode seed = (SchemaQueryScanNode) node.getChildren().get(0);
Set<TRegionReplicaSet> schemaRegions = new HashSet<>();
@@ -287,19 +356,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
schemaQueryScanNode.setRegionReplicaSet(region);
root.addChild(schemaQueryScanNode);
});
- return root;
+ return Collections.singletonList(root);
}
// TODO: (xingtanzjr) a temporary way to resolve the distribution of single SeriesScanNode issue
@Override
- public PlanNode visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
+ public List<PlanNode> visitSeriesScan(SeriesScanNode node, DistributionPlanContext context) {
TimeJoinNode timeJoinNode =
new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
return processRawSeriesScan(node, context, timeJoinNode);
}
@Override
- public PlanNode visitAlignedSeriesScan(
+ public List<PlanNode> visitAlignedSeriesScan(
AlignedSeriesScanNode node, DistributionPlanContext context) {
TimeJoinNode timeJoinNode =
new TimeJoinNode(context.queryContext.getQueryId().genPlanNodeId(), node.getScanOrder());
@@ -307,7 +376,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
@Override
- public PlanNode visitLastQueryScan(LastQueryScanNode node, DistributionPlanContext context) {
+ public List<PlanNode> visitLastQueryScan(
+ LastQueryScanNode node, DistributionPlanContext context) {
LastQueryNode mergeNode =
new LastQueryNode(
context.queryContext.getQueryId().genPlanNodeId(),
@@ -317,7 +387,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
@Override
- public PlanNode visitAlignedLastQueryScan(
+ public List<PlanNode> visitAlignedLastQueryScan(
AlignedLastQueryScanNode node, DistributionPlanContext context) {
LastQueryNode mergeNode =
new LastQueryNode(
@@ -327,19 +397,19 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return processRawSeriesScan(node, context, mergeNode);
}
- private PlanNode processRawSeriesScan(
+ private List<PlanNode> processRawSeriesScan(
SeriesSourceNode node, DistributionPlanContext context, MultiChildProcessNode parent) {
- List<SeriesSourceNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
+ List<PlanNode> sourceNodes = splitSeriesSourceNodeByPartition(node, context);
if (sourceNodes.size() == 1) {
- return sourceNodes.get(0);
+ return sourceNodes;
}
sourceNodes.forEach(parent::addChild);
- return parent;
+ return Collections.singletonList(parent);
}
- private List<SeriesSourceNode> splitSeriesSourceNodeByPartition(
+ private List<PlanNode> splitSeriesSourceNodeByPartition(
SeriesSourceNode node, DistributionPlanContext context) {
- List<SeriesSourceNode> ret = new ArrayList<>();
+ List<PlanNode> ret = new ArrayList<>();
List<TRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
if (dataDistribution.size() == 1) {
@@ -358,24 +428,24 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
@Override
- public PlanNode visitSeriesAggregationScan(
+ public List<PlanNode> visitSeriesAggregationScan(
SeriesAggregationScanNode node, DistributionPlanContext context) {
return processSeriesAggregationSource(node, context);
}
@Override
- public PlanNode visitAlignedSeriesAggregationScan(
+ public List<PlanNode> visitAlignedSeriesAggregationScan(
AlignedSeriesAggregationScanNode node, DistributionPlanContext context) {
return processSeriesAggregationSource(node, context);
}
- private PlanNode processSeriesAggregationSource(
+ private List<PlanNode> processSeriesAggregationSource(
SeriesAggregationSourceNode node, DistributionPlanContext context) {
List<TRegionReplicaSet> dataDistribution =
analysis.getPartitionInfo(node.getPartitionPath(), node.getPartitionTimeFilter());
if (dataDistribution.size() == 1) {
node.setRegionReplicaSet(dataDistribution.get(0));
- return node;
+ return Collections.singletonList(node);
}
List<AggregationDescriptor> leafAggDescriptorList = new ArrayList<>();
node.getAggregationDescriptorList()
@@ -415,11 +485,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
split.setRegionReplicaSet(dataRegion);
aggregationNode.addChild(split);
}
- return aggregationNode;
+ return Collections.singletonList(aggregationNode);
}
@Override
- public PlanNode visitSchemaFetchMerge(
+ public List<PlanNode> visitSchemaFetchMerge(
SchemaFetchMergeNode node, DistributionPlanContext context) {
SchemaFetchMergeNode root = (SchemaFetchMergeNode) node.clone();
Map<String, Set<TRegionReplicaSet>> storageGroupSchemaRegionMap = new HashMap<>();
@@ -444,11 +514,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
root.addChild(schemaFetchScanNode);
}
}
- return root;
+ return Collections.singletonList(root);
}
@Override
- public PlanNode visitLastQuery(LastQueryNode node, DistributionPlanContext context) {
+ public List<PlanNode> visitLastQuery(LastQueryNode node, DistributionPlanContext context) {
// For last query, we need to keep every FI's root node is LastQueryMergeNode. So we
// force every region group have a parent node even if there is only 1 child for it.
context.setForceAddParent(true);
@@ -456,9 +526,9 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
if (context.queryMultiRegion) {
PlanNode newRoot = genLastQueryRootNode(node, context);
root.getChildren().forEach(newRoot::addChild);
- return newRoot;
+ return Collections.singletonList(newRoot);
} else {
- return root;
+ return Collections.singletonList(root);
}
}
@@ -471,14 +541,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
@Override
- public PlanNode visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
+ public List<PlanNode> visitTimeJoin(TimeJoinNode node, DistributionPlanContext context) {
// Although some logic is similar between Aggregation and RawDataQuery,
// we still use separate method to process the distribution planning now
// to make the planning procedure more clear
if (containsAggregationSource(node)) {
return planAggregationWithTimeJoin(node, context);
}
- return processRawMultiChildNode(node, context);
+ return Collections.singletonList(processRawMultiChildNode(node, context));
}
private PlanNode processRawMultiChildNode(
@@ -557,7 +627,8 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// In a general logical query plan, the children of TimeJoinNode should only be
// SeriesScanNode or SeriesAggregateScanNode
// So this branch should not be touched.
- root.addChild(visit(child, context));
+ List<PlanNode> children = visit(child, context);
+ children.forEach(root::addChild);
}
}
return root;
@@ -579,16 +650,17 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// This method is only used to process the PlanNodeTree whose root is SlidingWindowAggregationNode
@Override
- public PlanNode visitSlidingWindowAggregation(
+ public List<PlanNode> visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, DistributionPlanContext context) {
DistributionPlanContext childContext = context.copy().setRoot(false);
- PlanNode child = visit(node.getChild(), childContext);
+ List<PlanNode> children = visit(node.getChild(), childContext);
PlanNode newRoot = node.clone();
- newRoot.addChild(child);
- return newRoot;
+ children.forEach(newRoot::addChild);
+ return Collections.singletonList(newRoot);
}
- private PlanNode planAggregationWithTimeJoin(TimeJoinNode root, DistributionPlanContext context) {
+ private List<PlanNode> planAggregationWithTimeJoin(
+ TimeJoinNode root, DistributionPlanContext context) {
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup;
// construct newRoot
@@ -676,11 +748,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
}
});
- return newRoot;
+ return Collections.singletonList(newRoot);
}
@Override
- public PlanNode visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
+ public List<PlanNode> visitGroupByLevel(GroupByLevelNode root, DistributionPlanContext context) {
if (shouldUseNaiveAggregation(root)) {
return defaultRewrite(root, context);
}
@@ -703,11 +775,11 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
// Then, we calculate the attributes for GroupByLevelNode in each level
calculateGroupByLevelNodeAttributes(newRoot, 0, context);
- return newRoot;
+ return Collections.singletonList(newRoot);
}
@Override
- public PlanNode visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
+ public List<PlanNode> visitGroupByTag(GroupByTagNode root, DistributionPlanContext context) {
Map<TRegionReplicaSet, List<SeriesAggregationSourceNode>> sourceGroup =
splitAggregationSourceByPartition(root, context);
@@ -716,10 +788,14 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
&& root.getChildren().get(0) instanceof SlidingWindowAggregationNode;
// TODO: use 2 phase aggregation to optimize the query
- return containsSlidingWindow
- ? groupSourcesForGroupByTagWithSlidingWindow(
- root, (SlidingWindowAggregationNode) root.getChildren().get(0), sourceGroup, context)
- : groupSourcesForGroupByTag(root, sourceGroup, context);
+ return Collections.singletonList(
+ containsSlidingWindow
+ ? groupSourcesForGroupByTagWithSlidingWindow(
+ root,
+ (SlidingWindowAggregationNode) root.getChildren().get(0),
+ sourceGroup,
+ context)
+ : groupSourcesForGroupByTag(root, sourceGroup, context));
}
// If the Aggregation Query contains value filter, we need to use the naive query plan
@@ -1040,7 +1116,7 @@ public class SourceRewriter extends SimplePlanNodeRewriter<DistributionPlanConte
return ret;
}
- public PlanNode visit(PlanNode node, DistributionPlanContext context) {
+ public List<PlanNode> visit(PlanNode node, DistributionPlanContext context) {
return node.accept(this, context);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
index 1ea9a94440..58bbe7b371 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanGraphPrinter.java
@@ -34,7 +34,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -54,6 +56,7 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationDescriptor
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.CrossSeriesAggregationDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.DeviceViewIntoPathDescriptor;
import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.IntoPathDescriptor;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.commons.lang3.Validate;
@@ -145,6 +148,14 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ /**
+  * Renders a {@link SingleDeviceViewNode} as a box whose first line is the node id and whose
+  * second line is the device name carried by the node.
+  */
+ @Override
+ public List<String> visitSingleDeviceView(SingleDeviceViewNode node, GraphContext context) {
+   final String titleLine = String.format("SingleDeviceView-%s", node.getPlanNodeId().getId());
+   final String deviceLine = String.format("DeviceName: %s", node.getDevice());
+
+   List<String> boxLines = new ArrayList<>();
+   boxLines.add(titleLine);
+   boxLines.add(deviceLine);
+   return render(node, boxLines, context);
+ }
+
@Override
public List<String> visitDeviceView(DeviceViewNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
@@ -153,6 +164,20 @@ public class PlanGraphPrinter extends PlanVisitor<List<String>, PlanGraphPrinter
return render(node, boxValue, context);
}
+ /**
+  * Renders a {@link MergeSortNode} as a box with three lines: the node id, the number of
+  * children, and the sort items (key followed by ordering) of its merge order parameter.
+  */
+ @Override
+ public List<String> visitMergeSort(MergeSortNode node, GraphContext context) {
+   final StringBuilder orderLine = new StringBuilder("Order:");
+   node.getMergeOrderParameter()
+       .getSortItemList()
+       .forEach(
+           item ->
+               orderLine
+                   .append(" ")
+                   .append(item.getSortKey())
+                   .append(" ")
+                   .append(item.getOrdering()));
+
+   final List<String> boxLines = new ArrayList<>();
+   boxLines.add(String.format("MergeSort-%s", node.getPlanNodeId().getId()));
+   boxLines.add(String.format("ChildrenCount: %d", node.getChildren().size()));
+   boxLines.add(orderLine.toString());
+   return render(node, boxLines, context);
+ }
+
@Override
public List<String> visitDeviceMerge(DeviceMergeNode node, GraphContext context) {
List<String> boxValue = new ArrayList<>();
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
index 0f846ad25b..70999dc816 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanNodeType.java
@@ -58,8 +58,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -152,7 +154,9 @@ public enum PlanNodeType {
DEACTIVATE_TEMPLATE_NODE((short) 61),
INTO((short) 62),
DEVICE_VIEW_INTO((short) 63),
- VERTICALLY_CONCAT((short) 64);
+ VERTICALLY_CONCAT((short) 64),
+ SINGLE_DEVICE_VIEW((short) 65),
+ MERGE_SORT((short) 66);
public static final int BYTES = Short.BYTES;
@@ -331,6 +335,10 @@ public enum PlanNodeType {
return DeviceViewIntoNode.deserialize(buffer);
case 64:
return VerticallyConcatNode.deserialize(buffer);
+ case 65:
+ return SingleDeviceViewNode.deserialize(buffer);
+ case 66:
+ return MergeSortNode.deserialize(buffer);
default:
throw new IllegalArgumentException("Invalid node type: " + nodeType);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
index 8bf41c86ec..f7eb7d5704 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/PlanVisitor.java
@@ -56,8 +56,10 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByTagNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.IntoNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.LimitNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.OffsetNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.ProjectNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SingleDeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
@@ -328,6 +330,14 @@ public abstract class PlanVisitor<R, C> {
return visitPlan(node, context);
}
+ /**
+  * Visits a {@link SingleDeviceViewNode}. The default implementation delegates to
+  * {@code visitPlan}; subclasses override it to handle this node type specifically.
+  */
+ public R visitSingleDeviceView(SingleDeviceViewNode node, C context) {
+ return visitPlan(node, context);
+ }
+
+ /**
+  * Visits a {@link MergeSortNode}. The default implementation delegates to {@code visitPlan};
+  * subclasses override it to handle this node type specifically.
+  */
+ public R visitMergeSort(MergeSortNode node, C context) {
+ return visitPlan(node, context);
+ }
+
public R visitVerticallyConcat(VerticallyConcatNode node, C context) {
return visitPlan(node, context);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java
index d7d77ac3ee..2737a22021 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/SimplePlanNodeRewriter.java
@@ -19,30 +19,34 @@
package org.apache.iotdb.db.mpp.plan.planner.plan.node;
+import java.util.Collections;
import java.util.List;
import static com.google.common.collect.ImmutableList.toImmutableList;
-public class SimplePlanNodeRewriter<C> extends PlanVisitor<PlanNode, C> {
+public class SimplePlanNodeRewriter<C> extends PlanVisitor<List<PlanNode>, C> {
@Override
- public PlanNode visitPlan(PlanNode node, C context) {
+ public List<PlanNode> visitPlan(PlanNode node, C context) {
// TODO: (xingtanzjr) we apply no action for IWritePlanNode currently
if (node instanceof WritePlanNode) {
- return node;
+ return Collections.singletonList(node);
}
return defaultRewrite(node, context);
}
- public PlanNode defaultRewrite(PlanNode node, C context) {
- List<PlanNode> children =
+ public List<PlanNode> defaultRewrite(PlanNode node, C context) {
+ List<List<PlanNode>> children =
node.getChildren().stream()
.map(child -> rewrite(child, context))
.collect(toImmutableList());
-
- return node.cloneWithChildren(children);
+ PlanNode newNode = node.clone();
+ for (List<PlanNode> planNodes : children) {
+ planNodes.forEach(newNode::addChild);
+ }
+ return Collections.singletonList(newNode);
}
- public PlanNode rewrite(PlanNode node, C userContext) {
+ public List<PlanNode> rewrite(PlanNode node, C userContext) {
return node.accept(this, userContext);
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
new file mode 100644
index 0000000000..564a5dbfa4
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/MergeSortNode.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.OrderByParameter;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Multi-child plan node that carries an {@link OrderByParameter} and an explicit list of output
+ * column names.
+ *
+ * <p>Both attributes are written by {@code serializeAttributes} and read back by {@link
+ * #deserialize(ByteBuffer)}, and both take part in {@link #equals(Object)} and {@link
+ * #hashCode()}.
+ */
+public class MergeSortNode extends MultiChildProcessNode {
+
+  /** Order-by parameter (list of sort items) attached to this node. */
+  private final OrderByParameter mergeOrderParameter;
+
+  /** Column names reported by {@link #getOutputColumnNames()}. */
+  private final List<String> outputColumns;
+
+  /**
+   * @param id plan node id
+   * @param mergeOrderParameter order-by parameter attached to this node
+   * @param outputColumns output column names; the list is stored as given, not copied
+   */
+  public MergeSortNode(
+      PlanNodeId id, OrderByParameter mergeOrderParameter, List<String> outputColumns) {
+    super(id);
+    this.mergeOrderParameter = mergeOrderParameter;
+    this.outputColumns = outputColumns;
+  }
+
+  public OrderByParameter getMergeOrderParameter() {
+    return mergeOrderParameter;
+  }
+
+  /**
+   * Creates a node with the same id, order parameter and output columns. The attribute objects
+   * are shared with this node; children are not passed to the new node here.
+   */
+  @Override
+  public PlanNode clone() {
+    return new MergeSortNode(getPlanNodeId(), getMergeOrderParameter(), outputColumns);
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumns;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitMergeSort(this, context);
+  }
+
+  /** Writes the node type tag, the order parameter, the column count and then each column. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.MERGE_SORT.serialize(byteBuffer);
+    mergeOrderParameter.serializeAttributes(byteBuffer);
+    ReadWriteIOUtils.write(outputColumns.size(), byteBuffer);
+    for (String column : outputColumns) {
+      ReadWriteIOUtils.write(column, byteBuffer);
+    }
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same field order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.MERGE_SORT.serialize(stream);
+    mergeOrderParameter.serializeAttributes(stream);
+    ReadWriteIOUtils.write(outputColumns.size(), stream);
+    for (String column : outputColumns) {
+      ReadWriteIOUtils.write(column, stream);
+    }
+  }
+
+  /**
+   * Reads the order parameter, the column count, the columns and finally the plan node id from
+   * the buffer, in that order. The node type tag is not read here.
+   */
+  public static MergeSortNode deserialize(ByteBuffer byteBuffer) {
+    OrderByParameter orderByParameter = OrderByParameter.deserialize(byteBuffer);
+    int columnSize = ReadWriteIOUtils.readInt(byteBuffer);
+    // Presize the list, as SingleDeviceViewNode.deserialize does for its index list.
+    List<String> outputColumns = new ArrayList<>(Math.max(columnSize, 0));
+    for (int i = 0; i < columnSize; i++) {
+      outputColumns.add(ReadWriteIOUtils.readString(byteBuffer));
+    }
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new MergeSortNode(planNodeId, orderByParameter, outputColumns);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    MergeSortNode that = (MergeSortNode) o;
+    // outputColumns is serialized state, so nodes that differ only in it must not be equal.
+    return Objects.equals(mergeOrderParameter, that.mergeOrderParameter)
+        && Objects.equals(outputColumns, that.outputColumns);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), mergeOrderParameter, outputColumns);
+  }
+
+  @Override
+  public String toString() {
+    return "MergeSort-" + this.getPlanNodeId();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleDeviceViewNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleDeviceViewNode.java
new file mode 100644
index 0000000000..caefda0455
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/process/SingleDeviceViewNode.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.plan.planner.plan.node.process;
+
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeType;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanVisitor;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Single-child plan node that carries a device name and a list of measurement indexes for that
+ * device.
+ *
+ * <p>Only the device and the index list are serialized. A node produced by {@link
+ * #deserialize(ByteBuffer)} therefore has {@code null} output column names, and {@link
+ * #equals(Object)} / {@link #hashCode()} are written to tolerate that.
+ */
+public class SingleDeviceViewNode extends SingleChildProcessNode {
+
+  private final String device;
+
+  // To reduce memory cost, SingleDeviceViewNode doesn't serialize and deserialize
+  // outputColumnNames, so this field is null on a deserialized node.
+  // NOTE(review): the original comment says the names are rebuilt from the parent node's
+  // information; that rebuild is not visible in this class - confirm where it happens.
+  private List<String> outputColumnNames;
+  private final List<Integer> deviceToMeasurementIndexes;
+
+  /**
+   * @param id plan node id
+   * @param outputColumnNames output column names, may be {@code null}
+   * @param device device name
+   * @param deviceToMeasurementIndexes measurement indexes for the device; stored as given
+   */
+  public SingleDeviceViewNode(
+      PlanNodeId id,
+      List<String> outputColumnNames,
+      String device,
+      List<Integer> deviceToMeasurementIndexes) {
+    super(id);
+    this.device = device;
+    this.outputColumnNames = outputColumnNames;
+    this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
+  }
+
+  /** Creates a node without output column names; used by {@link #deserialize(ByteBuffer)}. */
+  public SingleDeviceViewNode(
+      PlanNodeId id, String device, List<Integer> deviceToMeasurementIndexes) {
+    super(id);
+    this.device = device;
+    this.deviceToMeasurementIndexes = deviceToMeasurementIndexes;
+  }
+
+  /**
+   * Creates a node with the same id and attributes. The attribute objects are shared with this
+   * node; the child is not passed to the new node here.
+   */
+  @Override
+  public PlanNode clone() {
+    return new SingleDeviceViewNode(
+        getPlanNodeId(), outputColumnNames, device, deviceToMeasurementIndexes);
+  }
+
+  /** Returns the output column names, or {@code null} if none were supplied. */
+  @Override
+  public List<String> getOutputColumnNames() {
+    return outputColumnNames;
+  }
+
+  public String getDevice() {
+    return device;
+  }
+
+  public List<Integer> getDeviceToMeasurementIndexes() {
+    return deviceToMeasurementIndexes;
+  }
+
+  @Override
+  public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
+    return visitor.visitSingleDeviceView(this, context);
+  }
+
+  /** Writes the node type tag, the device, the index count and then each index. */
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {
+    PlanNodeType.SINGLE_DEVICE_VIEW.serialize(byteBuffer);
+    ReadWriteIOUtils.write(device, byteBuffer);
+    ReadWriteIOUtils.write(deviceToMeasurementIndexes.size(), byteBuffer);
+    for (Integer index : deviceToMeasurementIndexes) {
+      ReadWriteIOUtils.write(index, byteBuffer);
+    }
+  }
+
+  /** Stream variant of {@link #serializeAttributes(ByteBuffer)}; same field order. */
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws IOException {
+    PlanNodeType.SINGLE_DEVICE_VIEW.serialize(stream);
+    ReadWriteIOUtils.write(device, stream);
+    ReadWriteIOUtils.write(deviceToMeasurementIndexes.size(), stream);
+    for (Integer index : deviceToMeasurementIndexes) {
+      ReadWriteIOUtils.write(index, stream);
+    }
+  }
+
+  /**
+   * Reads the device, the index count, the indexes and finally the plan node id from the buffer,
+   * in that order. The returned node has no output column names.
+   */
+  public static SingleDeviceViewNode deserialize(ByteBuffer byteBuffer) {
+    String device = ReadWriteIOUtils.readString(byteBuffer);
+    int listSize = ReadWriteIOUtils.readInt(byteBuffer);
+    List<Integer> deviceToMeasurementIndexes = new ArrayList<>(listSize);
+    while (listSize > 0) {
+      deviceToMeasurementIndexes.add(ReadWriteIOUtils.readInt(byteBuffer));
+      listSize--;
+    }
+
+    PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
+    return new SingleDeviceViewNode(planNodeId, device, deviceToMeasurementIndexes);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    SingleDeviceViewNode that = (SingleDeviceViewNode) o;
+    // Null-safe comparison: outputColumnNames is null on every deserialized node, so calling
+    // outputColumnNames.equals(...) directly would throw NullPointerException.
+    return Objects.equals(device, that.device)
+        && Objects.equals(outputColumnNames, that.outputColumnNames)
+        && Objects.equals(deviceToMeasurementIndexes, that.deviceToMeasurementIndexes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), device, outputColumnNames, deviceToMeasurementIndexes);
+  }
+
+  @Override
+  public String toString() {
+    return "SingleDeviceView-" + this.getPlanNodeId();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
index 3c1d10220a..a3784773ad 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/component/OrderByComponent.java
@@ -46,7 +46,6 @@ public class OrderByComponent extends StatementNode {
public void addSortItem(SortItem sortItem) {
this.sortItemList.add(sortItem);
-
if (sortItem.getSortKey() == SortKey.TIME) {
orderByTime = true;
timeOrderPriority = sortItemList.size() - 1;
@@ -86,7 +85,7 @@ public class OrderByComponent extends StatementNode {
}
public Ordering getDeviceOrder() {
- checkState(timeOrderPriority != -1, "The device order is not specified.");
+ checkState(deviceOrderPriority != -1, "The device order is not specified.");
return sortItemList.get(deviceOrderPriority).getOrdering();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
index 049c955829..f8c84f38e5 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/statement/crud/QueryStatement.java
@@ -299,6 +299,13 @@ public class QueryStatement extends Statement {
return orderByComponent.getTimeOrder();
}
+ /**
+  * Returns the device ordering of the order-by component when the statement has one and it
+  * orders by device; otherwise returns {@link Ordering#ASC}.
+  */
+ public Ordering getResultDeviceOrder() {
+   boolean sortsByDevice = orderByComponent != null && orderByComponent.isOrderByDevice();
+   return sortsByDevice ? orderByComponent.getDeviceOrder() : Ordering.ASC;
+ }
+
public List<SortItem> getSortItemList() {
if (orderByComponent == null) {
return Collections.emptyList();
@@ -398,10 +405,6 @@ public class QueryStatement extends Statement {
if (isOrderByTimeseries()) {
throw new SemanticException("Sorting by timeseries is only supported in last queries.");
}
- if (isOrderByDevice()) {
- // TODO support sort by device
- throw new SemanticException("Sorting by device is not yet supported.");
- }
}
if (isLastQuery()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
new file mode 100644
index 0000000000..f80c9c8eb2
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortHeap.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import java.util.Arrays;
+import java.util.Comparator;
+
+/**
+ * Fixed-capacity binary min-heap of {@link MergeSortKey}s ordered by a caller-supplied
+ * comparator: the element for which the comparator reports the smallest value is at the root.
+ *
+ * <p>The capacity is fixed at construction time. Slots at index {@code heapSize} and beyond are
+ * always {@code null}, so polled keys are not retained by the heap.
+ */
+public class MergeSortHeap {
+  private final MergeSortKey[] heap;
+  private int heapSize;
+
+  private final Comparator<MergeSortKey> comparator;
+
+  /**
+   * @param childNum capacity of the heap (maximum number of keys held at once)
+   * @param comparator ordering of the keys; the smallest key is returned first
+   */
+  public MergeSortHeap(int childNum, Comparator<MergeSortKey> comparator) {
+    this.heap = new MergeSortKey[childNum];
+    this.heapSize = 0;
+    this.comparator = comparator;
+  }
+
+  public boolean isEmpty() {
+    return heapSize == 0;
+  }
+
+  /**
+   * Inserts a key into the heap.
+   *
+   * @throws IllegalStateException if the heap already holds {@code childNum} keys; the heap is
+   *     left untouched in that case
+   */
+  public void push(MergeSortKey mergeSortKey) {
+    // Check capacity up front: sifting first could overwrite a parent slot before the
+    // out-of-bounds write is detected.
+    if (heapSize == heap.length) {
+      throw new IllegalStateException("MergeSortHeap is full, capacity: " + heap.length);
+    }
+    shiftUp(heapSize, mergeSortKey);
+    ++heapSize;
+  }
+
+  /**
+   * Removes and returns the smallest key.
+   *
+   * @return the root of the heap, or {@code null} if the heap is empty
+   */
+  public MergeSortKey poll() {
+    if (heapSize == 0) {
+      return null;
+    }
+    MergeSortKey res = heap[0];
+    // Shrink first so the sift-down never compares against the slot being vacated, and clear
+    // that slot so the heap does not keep a reference to a removed key.
+    --heapSize;
+    MergeSortKey last = heap[heapSize];
+    heap[heapSize] = null;
+    if (heapSize > 0) {
+      shiftDown(0, last);
+    }
+    return res;
+  }
+
+  /**
+   * Returns the smallest key without removing it.
+   *
+   * @return the root of the heap, or {@code null} if the heap is empty
+   */
+  public MergeSortKey peek() {
+    return heapSize == 0 ? null : heap[0];
+  }
+
+  @Override
+  public String toString() {
+    return Arrays.toString(heap);
+  }
+
+  /** Returns the index of the smaller child of {@code index}, or -1 if it has no child. */
+  private int getSmallerChildIndex(int index) {
+    final int leftChildIndex = (index << 1) + 1;
+    final int rightChildIndex = (index << 1) + 2;
+
+    int smallerChildIndex;
+    if (heapSize <= leftChildIndex) {
+      smallerChildIndex = -1;
+    } else if (heapSize <= rightChildIndex) {
+      smallerChildIndex = leftChildIndex;
+    } else {
+      smallerChildIndex =
+          comparator.compare(heap[leftChildIndex], heap[rightChildIndex]) > 0
+              ? rightChildIndex
+              : leftChildIndex;
+    }
+    return smallerChildIndex;
+  }
+
+  /**
+   * Places {@code parent} at {@code parentIndex} and moves it down until neither child is
+   * smaller than it.
+   */
+  private void shiftDown(int parentIndex, MergeSortKey parent) {
+    int index = parentIndex;
+    int childIndex = getSmallerChildIndex(index);
+    while (childIndex != -1 && comparator.compare(parent, heap[childIndex]) > 0) {
+      heap[index] = heap[childIndex];
+      index = childIndex;
+      childIndex = getSmallerChildIndex(index);
+    }
+    heap[index] = parent;
+  }
+
+  /**
+   * Places {@code child} at {@code childIndex} and moves it up while its parent is larger than
+   * it.
+   */
+  private void shiftUp(int childIndex, MergeSortKey child) {
+    int index = childIndex;
+    while (index > 0) {
+      int parentIndex = (index - 1) >>> 1;
+      MergeSortKey parent = heap[parentIndex];
+      if (comparator.compare(parent, child) <= 0) {
+        break;
+      }
+      heap[index] = parent;
+      index = parentIndex;
+    }
+    heap[index] = child;
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
new file mode 100644
index 0000000000..542501d51c
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortKey.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils.datastructure;
+
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+
+/**
+ * Mutable holder that points at one position inside a {@link TsBlock}: the block itself, a row
+ * index and a column index. All fields are public and may be reassigned by callers.
+ */
+public class MergeSortKey {
+
+  public TsBlock tsBlock;
+  public int rowIndex;
+
+  public int columnIndex;
+
+  /** Creates a key for the given row; {@code columnIndex} is left at 0. */
+  public MergeSortKey(TsBlock tsBlock, int rowIndex) {
+    this(tsBlock, rowIndex, 0);
+  }
+
+  /** Creates a key for the given row and column of the block. */
+  public MergeSortKey(TsBlock tsBlock, int rowIndex, int columnIndex) {
+    this.columnIndex = columnIndex;
+    this.rowIndex = rowIndex;
+    this.tsBlock = tsBlock;
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
new file mode 100644
index 0000000000..cd5831502e
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/MergeSortOperatorTest.java
@@ -0,0 +1,1475 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.DeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.MergeSortOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.DescTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.MergeSortComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortItem;
+import org.apache.iotdb.db.mpp.plan.statement.component.SortKey;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class MergeSortOperatorTest {
+
+ // Prefix shared by every device path used in this test class.
+ private static final String MERGE_SORT_OPERATOR_TEST_SG = "root.MergeSortOperatorTest";
+ // The four lists below are handed to SeriesReaderTestUtil.setUp in setUp(); the two resource
+ // lists are also passed to every QueryDataSource and to SeriesReaderTestUtil.tearDown.
+ private final List<String> deviceIds = new ArrayList<>();
+ private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+ private final List<TsFileResource> seqResources = new ArrayList<>();
+ private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+ // Full device paths; the tests compare them with the text found in output column 0.
+ private static final String DEVICE0 = MERGE_SORT_OPERATOR_TEST_SG + ".device0";
+ private static final String DEVICE1 = MERGE_SORT_OPERATOR_TEST_SG + ".device1";
+ private static final String DEVICE2 = MERGE_SORT_OPERATOR_TEST_SG + ".device2";
+ private static final String DEVICE3 = MERGE_SORT_OPERATOR_TEST_SG + ".device3";
+
+ /**
+ * Runs before each test and delegates to {@code SeriesReaderTestUtil.setUp}, passing this class's
+ * schema, device and resource lists together with the storage-group prefix.
+ *
+ * <p>Presumably the utility fills those lists with generated test data - confirm against
+ * SeriesReaderTestUtil.
+ */
+ @Before
+ public void setUp() throws MetadataException, IOException, WriteProcessException {
+ SeriesReaderTestUtil.setUp(
+ measurementSchemas, deviceIds, seqResources, unSeqResources, MERGE_SORT_OPERATOR_TEST_SG);
+ }
+
+ /**
+ * Runs after each test and hands the sequence and unsequence resource lists to {@code
+ * SeriesReaderTestUtil.tearDown}.
+ */
+ @After
+ public void tearDown() throws IOException {
+ SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+ }
+
+ /**
+  * Returns the value the tests expect to read at the given timestamp.
+  *
+  * <ul>
+  *   <li>time below 200: {@code 20000 + time}
+  *   <li>time in [260, 300) or [380, 400): the time itself
+  *   <li>every other time from 200 upwards: {@code 10000 + time}
+  * </ul>
+  *
+  * <p>Presumably this mirrors how the test data is generated - confirm against
+  * SeriesReaderTestUtil.
+  *
+  * @param expectedTime timestamp of the row being checked
+  * @return the expected value for that timestamp
+  */
+ long getValue(long expectedTime) {
+   if (expectedTime < 200) {
+     return 20000 + expectedTime;
+   }
+   // From 200 upwards only two windows keep the raw timestamp; the rest is shifted by 10000.
+   boolean unshifted =
+       (expectedTime >= 260 && expectedTime < 300) || (expectedTime >= 380 && expectedTime < 400);
+   return unshifted ? expectedTime : 10000 + expectedTime;
+ }
+
+ // ------------------------------------------------------------------------------------------------
+ // order by time - 1
+ // ------------------------------------------------------------------------------------------------
+ //                               MergeSortOperator
+ //                           ____________|_______________
+ //                         /             |                \
+ // SingleDeviceViewOperator  SingleDeviceViewOperator  SingleDeviceViewOperator
+ //            /                          |                          \
+ //  SeriesScanOperator           TimeJoinOperator           TimeJoinOperator
+ //                                  /         \                /         \
+ //               SeriesScanOperator SeriesScanOperator SeriesScanOperator SeriesScanOperator
+ // ------------------------------------------------------------------------------------------------
+ /**
+ * Builds the operator tree for the "order by time - 1" tests.
+ *
+ * <p>The root is a {@link MergeSortOperator} over three {@link SingleDeviceViewOperator}s, one per
+ * device (device0, device1, device2). device0 is fed by a single {@link SeriesScanOperator} on
+ * sensor0; device1 and device2 are each fed by a {@link TimeJoinOperator} that joins the scans of
+ * sensor0 and sensor1.
+ *
+ * <p>NOTE(review): the executor created here is not shut down inside this method - confirm whether
+ * something else is expected to release it.
+ *
+ * @param timeOrdering ordering of the TIME sort item; also selects the scan direction and the time
+ * comparators used by the joins
+ * @param deviceOrdering ordering of the DEVICE sort item
+ * @return the root operator of the tree
+ */
+ public MergeSortOperator mergeSortOperatorTest(Ordering timeOrdering, Ordering deviceOrdering) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ // Register one operator context per operator. The contexts are fetched further down by list
+ // position (getOperatorContexts().get(n - 1) for the context registered with id n), so these
+ // registrations and the indices used below have to stay in sync.
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 6, new PlanNodeId("6"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 7, new PlanNodeId("7"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 8, new PlanNodeId("8"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 10, new PlanNodeId("10"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 11, new PlanNodeId("11"), MergeSortOperator.class.getSimpleName());
+
+ // Series read by the five scans: device0.sensor0, device1.sensor0/1 and device2.sensor0/1.
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath3 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+ MeasurementPath measurementPath4 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath5 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+ // Every scan gets "timeOrdering == ASC" as its last argument, reads from the shared
+ // seq/unseq test resources and is given a max run time of 500 ms. The two null arguments
+ // are presumably the (absent) filters - confirm against SeriesScanOperator.
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ planNodeId1,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ planNodeId2,
+ measurementPath2,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator3 =
+ new SeriesScanOperator(
+ planNodeId3,
+ measurementPath3,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator3
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator4 =
+ new SeriesScanOperator(
+ planNodeId4,
+ measurementPath4,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator4
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator5 =
+ new SeriesScanOperator(
+ planNodeId5,
+ measurementPath5,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator5
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ // Column types shared by all device views and by the merge: one TEXT column followed by
+ // five INT32 columns (the tests read the device name from column 0).
+ List<TSDataType> tsDataTypes =
+ new LinkedList<>(
+ Arrays.asList(
+ TSDataType.TEXT,
+ TSDataType.INT32,
+ TSDataType.INT32,
+ TSDataType.INT32,
+ TSDataType.INT32,
+ TSDataType.INT32));
+ // device0 view with index list [1] - presumably the output column that receives the scanned
+ // series (the tests expect device0 values in column 1).
+ SingleDeviceViewOperator singleDeviceViewOperator1 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ DEVICE0,
+ seriesScanOperator1,
+ Collections.singletonList(1),
+ tsDataTypes);
+
+ // device1: join sensor0 and sensor1 on time, then wrap the join in a view with index list
+ // [2, 3].
+ TimeJoinOperator timeJoinOperator1 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(6),
+ Arrays.asList(seriesScanOperator2, seriesScanOperator3),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+ SingleDeviceViewOperator singleDeviceViewOperator2 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(7),
+ DEVICE1,
+ timeJoinOperator1,
+ Arrays.asList(2, 3),
+ tsDataTypes);
+
+ // device2: same shape as device1, with index list [4, 5].
+ TimeJoinOperator timeJoinOperator2 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(8),
+ Arrays.asList(seriesScanOperator4, seriesScanOperator5),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+ SingleDeviceViewOperator singleDeviceViewOperator3 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(9),
+ DEVICE2,
+ timeJoinOperator2,
+ Arrays.asList(4, 5),
+ tsDataTypes);
+
+ // Root: merge the three device views with a comparator built from the sort items TIME
+ // (timeOrdering) and DEVICE (deviceOrdering), in that order.
+ return new MergeSortOperator(
+ fragmentInstanceContext.getOperatorContexts().get(10),
+ Arrays.asList(
+ singleDeviceViewOperator1, singleDeviceViewOperator2, singleDeviceViewOperator3),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering))));
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ // fail() aborts the test; the return below only satisfies the compiler.
+ fail();
+ return null;
+ }
+ }
+
+ /**
+  * Checks the three-device merge for time ascending, device ascending.
+  *
+  * <p>Every returned block must have 6 value columns and timestamps must never decrease. Rows are
+  * expected in repeating groups of device0, device1, device2. For each row the columns belonging
+  * to the other devices must be null and its own columns must equal {@link #getValue(long)} of
+  * the row time. 1500 rows are expected in total.
+  */
+ @Test
+ public void testOrderByTime1() {
+   MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.ASC);
+   long lastTime = -1;
+   // Position inside the current device group: 0 = device0, 1 = device1, 2 = device2.
+   int checkDevice = 0;
+   int count = 0;
+   while (mergeSortOperator.hasNext()) {
+     TsBlock tsBlock = mergeSortOperator.next();
+     assertEquals(6, tsBlock.getValueColumnCount());
+     count += tsBlock.getPositionCount();
+     for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+       assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+       lastTime = tsBlock.getTimeByIndex(i);
+       // Decode the device name once per row instead of once per branch.
+       String device = tsBlock.getColumn(0).getBinary(i).toString();
+       if (DEVICE0.equals(device)) {
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+         // make sure the device column is in ascending order
+         assertEquals(0, checkDevice);
+         checkDevice++;
+       } else if (DEVICE1.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+         assertEquals(1, checkDevice);
+         checkDevice++;
+       } else if (DEVICE2.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+         assertEquals(2, checkDevice);
+         // Last device of the group: start over for the next timestamp.
+         checkDevice = 0;
+       } else {
+         fail();
+       }
+     }
+   }
+   assertEquals(1500, count);
+ }
+
+ /**
+  * Checks the three-device merge for time ascending, device descending.
+  *
+  * <p>Every returned block must have 6 value columns and timestamps must never decrease. Rows are
+  * expected in repeating groups of device2, device1, device0. For each row the columns belonging
+  * to the other devices must be null and its own columns must equal {@link #getValue(long)} of
+  * the row time. 1500 rows are expected in total.
+  */
+ @Test
+ public void testOrderByTime2() {
+   MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.ASC, Ordering.DESC);
+   long lastTime = -1;
+   // Position inside the current device group: 0 = device2, 1 = device1, 2 = device0.
+   int checkDevice = 0;
+   int count = 0;
+   while (mergeSortOperator.hasNext()) {
+     TsBlock tsBlock = mergeSortOperator.next();
+     assertEquals(6, tsBlock.getValueColumnCount());
+     count += tsBlock.getPositionCount();
+     for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+       assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+       lastTime = tsBlock.getTimeByIndex(i);
+       // Decode the device name once per row instead of once per branch.
+       String device = tsBlock.getColumn(0).getBinary(i).toString();
+       if (DEVICE2.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+         assertEquals(0, checkDevice);
+         checkDevice++;
+       } else if (DEVICE1.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+         assertEquals(1, checkDevice);
+         checkDevice++;
+       } else if (DEVICE0.equals(device)) {
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+         // make sure the device column is in descending order
+         assertEquals(2, checkDevice);
+         // Last device of the group: start over for the next timestamp.
+         checkDevice = 0;
+       } else {
+         fail();
+       }
+     }
+   }
+   assertEquals(1500, count);
+ }
+
+ /**
+  * Checks the three-device merge for time descending, device descending.
+  *
+  * <p>Every returned block must have 6 value columns and timestamps must never increase. Rows are
+  * expected in repeating groups of device2, device1, device0. For each row the columns belonging
+  * to the other devices must be null and its own columns must equal {@link #getValue(long)} of
+  * the row time. 1500 rows are expected in total.
+  */
+ @Test
+ public void testOrderByTime3() {
+   MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.DESC);
+   long lastTime = Long.MAX_VALUE;
+   // Position inside the current device group: 0 = device2, 1 = device1, 2 = device0.
+   int checkDevice = 0;
+   int count = 0;
+   while (mergeSortOperator.hasNext()) {
+     TsBlock tsBlock = mergeSortOperator.next();
+     assertEquals(6, tsBlock.getValueColumnCount());
+     count += tsBlock.getPositionCount();
+     for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+       assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+       lastTime = tsBlock.getTimeByIndex(i);
+       // Decode the device name once per row instead of once per branch.
+       String device = tsBlock.getColumn(0).getBinary(i).toString();
+       if (DEVICE2.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+         assertEquals(0, checkDevice);
+         checkDevice++;
+       } else if (DEVICE1.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+         assertEquals(1, checkDevice);
+         checkDevice++;
+       } else if (DEVICE0.equals(device)) {
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+         // make sure the device column is in descending order
+         assertEquals(2, checkDevice);
+         // Last device of the group: start over for the next timestamp.
+         checkDevice = 0;
+       } else {
+         fail();
+       }
+     }
+   }
+   assertEquals(1500, count);
+ }
+
+ /**
+  * Checks the three-device merge for time descending, device ascending.
+  *
+  * <p>Every returned block must have 6 value columns and timestamps must never increase. Rows are
+  * expected in repeating groups of device0, device1, device2. For each row the columns belonging
+  * to the other devices must be null and its own columns must equal {@link #getValue(long)} of
+  * the row time. 1500 rows are expected in total.
+  */
+ @Test
+ public void testOrderByTime4() {
+   MergeSortOperator mergeSortOperator = mergeSortOperatorTest(Ordering.DESC, Ordering.ASC);
+   long lastTime = Long.MAX_VALUE;
+   // Position inside the current device group: 0 = device0, 1 = device1, 2 = device2.
+   int checkDevice = 0;
+   int count = 0;
+   while (mergeSortOperator.hasNext()) {
+     TsBlock tsBlock = mergeSortOperator.next();
+     assertEquals(6, tsBlock.getValueColumnCount());
+     count += tsBlock.getPositionCount();
+     for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+       assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+       lastTime = tsBlock.getTimeByIndex(i);
+       // Decode the device name once per row instead of once per branch.
+       String device = tsBlock.getColumn(0).getBinary(i).toString();
+       if (DEVICE0.equals(device)) {
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+         // make sure the device column is in ascending order
+         assertEquals(0, checkDevice);
+         checkDevice++;
+       } else if (DEVICE1.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(4).isNull(i));
+         assertTrue(tsBlock.getColumn(5).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(3).getInt(i));
+         assertEquals(1, checkDevice);
+         checkDevice++;
+       } else if (DEVICE2.equals(device)) {
+         assertTrue(tsBlock.getColumn(1).isNull(i));
+         assertTrue(tsBlock.getColumn(2).isNull(i));
+         assertTrue(tsBlock.getColumn(3).isNull(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(4).getInt(i));
+         assertEquals(getValue(lastTime), tsBlock.getColumn(5).getInt(i));
+         assertEquals(2, checkDevice);
+         // Last device of the group: start over for the next timestamp.
+         checkDevice = 0;
+       } else {
+         fail();
+       }
+     }
+   }
+   assertEquals(1500, count);
+ }
+
+ // ------------------------------------------------------------------------------------------------
+ // order by time - 2
+ // ------------------------------------------------------------------------------------------------
+ // [SSO]:SeriesScanOperator
+ // [SDO]:SingleDeviceViewOperator
+ //                     MergeSortOperator
+ //                  ___________|___________
+ //              /                             \
+ // MergeSortOperator                       MergeSortOperator
+ //     /          |                       /                 \
+ // [SDO]        [SDO]                 [SDO]                 [SDO]
+ //   |            |                     |                     |
+ // [SSO]   TimeJoinOperator      TimeJoinOperator      TimeJoinOperator
+ //             /      \              /      \              /      \
+ //         [SSO]     [SSO]       [SSO]     [SSO]       [SSO]     [SSO]
+ // ------------------------------------------------------------------------------------------------
+ /**
+ * Builds the two-level operator tree for the "order by time - 2" tests.
+ *
+ * <p>The root {@link MergeSortOperator} merges two child {@link MergeSortOperator}s. The first
+ * child merges the views of device0 (a single {@link SeriesScanOperator} on sensor0) and device1
+ * (a {@link TimeJoinOperator} over sensor0 and sensor1); the second child merges the views of
+ * device2 and device3, each backed by a {@link TimeJoinOperator} over sensor0 and sensor1. All
+ * three merges get a comparator built from the same two sort items.
+ *
+ * <p>NOTE(review): the executor created here is not shut down inside this method - confirm whether
+ * something else is expected to release it.
+ *
+ * @param timeOrdering ordering of the TIME sort item; also selects the scan direction and the time
+ * comparators used by the joins
+ * @param deviceOrdering ordering of the DEVICE sort item
+ * @return the root operator of the tree
+ */
+ public MergeSortOperator mergeSortOperatorTest2(Ordering timeOrdering, Ordering deviceOrdering) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ // Register one operator context per operator: ids 1-7 for the scans, 8-10 for the joins,
+ // 11-14 for the device views and 15-17 for the merges. The contexts are fetched further down
+ // by list position (getOperatorContexts().get(n - 1) for id n), so these registrations and
+ // the indices used below have to stay in sync.
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId6 = new PlanNodeId("6");
+ fragmentInstanceContext.addOperatorContext(
+ 6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId7 = new PlanNodeId("7");
+ fragmentInstanceContext.addOperatorContext(
+ 7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+
+ fragmentInstanceContext.addOperatorContext(
+ 8, new PlanNodeId("8"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 10, new PlanNodeId("10"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 11, new PlanNodeId("11"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 12, new PlanNodeId("12"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 13, new PlanNodeId("13"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 14, new PlanNodeId("14"), SingleDeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 15, new PlanNodeId("15"), MergeSortOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 16, new PlanNodeId("16"), MergeSortOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 17, new PlanNodeId("17"), MergeSortOperator.class.getSimpleName());
+
+ // Series read by the seven scans: device0.sensor0 plus sensor0/sensor1 of device1, device2
+ // and device3.
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath3 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+ MeasurementPath measurementPath4 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath5 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+ MeasurementPath measurementPath6 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device3.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath7 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device3.sensor1", TSDataType.INT32);
+
+ // Every scan gets "timeOrdering == ASC" as its last argument, reads from the shared
+ // seq/unseq test resources and is given a max run time of 500 ms. The two null arguments
+ // are presumably the (absent) filters - confirm against SeriesScanOperator.
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ planNodeId1,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ planNodeId2,
+ measurementPath2,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator3 =
+ new SeriesScanOperator(
+ planNodeId3,
+ measurementPath3,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator3
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator4 =
+ new SeriesScanOperator(
+ planNodeId4,
+ measurementPath4,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator4
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator5 =
+ new SeriesScanOperator(
+ planNodeId5,
+ measurementPath5,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator5
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator6 =
+ new SeriesScanOperator(
+ planNodeId6,
+ measurementPath6,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator6
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator7 =
+ new SeriesScanOperator(
+ planNodeId7,
+ measurementPath7,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(6),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator7
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ // Column types shared by every view and merge in this tree: one TEXT column followed by two
+ // INT32 columns.
+ List<TSDataType> tsDataTypes =
+ new LinkedList<>(Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32));
+
+ // One time join per two-sensor device: device1, device2 and device3 respectively.
+ TimeJoinOperator timeJoinOperator1 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(7),
+ Arrays.asList(seriesScanOperator2, seriesScanOperator3),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+ TimeJoinOperator timeJoinOperator2 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(8),
+ Arrays.asList(seriesScanOperator4, seriesScanOperator5),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+ TimeJoinOperator timeJoinOperator3 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(9),
+ Arrays.asList(seriesScanOperator6, seriesScanOperator7),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+ // Device views. device0 is given index list [1] only; the other devices are given [1, 2].
+ // Presumably these are the output columns that receive the child's series - confirm against
+ // SingleDeviceViewOperator.
+ SingleDeviceViewOperator singleDeviceViewOperator1 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(10),
+ DEVICE0,
+ seriesScanOperator1,
+ Collections.singletonList(1),
+ tsDataTypes);
+ SingleDeviceViewOperator singleDeviceViewOperator2 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(11),
+ DEVICE1,
+ timeJoinOperator1,
+ Arrays.asList(1, 2),
+ tsDataTypes);
+ SingleDeviceViewOperator singleDeviceViewOperator3 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(12),
+ DEVICE2,
+ timeJoinOperator2,
+ Arrays.asList(1, 2),
+ tsDataTypes);
+ SingleDeviceViewOperator singleDeviceViewOperator4 =
+ new SingleDeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(13),
+ DEVICE3,
+ timeJoinOperator3,
+ Arrays.asList(1, 2),
+ tsDataTypes);
+
+ // First level: merge device0 with device1, and device2 with device3.
+ MergeSortOperator mergeSortOperator1 =
+ new MergeSortOperator(
+ fragmentInstanceContext.getOperatorContexts().get(14),
+ Arrays.asList(singleDeviceViewOperator1, singleDeviceViewOperator2),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering))));
+ MergeSortOperator mergeSortOperator2 =
+ new MergeSortOperator(
+ fragmentInstanceContext.getOperatorContexts().get(15),
+ Arrays.asList(singleDeviceViewOperator3, singleDeviceViewOperator4),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering))));
+
+ // Root: merge the two intermediate merges with a comparator built from the same sort items.
+ return new MergeSortOperator(
+ fragmentInstanceContext.getOperatorContexts().get(16),
+ Arrays.asList(mergeSortOperator1, mergeSortOperator2),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.TIME, timeOrdering),
+ new SortItem(SortKey.DEVICE, deviceOrdering))));
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ // fail() aborts the test; the return below only satisfies the compiler.
+ fail();
+ return null;
+ }
+ }
+
+  /**
+   * Sort by time ASC, then device ASC: timestamps must never decrease and rows must cycle through
+   * device0, device1, device2, device3 in that order. 2000 rows are expected in total.
+   */
+  @Test
+  public void testOrderByTime1_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.ASC);
+    long lastTime = -1;
+    // index (0..3) of the device expected next within the current four-row cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Sort by time ASC, then device DESC: timestamps must never decrease and rows must cycle through
+   * device3, device2, device1, device0 in that order. 2000 rows are expected in total.
+   */
+  @Test
+  public void testOrderByTime2_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.ASC, Ordering.DESC);
+    long lastTime = -1;
+    // index (0..3) of the device expected next within the current four-row cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Sort by time DESC, then device DESC: timestamps must never increase and rows must cycle
+   * through device3, device2, device1, device0 in that order. 2000 rows are expected in total.
+   */
+  @Test
+  public void testOrderByTime3_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.DESC);
+    long lastTime = Long.MAX_VALUE;
+    // index (0..3) of the device expected next within the current four-row cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Sort by time DESC, then device ASC: timestamps must never increase and rows must cycle through
+   * device0, device1, device2, device3 in that order. 2000 rows are expected in total.
+   */
+  @Test
+  public void testOrderByTime4_2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest2(Ordering.DESC, Ordering.ASC);
+    long lastTime = Long.MAX_VALUE;
+    // index (0..3) of the device expected next within the current four-row cycle
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertEquals(0, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(1, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(2, checkDevice);
+          checkDevice++;
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertEquals(3, checkDevice);
+          // last device of the cycle: start over
+          checkDevice = 0;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+  // ------------------------------------------------------------------------------------------------
+  // order by device
+  // ------------------------------------------------------------------------------------------------
+  // [SSO]:SeriesScanOperator
+  //                              MergeSortOperator
+  //                     ______________|______________
+  //                    /                             \
+  //          DeviceViewOperator               DeviceViewOperator
+  //           /          |                     /             \
+  //       [SSO]   TimeJoinOperator    TimeJoinOperator   TimeJoinOperator
+  //                  /      \            /      \           /      \
+  //               [SSO]    [SSO]      [SSO]    [SSO]     [SSO]    [SSO]
+  // ------------------------------------------------------------------------------------------------
+ public MergeSortOperator mergeSortOperatorTest3(Ordering timeOrdering, Ordering deviceOrdering) {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+ try {
+ // Construct operator tree
+ QueryId queryId = new QueryId("stub_query");
+
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ fragmentInstanceContext.addOperatorContext(
+ 1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ fragmentInstanceContext.addOperatorContext(
+ 2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId3 = new PlanNodeId("3");
+ fragmentInstanceContext.addOperatorContext(
+ 3, planNodeId3, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId4 = new PlanNodeId("4");
+ fragmentInstanceContext.addOperatorContext(
+ 4, planNodeId4, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId5 = new PlanNodeId("5");
+ fragmentInstanceContext.addOperatorContext(
+ 5, planNodeId5, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId6 = new PlanNodeId("6");
+ fragmentInstanceContext.addOperatorContext(
+ 6, planNodeId6, SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId7 = new PlanNodeId("7");
+ fragmentInstanceContext.addOperatorContext(
+ 7, planNodeId7, SeriesScanOperator.class.getSimpleName());
+
+ fragmentInstanceContext.addOperatorContext(
+ 8, new PlanNodeId("8"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 9, new PlanNodeId("9"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 10, new PlanNodeId("10"), TimeJoinOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 11, new PlanNodeId("11"), DeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 12, new PlanNodeId("12"), DeviceViewOperator.class.getSimpleName());
+ fragmentInstanceContext.addOperatorContext(
+ 13, new PlanNodeId("13"), MergeSortOperator.class.getSimpleName());
+
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device0.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath3 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device1.sensor1", TSDataType.INT32);
+ MeasurementPath measurementPath4 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath5 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device2.sensor1", TSDataType.INT32);
+ MeasurementPath measurementPath6 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device3.sensor0", TSDataType.INT32);
+ MeasurementPath measurementPath7 =
+ new MeasurementPath(MERGE_SORT_OPERATOR_TEST_SG + ".device3.sensor1", TSDataType.INT32);
+
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ planNodeId1,
+ measurementPath1,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(0),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ planNodeId2,
+ measurementPath2,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(1),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator3 =
+ new SeriesScanOperator(
+ planNodeId3,
+ measurementPath3,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(2),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator3.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator3
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator4 =
+ new SeriesScanOperator(
+ planNodeId4,
+ measurementPath4,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(3),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator4.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator4
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator5 =
+ new SeriesScanOperator(
+ planNodeId5,
+ measurementPath5,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(4),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator5.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator5
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator6 =
+ new SeriesScanOperator(
+ planNodeId6,
+ measurementPath6,
+ Collections.singleton("sensor0"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(5),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator6.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator6
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+ SeriesScanOperator seriesScanOperator7 =
+ new SeriesScanOperator(
+ planNodeId7,
+ measurementPath7,
+ Collections.singleton("sensor1"),
+ TSDataType.INT32,
+ fragmentInstanceContext.getOperatorContexts().get(6),
+ null,
+ null,
+ timeOrdering == Ordering.ASC);
+ seriesScanOperator7.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+ seriesScanOperator7
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ List<TSDataType> tsDataTypes =
+ new LinkedList<>(Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32));
+
+ TimeJoinOperator timeJoinOperator1 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(7),
+ Arrays.asList(seriesScanOperator2, seriesScanOperator3),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+ TimeJoinOperator timeJoinOperator2 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(8),
+ Arrays.asList(seriesScanOperator4, seriesScanOperator5),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+ TimeJoinOperator timeJoinOperator3 =
+ new TimeJoinOperator(
+ fragmentInstanceContext.getOperatorContexts().get(9),
+ Arrays.asList(seriesScanOperator6, seriesScanOperator7),
+ timeOrdering,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ Arrays.asList(
+ new SingleColumnMerger(
+ new InputLocation(0, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator()),
+ new SingleColumnMerger(
+ new InputLocation(1, 0),
+ timeOrdering == Ordering.ASC
+ ? new AscTimeComparator()
+ : new DescTimeComparator())),
+ timeOrdering == Ordering.ASC ? new AscTimeComparator() : new DescTimeComparator());
+
+ List<String> devices = new ArrayList<>(Arrays.asList(DEVICE0, DEVICE1, DEVICE2, DEVICE3));
+ if (deviceOrdering == Ordering.DESC) Collections.reverse(devices);
+ List<List<Integer>> deviceColumnIndex = new ArrayList<>();
+ deviceColumnIndex.add(Collections.singletonList(1));
+ deviceColumnIndex.add(Arrays.asList(1, 2));
+ if (deviceOrdering == Ordering.DESC) Collections.reverse(deviceColumnIndex);
+ DeviceViewOperator deviceViewOperator1 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(10),
+ deviceOrdering == Ordering.ASC
+ ? Arrays.asList(DEVICE0, DEVICE1)
+ : Arrays.asList(DEVICE1, DEVICE0),
+ deviceOrdering == Ordering.ASC
+ ? Arrays.asList(seriesScanOperator1, timeJoinOperator1)
+ : Arrays.asList(timeJoinOperator1, seriesScanOperator1),
+ deviceColumnIndex,
+ tsDataTypes);
+ deviceColumnIndex = new ArrayList<>();
+ deviceColumnIndex.add(Arrays.asList(1, 2));
+ deviceColumnIndex.add(Arrays.asList(1, 2));
+ DeviceViewOperator deviceViewOperator2 =
+ new DeviceViewOperator(
+ fragmentInstanceContext.getOperatorContexts().get(11),
+ deviceOrdering == Ordering.ASC
+ ? Arrays.asList(DEVICE2, DEVICE3)
+ : Arrays.asList(DEVICE3, DEVICE2),
+ deviceOrdering == Ordering.ASC
+ ? Arrays.asList(timeJoinOperator2, timeJoinOperator3)
+ : Arrays.asList(timeJoinOperator3, timeJoinOperator2),
+ deviceColumnIndex,
+ tsDataTypes);
+ return new MergeSortOperator(
+ fragmentInstanceContext.getOperatorContexts().get(12),
+ Arrays.asList(deviceViewOperator1, deviceViewOperator2),
+ tsDataTypes,
+ MergeSortComparator.getComparator(
+ Arrays.asList(
+ new SortItem(SortKey.DEVICE, deviceOrdering),
+ new SortItem(SortKey.TIME, timeOrdering))));
+ } catch (IllegalPathException e) {
+ e.printStackTrace();
+ fail();
+ return null;
+ }
+ }
+
+  /**
+   * Sort by device ASC, then time ASC: rows 0-499 must belong to device0, 500-999 to device1,
+   * 1000-1499 to device2 and 1500-1999 to device3, with non-decreasing timestamps inside each
+   * device.
+   */
+  @Test
+  public void testOrderByDevice1() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.ASC);
+    long lastTime = -1;
+    // number of rows seen so far; decides which device the next row must belong to
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the time order restarts for the next device
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Sort by device DESC, then time ASC: rows 0-499 must belong to device3, 500-999 to device2,
+   * 1000-1499 to device1 and 1500-1999 to device0, with non-decreasing timestamps inside each
+   * device.
+   */
+  @Test
+  public void testOrderByDevice2() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.ASC, Ordering.DESC);
+    long lastTime = -1;
+    // number of rows seen so far; decides which device the next row must belong to
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) >= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the time order restarts for the next device
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = -1;
+          }
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Sort by device ASC, then time DESC: rows 0-499 must belong to device0, 500-999 to device1,
+   * 1000-1499 to device2 and 1500-1999 to device3, with non-increasing timestamps inside each
+   * device.
+   */
+  @Test
+  public void testOrderByDevice3() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.ASC);
+    long lastTime = Long.MAX_VALUE;
+    // number of rows seen so far; decides which device the next row must belong to
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by asc
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the time order restarts for the next device
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+
+  /**
+   * Sort by device DESC, then time DESC: rows 0-499 must belong to device3, 500-999 to device2,
+   * 1000-1499 to device1 and 1500-1999 to device0, with non-increasing timestamps inside each
+   * device.
+   */
+  @Test
+  public void testOrderByDevice4() {
+    MergeSortOperator mergeSortOperator = mergeSortOperatorTest3(Ordering.DESC, Ordering.DESC);
+    long lastTime = Long.MAX_VALUE;
+    // number of rows seen so far; decides which device the next row must belong to
+    int checkDevice = 0;
+    int count = 0;
+    while (mergeSortOperator.hasNext()) {
+      TsBlock tsBlock = mergeSortOperator.next();
+      if (tsBlock == null) {
+        continue;
+      }
+      assertEquals(3, tsBlock.getValueColumnCount());
+      count += tsBlock.getPositionCount();
+      for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+        assertTrue(tsBlock.getTimeByIndex(i) <= lastTime);
+        lastTime = tsBlock.getTimeByIndex(i);
+        // decode the device column once per row instead of once per branch
+        String device = tsBlock.getColumn(0).getBinary(i).toString();
+        if (Objects.equals(device, DEVICE3)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice < 500);
+          checkDevice++;
+          if (checkDevice == 500) {
+            // device boundary: the time order restarts for the next device
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE2)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 500 && checkDevice < 1000);
+          checkDevice++;
+          if (checkDevice == 1000) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE1)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          assertEquals(getValue(lastTime), tsBlock.getColumn(2).getInt(i));
+          assertTrue(checkDevice >= 1000 && checkDevice < 1500);
+          checkDevice++;
+          if (checkDevice == 1500) {
+            lastTime = Long.MAX_VALUE;
+          }
+        } else if (Objects.equals(device, DEVICE0)) {
+          assertEquals(getValue(lastTime), tsBlock.getColumn(1).getInt(i));
+          // device0 contributes a single series, so the second value column stays null
+          assertTrue(tsBlock.getColumn(2).isNull(i));
+          // make sure the device column is by desc
+          assertTrue(checkDevice >= 1500 && checkDevice < 2000);
+          checkDevice++;
+        } else {
+          fail();
+        }
+      }
+    }
+    assertEquals(2000, count);
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
new file mode 100644
index 0000000000..b77aed1e0b
--- /dev/null
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/SingleDeviceViewOperatorTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.mpp.execution.operator;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.exception.IllegalPathException;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
+import org.apache.iotdb.db.mpp.common.PlanFragmentId;
+import org.apache.iotdb.db.mpp.common.QueryId;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
+import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.mpp.execution.operator.process.SingleDeviceViewOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.TimeJoinOperator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.AscTimeComparator;
+import org.apache.iotdb.db.mpp.execution.operator.process.join.merge.SingleColumnMerger;
+import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.InputLocation;
+import org.apache.iotdb.db.mpp.plan.statement.component.Ordering;
+import org.apache.iotdb.db.query.reader.series.SeriesReaderTestUtil;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.block.TsBlock;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import io.airlift.units.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/** Unit test for {@link SingleDeviceViewOperator} on top of a two-series time join. */
+public class SingleDeviceViewOperatorTest {
+  private static final String SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG =
+      "root.SingleDeviceViewOperatorTest";
+  private final List<String> deviceIds = new ArrayList<>();
+  private final List<MeasurementSchema> measurementSchemas = new ArrayList<>();
+
+  private final List<TsFileResource> seqResources = new ArrayList<>();
+  private final List<TsFileResource> unSeqResources = new ArrayList<>();
+
+  @Before
+  public void setUp() throws MetadataException, IOException, WriteProcessException {
+    SeriesReaderTestUtil.setUp(
+        measurementSchemas,
+        deviceIds,
+        seqResources,
+        unSeqResources,
+        SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    SeriesReaderTestUtil.tearDown(seqResources, unSeqResources);
+  }
+
+  /**
+   * Joins device0.sensor0 and device0.sensor1 on time and wraps the join in a
+   * SingleDeviceViewOperator declared with four output columns [Device(TEXT), INT32, INT32,
+   * INT32]. Column 0 must carry the device name, columns 1 and 2 the two sensor values, and
+   * column 3, which has no input column mapped to it, must be null on every row. 500 rows are
+   * expected in total.
+   */
+  @Test
+  public void singleDeviceViewOperatorTest() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
+    try {
+      // construct operator tree
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      fragmentInstanceContext.addOperatorContext(
+          1, planNodeId1, SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      fragmentInstanceContext.addOperatorContext(
+          2, planNodeId2, SeriesScanOperator.class.getSimpleName());
+      // label the context with the operator class, not the sibling test class
+      fragmentInstanceContext.addOperatorContext(
+          3, new PlanNodeId("3"), TimeJoinOperator.class.getSimpleName());
+      fragmentInstanceContext.addOperatorContext(
+          4, new PlanNodeId("4"), SingleDeviceViewOperator.class.getSimpleName());
+
+      String device = SINGLE_DEVICE_MERGE_OPERATOR_TEST_SG + ".device0";
+      MeasurementPath measurementPath1 = new MeasurementPath(device + ".sensor0", TSDataType.INT32);
+      MeasurementPath measurementPath2 = new MeasurementPath(device + ".sensor1", TSDataType.INT32);
+
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              planNodeId1,
+              measurementPath1,
+              Collections.singleton("sensor0"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(0),
+              null,
+              null,
+              true);
+      seriesScanOperator1.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              planNodeId2,
+              measurementPath2,
+              Collections.singleton("sensor1"),
+              TSDataType.INT32,
+              fragmentInstanceContext.getOperatorContexts().get(1),
+              null,
+              null,
+              true);
+      seriesScanOperator2.initQueryDataSource(new QueryDataSource(seqResources, unSeqResources));
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      TimeJoinOperator timeJoinOperator =
+          new TimeJoinOperator(
+              fragmentInstanceContext.getOperatorContexts().get(2),
+              Arrays.asList(seriesScanOperator1, seriesScanOperator2),
+              Ordering.ASC,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              Arrays.asList(
+                  new SingleColumnMerger(new InputLocation(0, 0), new AscTimeComparator()),
+                  new SingleColumnMerger(new InputLocation(1, 0), new AscTimeComparator())),
+              new AscTimeComparator());
+      // the join's two columns land in output columns 1 and 2; output column 3 gets no input
+      SingleDeviceViewOperator singleDeviceViewOperator =
+          new SingleDeviceViewOperator(
+              fragmentInstanceContext.getOperatorContexts().get(3),
+              device,
+              timeJoinOperator,
+              Arrays.asList(1, 2),
+              Arrays.asList(TSDataType.TEXT, TSDataType.INT32, TSDataType.INT32, TSDataType.INT32));
+
+      int count = 0;
+      int total = 0;
+      while (singleDeviceViewOperator.hasNext()) {
+        TsBlock tsBlock = singleDeviceViewOperator.next();
+        assertEquals(4, tsBlock.getValueColumnCount());
+        total += tsBlock.getPositionCount();
+        for (int i = 0; i < tsBlock.getPositionCount(); i++) {
+          // the test expects blocks of 20 consecutive timestamps, block k starting at 20 * k
+          long expectedTime = i + 20L * (count % 25);
+          assertEquals(expectedTime, tsBlock.getTimeByIndex(i));
+          assertEquals(device, tsBlock.getColumn(0).getBinary(i).toString());
+
+          // NOTE(review): the offsets presumably reflect which generated file supplies each
+          // time range in SeriesReaderTestUtil - confirm against that utility
+          long offset;
+          if (expectedTime < 200) {
+            offset = 20000;
+          } else if (expectedTime < 260
+              || (expectedTime >= 300 && expectedTime < 380)
+              || expectedTime >= 400) {
+            offset = 10000;
+          } else {
+            offset = 0;
+          }
+          assertEquals(offset + expectedTime, tsBlock.getColumn(1).getInt(i));
+          assertEquals(offset + expectedTime, tsBlock.getColumn(2).getInt(i));
+          assertTrue(tsBlock.getColumn(3).isNull(i));
+        }
+        count++;
+      }
+      assertEquals(500, total);
+    } catch (IllegalPathException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      // release the notification thread so it does not outlive the test
+      instanceNotificationExecutor.shutdown();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
index f6ca5a3f6b..8b5dd9a345 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/QueryLogicalPlanUtil.java
@@ -255,7 +255,7 @@ public class QueryLogicalPlanUtil {
static {
String sql =
"SELECT * FROM root.sg.* WHERE time > 100 and s1 > 10 "
- + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+ + "ORDER BY DEVICE,TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
QueryId queryId = new QueryId("test");
List<PlanNode> sourceNodeList1 = new ArrayList<>();
@@ -659,7 +659,7 @@ public class QueryLogicalPlanUtil {
static {
String sql =
"SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 "
- + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+ + "ORDER BY DEVICE,TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
QueryId queryId = new QueryId("test");
Filter timeFilter = TimeFilter.gt(100);
@@ -892,7 +892,7 @@ public class QueryLogicalPlanUtil {
static {
String sql =
"SELECT count(s1), max_value(s2), last_value(s1) FROM root.sg.* WHERE time > 100 and s2 > 10 "
- + "ORDER BY TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
+ + "ORDER BY DEVICE,TIME DESC LIMIT 100 OFFSET 100 ALIGN BY DEVICE";
QueryId queryId = new QueryId("test");
List<PlanNode> sourceNodeList1 = new ArrayList<>();
@@ -907,9 +907,7 @@ public class QueryLogicalPlanUtil {
(MeasurementPath) schemaMap.get("root.sg.d1.s2"),
Ordering.DESC));
sourceNodeList1.forEach(
- planNode -> {
- ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100));
- });
+ planNode -> ((SeriesScanNode) planNode).setTimeFilter(TimeFilter.gt(100)));
TimeJoinNode timeJoinNode1 =
new TimeJoinNode(queryId.genPlanNodeId(), Ordering.DESC, sourceNodeList1);
diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
index 991cebb099..81d9e0b29a 100644
--- a/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/mpp/plan/plan/distribution/AggregationDistributionTest.java
@@ -35,9 +35,9 @@ import org.apache.iotdb.db.mpp.plan.planner.plan.LogicalQueryPlan;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.AggregationNode;
-import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceMergeNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.DeviceViewNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.GroupByLevelNode;
+import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.MergeSortNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.SlidingWindowAggregationNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.TimeJoinNode;
import org.apache.iotdb.db.mpp.plan.planner.plan.node.process.VerticallyConcatNode;
@@ -757,7 +757,7 @@ public class AggregationDistributionTest {
plan.getInstances().get(1).getFragment().getPlanNodeTree().getChildren().get(0);
PlanNode f3Root =
plan.getInstances().get(2).getFragment().getPlanNodeTree().getChildren().get(0);
- assertTrue(f1Root instanceof DeviceMergeNode);
+ assertTrue(f1Root instanceof MergeSortNode);
assertTrue(f2Root instanceof VerticallyConcatNode);
assertTrue(f3Root instanceof DeviceViewNode);
assertTrue(f3Root.getChildren().get(0) instanceof VerticallyConcatNode);