You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2022/07/20 06:22:26 UTC
[iotdb] branch rel/0.13 updated: [To rel/0.13] [IOTDB-3873] Aligned timeseries support single point fill query (#6712)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.13
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.13 by this push:
new 04ed5004e9 [To rel/0.13] [IOTDB-3873] Aligned timeseries support single point fill query (#6712)
04ed5004e9 is described below
commit 04ed5004e9e2f411125f04a3b683dc255780d9b8
Author: liuminghui233 <36...@users.noreply.github.com>
AuthorDate: Wed Jul 20 14:22:22 2022 +0800
[To rel/0.13] [IOTDB-3873] Aligned timeseries support single point fill query (#6712)
---
.../apache/iotdb/db/integration/IoTDBFillIT.java | 511 ++++++++++++++++++---
.../db/query/executor/AggregationExecutor.java | 4 +-
.../iotdb/db/query/executor/FillQueryExecutor.java | 37 +-
.../db/query/executor/fill/LastPointReader.java | 3 +-
.../iotdb/db/query/executor/fill/LinearFill.java | 81 +++-
.../iotdb/db/query/executor/fill/PreviousFill.java | 23 +-
6 files changed, 559 insertions(+), 100 deletions(-)
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index 060d2d10ea..7219d4a24b 100644
--- a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -38,92 +38,97 @@ import static org.junit.Assert.fail;
@Category({LocalStandaloneTest.class, ClusterTest.class})
public class IoTDBFillIT {
- private static String[] dataSet1 =
+ private static final String[] dataSet1 =
new String[] {
"SET STORAGE GROUP TO root.ln.wf01.wt01",
"CREATE TIMESERIES root.ln.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt01.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt01.hardware WITH DATATYPE=INT32, ENCODING=PLAIN",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(1, 1.1, false, 11)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(2, 2.2, true, 22)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(3, 3.3, false, 33 )",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(4, 4.4, false, 44)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(5, 5.5, false, 55)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(1, 1.1, false, 11)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(2, 2.2, true, 22)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(3, 3.3, false, 33 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(4, 4.4, false, 44)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(5, 5.5, false, 55)",
"flush",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(100, 100.1, false, 110)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(150, 200.2, true, 220)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(200, 300.3, false, 330 )",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(250, 400.4, false, 440)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(300, 500.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(100, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(150, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(200, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(250, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(300, 500.5, false, 550)",
"flush",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(10, 10.1, false, 110)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(20, 20.2, true, 220)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(30, 30.3, false, 330 )",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(40, 40.4, false, 440)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(50, 50.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(10, 10.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(20, 20.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(30, 30.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(40, 40.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(50, 50.5, false, 550)",
"flush",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(500, 100.1, false, 110)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(510, 200.2, true, 220)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(520, 300.3, false, 330 )",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(530, 400.4, false, 440)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(540, 500.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(500, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(510, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(520, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(530, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(540, 500.5, false, 550)",
"flush",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(580, 100.1, false, 110)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(590, 200.2, true, 220)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(600, 300.3, false, 330 )",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(610, 400.4, false, 440)",
- "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) "
- + "values(620, 500.5, false, 550)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(580, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(590, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(600, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(610, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf01.wt01(timestamp,temperature,status, hardware) values(620, 500.5, false, 550)",
};
- private static String[] dataSet2 =
+ private static final String[] dataSet2 =
new String[] {
"SET STORAGE GROUP TO root.ln.wf01.wt02",
"CREATE TIMESERIES root.ln.wf01.wt02.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN",
"CREATE TIMESERIES root.ln.wf01.wt02.temperature WITH DATATYPE=DOUBLE, ENCODING=PLAIN",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
- + "values(100, 100.1, false)",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) " + "values(150, 200.2, true)",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
- + "values(300, 500.5, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(100, 100.1, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(150, 200.2, true)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(300, 500.5, false)",
"flush",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) " + "values(600, 31.1, false)",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) " + "values(750, 55.2, true)",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
- + "values(900, 1020.5, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(600, 31.1, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(750, 55.2, true)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(900, 1020.5, false)",
"flush",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
- + "values(1100, 98.41, false)",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) "
- + "values(1250, 220.2, true)",
- "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) " + "values(1400, 31, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(1100, 98.41, false)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(1250, 220.2, true)",
+ "INSERT INTO root.ln.wf01.wt02(timestamp,temperature,status) values(1400, 31, false)",
"flush",
};
+ private static final String[] dataSet3 =
+ new String[] {
+ "SET STORAGE GROUP TO root.ln.wf02.wt01",
+ "CREATE ALIGNED TIMESERIES root.ln.wf02.wt01(status BOOLEAN, temperature DOUBLE,hardware INT32)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(1, 1.1, false, 11)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(2, 2.2, true, 22)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(3, 3.3, false, 33 )",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(4, 4.4, false, 44)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(5, 5.5, false, 55)",
+ "flush",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(100, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(150, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(200, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(250, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(300, 500.5, false, 550)",
+ "flush",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(10, 10.1, false, 110)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(20, 20.2, true, 220)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(30, 30.3, false, 330 )",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(40, 40.4, false, 440)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(50, 50.5, false, 550)",
+ "flush",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(500, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(510, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(520, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(530, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(540, 500.5, false, 550)",
+ "flush",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(580, 100.1, false, 110)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(590, 200.2, true, 220)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(600, 300.3, false, 330 )",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(610, 400.4, false, 440)",
+ "INSERT INTO root.ln.wf02.wt01(timestamp,temperature,status, hardware) aligned values(620, 500.5, false, 550)",
+ };
+
private static final String TIMESTAMP_STR = "Time";
private static final String TEMPERATURE_STR_1 = "root.ln.wf01.wt01.temperature";
private static final String STATUS_STR_1 = "root.ln.wf01.wt01.status";
@@ -131,6 +136,10 @@ public class IoTDBFillIT {
private static final String STATUS_STR_2 = "root.ln.wf01.wt02.status";
private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
+ private static final String ALIGNED_TEMPERATURE_STR = "root.ln.wf02.wt01.temperature";
+ private static final String ALIGNED_STATUS_STR = "root.ln.wf02.wt01.status";
+ private static final String ALIGNED_HARDWARE_STR = "root.ln.wf02.wt01.hardware";
+
@Before
public void setUp() throws Exception {
EnvFactory.getEnv().initBeforeTest();
@@ -351,6 +360,87 @@ public class IoTDBFillIT {
}
}
+ @Test
+ public void LinearFillCommonAlignedTest() {
+ String[] retArray1 = new String[] {"3,3.3,false,33", "70,70.34,null,374", "70,70.34,null,374"};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select temperature, status, hardware from "
+ + "root.ln.wf02.wt01 where time = 3 "
+ + "Fill(linear, 5ms, 5ms)");
+
+ Assert.assertTrue(hasResultSet);
+
+ ResultSet resultSet = statement.getResultSet();
+ int cnt = 0;
+ try {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select temperature, status, hardware "
+ + "from root.ln.wf02.wt01 where time = 70 "
+ + "Fill(linear, 500ms, 500ms)");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select temperature, status, hardware "
+ + "from root.ln.wf02.wt01 where time = 70 "
+ + "Fill(linear)");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ } finally {
+ resultSet.close();
+ }
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
@Test
public void LinearFillWithBeforeOrAfterValueNullTest() {
String[] retArray1 =
@@ -431,6 +521,86 @@ public class IoTDBFillIT {
}
}
+ @Test
+ public void LinearFillWithBeforeOrAfterValueNullAlignedTest() {
+ String[] retArray1 =
+ new String[] {"70,null,null,null", "80,null,null,null", "625,null,false,null"};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select temperature,status, hardware "
+ + "from root.ln.wf02.wt01 where time = 70 "
+ + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])");
+
+ int cnt = 0;
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ try {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select temperature,status, hardware "
+ + "from root.ln.wf02.wt01 where time = 80 "
+ + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select temperature,status, hardware "
+ + "from root.ln.wf02.wt01 where time = 625 "
+ + "Fill(int32[linear, 25ms, 25ms], double[linear, 25ms, 25ms], boolean[previous, 5ms])");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ } finally {
+ resultSet.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
@Test
public void oldTypeValueFillTest() {
String res = "7,7.0,true,7";
@@ -586,6 +756,130 @@ public class IoTDBFillIT {
}
}
+ @Test
+ public void valueFillAlignedTest() {
+ String res = "7,7.0,null,7";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select temperature, status, hardware "
+ + "from root.ln.wf02.wt01 where time = 7 "
+ + "Fill(7)");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(res, ans);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void stringValueFillAlignedTest() {
+ String res = "7,null,null,null";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select temperature, status, hardware "
+ + "from root.ln.wf02.wt01 where time = 7 "
+ + "Fill('test string')");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(res, ans);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void boolValueFillAlignedTest() {
+ String res = "7,null,true,null";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select temperature, status, hardware "
+ + "from root.ln.wf02.wt01 where time = 7 "
+ + "Fill(true)");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(res, ans);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ @Test
+ public void valueFillNonNullAlignedTest() {
+ String res = "1,1.1,false,11";
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "SELECT temperature, status, hardware"
+ + " FROM root.ln.wf02.wt01"
+ + " WHERE time = 1 FILL(int32[7], double[7], boolean[true])");
+
+ Assert.assertTrue(hasResultSet);
+ ResultSet resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(res, ans);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
@Test
public void oldTypePreviousFillTest() {
String[] retArray1 = new String[] {"3,3.3,false,33", "70,50.5,false,550", "70,null,null,null"};
@@ -748,6 +1042,87 @@ public class IoTDBFillIT {
}
}
+ @Test
+ public void PreviousFillAlignedTest() {
+ String[] retArray1 = new String[] {"3,3.3,false,33", "70,50.5,false,550", "70,null,null,null"};
+ try (Connection connection = EnvFactory.getEnv().getConnection();
+ Statement statement = connection.createStatement()) {
+
+ boolean hasResultSet =
+ statement.execute(
+ "select temperature,status, hardware "
+ + "from root.ln.wf02.wt01 where time = 3 "
+ + "Fill(previous, 5ms)");
+
+ Assert.assertTrue(hasResultSet);
+ int cnt;
+ ResultSet resultSet = statement.getResultSet();
+ try {
+ cnt = 0;
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select temperature,status, hardware "
+ + "from root.ln.wf02.wt01 where time = 70 "
+ + "Fill(previous, 500ms)");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+
+ hasResultSet =
+ statement.execute(
+ "select temperature,status, hardware "
+ + "from root.ln.wf02.wt01 where time = 70 "
+ + "Fill(previous, 5ms)");
+
+ Assert.assertTrue(hasResultSet);
+ resultSet = statement.getResultSet();
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(ALIGNED_TEMPERATURE_STR)
+ + ","
+ + resultSet.getString(ALIGNED_STATUS_STR)
+ + ","
+ + resultSet.getString(ALIGNED_HARDWARE_STR);
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ } finally {
+ resultSet.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
@Test
public void EmptyTimeRangeFillTest() {
String[] retArray1 = new String[] {"3,3.3,false,33", "70,70.34,false,374"};
@@ -1219,7 +1594,9 @@ public class IoTDBFillIT {
for (String sql : dataSet2) {
statement.execute(sql);
}
-
+ for (String sql : dataSet3) {
+ statement.execute(sql);
+ }
} catch (Exception e) {
e.printStackTrace();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
index 2f4e65e8db..124ca0f5b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
@@ -341,7 +341,7 @@ public class AggregationExecutor {
// update filter by TTL
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
- if (!isAggregateResultEmpty(ascAggregateResultList)) {
+ if (ascAggregateResultList != null && !isAggregateResultEmpty(ascAggregateResultList)) {
AlignedSeriesAggregateReader seriesReader =
new AlignedSeriesAggregateReader(
alignedPath,
@@ -355,7 +355,7 @@ public class AggregationExecutor {
true);
aggregateFromAlignedReader(seriesReader, ascAggregateResultList);
}
- if (!isAggregateResultEmpty(descAggregateResultList)) {
+ if (descAggregateResultList != null && !isAggregateResultEmpty(descAggregateResultList)) {
AlignedSeriesAggregateReader seriesReader =
new AlignedSeriesAggregateReader(
alignedPath,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
index e2c3e60038..2366383881 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/FillQueryExecutor.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.storagegroup.VirtualStorageGroupProcessor;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.physical.crud.FillQueryPlan;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -45,6 +47,7 @@ import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.read.filter.factory.FilterFactory;
import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -91,8 +94,18 @@ public class FillQueryExecutor {
Filter timeFilter = initFillExecutorsAndContructTimeFilter(context);
+ List<PartialPath> groupedSeries = new ArrayList<>();
+ for (PartialPath series : selectedSeries) {
+ MeasurementPath measurementPath = (MeasurementPath) series;
+ if (measurementPath.isUnderAlignedEntity()) {
+ groupedSeries.add(new AlignedPath(measurementPath));
+ } else {
+ groupedSeries.add(measurementPath);
+ }
+ }
+
Pair<List<VirtualStorageGroupProcessor>, Map<VirtualStorageGroupProcessor, List<PartialPath>>>
- lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(selectedSeries);
+ lockListAndProcessorToSeriesMapPair = StorageEngine.getInstance().mergeLock(groupedSeries);
List<VirtualStorageGroupProcessor> lockList = lockListAndProcessorToSeriesMapPair.left;
Map<VirtualStorageGroupProcessor, List<PartialPath>> processorToSeriesMap =
lockListAndProcessorToSeriesMapPair.right;
@@ -114,7 +127,7 @@ public class FillQueryExecutor {
if (timeValuePairs.get(i) != null) {
// No need to fill
- record.addField(timeValuePairs.get(i).getValue().getValue(), dataType);
+ addTimeValueToResult(record, timeValuePairs.get(i), dataType);
continue;
}
@@ -143,7 +156,7 @@ public class FillQueryExecutor {
if (timeValuePair == null || timeValuePair.getValue() == null) {
record.addField(null);
} else {
- record.addField(timeValuePair.getValue().getValue(), dataType);
+ addTimeValueToResult(record, timeValuePair, dataType);
}
}
@@ -152,6 +165,16 @@ public class FillQueryExecutor {
return dataSet;
}
+ private void addTimeValueToResult(
+ RowRecord record, TimeValuePair timeValuePair, TSDataType dataType) {
+ TsPrimitiveType value = timeValuePair.getValue();
+ if (value.getDataType() == TSDataType.VECTOR) {
+ record.addField(value.getVector()[0].getValue(), dataType);
+ } else {
+ record.addField(value.getValue(), dataType);
+ }
+ }
+
private Filter initFillExecutorsAndContructTimeFilter(QueryContext context)
throws UnsupportedDataTypeException, QueryProcessException, StorageEngineException {
long lowerBound = Long.MAX_VALUE;
@@ -248,12 +271,18 @@ public class FillQueryExecutor {
Filter timeFilter = TimeFilter.eq(queryTime);
List<ManagedSeriesReader> readers = new ArrayList<>();
for (int i = 0; i < selectedSeries.size(); i++) {
- PartialPath path = selectedSeries.get(i);
TSDataType dataType = dataTypes.get(i);
+ MeasurementPath measurementPath = (MeasurementPath) selectedSeries.get(i);
+ PartialPath path =
+ measurementPath.isUnderAlignedEntity()
+ ? new AlignedPath(measurementPath)
+ : measurementPath;
+
QueryDataSource queryDataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(path, context, timeFilter, plan.isAscending());
timeFilter = queryDataSource.updateFilterUsingTTL(timeFilter);
+
ManagedSeriesReader reader =
new SeriesRawDataBatchReader(
path,
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
index fc59ddf66b..606cc7fd16 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LastPointReader.java
@@ -106,8 +106,7 @@ public class LastPointReader {
TimeValuePair lastPoint = new TimeValuePair(Long.MIN_VALUE, null);
for (int index = seqFileResource.size() - 1; index >= 0; index--) {
TsFileResource resource = seqFileResource.get(index);
- ITimeSeriesMetadata timeseriesMetadata;
- timeseriesMetadata =
+ ITimeSeriesMetadata timeseriesMetadata =
loadTimeSeriesMetadata(resource, seriesPath, context, timeFilter, measurements);
if (timeseriesMetadata != null) {
if (!timeseriesMetadata.isModified()
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java
index f810780897..9e04d1ca23 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/LinearFill.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.UnSupportedFillTypeException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.aggregation.AggregateResult;
@@ -40,6 +42,7 @@ import org.apache.iotdb.tsfile.utils.TsPrimitiveType;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -52,6 +55,8 @@ public class LinearFill extends IFill {
// all measurements sharing the same device as "seriesPath"
protected Set<String> deviceMeasurements;
+ protected boolean isAligned;
+
public LinearFill(long beforeRange, long afterRange) {
this.beforeRange = beforeRange;
this.afterRange = afterRange;
@@ -124,6 +129,7 @@ public class LinearFill extends IFill {
this.queryStartTime = queryTime;
this.context = context;
this.deviceMeasurements = sensors;
+ this.isAligned = ((MeasurementPath) seriesPath).isUnderAlignedEntity();
constructFilter();
}
@@ -157,15 +163,25 @@ public class LinearFill extends IFill {
QueryDataSource dataSource =
QueryResourceManager.getInstance()
.getQueryDataSource(seriesPath, context, beforeFilter, false);
+
LastPointReader lastReader =
- new LastPointReader(
- seriesPath,
- dataType,
- deviceMeasurements,
- context,
- dataSource,
- queryStartTime,
- beforeFilter);
+ isAligned
+ ? new AlignedLastPointReader(
+ new AlignedPath((MeasurementPath) seriesPath),
+ dataType,
+ deviceMeasurements,
+ context,
+ dataSource,
+ queryStartTime,
+ beforeFilter)
+ : new LastPointReader(
+ seriesPath,
+ dataType,
+ deviceMeasurements,
+ context,
+ dataSource,
+ queryStartTime,
+ beforeFilter);
return lastReader.readLastPoint();
}
@@ -178,17 +194,30 @@ public class LinearFill extends IFill {
AggregateResult firstValueResult = new FirstValueAggrResult(dataType);
aggregateResultList.add(minTimeResult);
aggregateResultList.add(firstValueResult);
- AggregationExecutor.aggregateOneSeries(
- seriesPath,
- deviceMeasurements,
- context,
- afterFilter,
- dataType,
- aggregateResultList,
- null,
- null,
- true);
+ if (isAligned) {
+ AggregationExecutor.aggregateOneAlignedSeries(
+ new AlignedPath((MeasurementPath) seriesPath),
+ deviceMeasurements,
+ context,
+ afterFilter,
+ dataType,
+ Collections.singletonList(aggregateResultList),
+ null,
+ null,
+ true);
+ } else {
+ AggregationExecutor.aggregateOneSeries(
+ seriesPath,
+ deviceMeasurements,
+ context,
+ afterFilter,
+ dataType,
+ aggregateResultList,
+ null,
+ null,
+ true);
+ }
return convertToResult(minTimeResult, firstValueResult);
}
@@ -211,9 +240,14 @@ public class LinearFill extends IFill {
throws UnSupportedFillTypeException {
double totalTimeLength = (double) afterPair.getTimestamp() - beforePair.getTimestamp();
double beforeTimeLength = (double) (queryStartTime - beforePair.getTimestamp());
+
+ TsPrimitiveType beforeValue = beforePair.getValue();
+ boolean isAlignedValue = beforeValue.getDataType() == TSDataType.VECTOR;
+
switch (dataType) {
case INT32:
- int startIntValue = beforePair.getValue().getInt();
+ int startIntValue =
+ isAlignedValue ? beforeValue.getVector()[0].getInt() : beforeValue.getInt();
int endIntValue = afterPair.getValue().getInt();
int fillIntValue =
startIntValue
@@ -222,7 +256,8 @@ public class LinearFill extends IFill {
beforePair.setValue(TsPrimitiveType.getByType(TSDataType.INT32, fillIntValue));
break;
case INT64:
- long startLongValue = beforePair.getValue().getLong();
+ long startLongValue =
+ isAlignedValue ? beforeValue.getVector()[0].getLong() : beforeValue.getLong();
long endLongValue = afterPair.getValue().getLong();
long fillLongValue =
startLongValue
@@ -231,7 +266,8 @@ public class LinearFill extends IFill {
beforePair.setValue(TsPrimitiveType.getByType(TSDataType.INT64, fillLongValue));
break;
case FLOAT:
- float startFloatValue = beforePair.getValue().getFloat();
+ float startFloatValue =
+ isAlignedValue ? beforeValue.getVector()[0].getFloat() : beforeValue.getFloat();
float endFloatValue = afterPair.getValue().getFloat();
float fillFloatValue =
startFloatValue
@@ -239,7 +275,8 @@ public class LinearFill extends IFill {
beforePair.setValue(TsPrimitiveType.getByType(TSDataType.FLOAT, fillFloatValue));
break;
case DOUBLE:
- double startDoubleValue = beforePair.getValue().getDouble();
+ double startDoubleValue =
+ isAlignedValue ? beforeValue.getVector()[0].getDouble() : beforeValue.getDouble();
double endDoubleValue = afterPair.getValue().getDouble();
double fillDoubleValue =
startDoubleValue
diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java
index f41e351fab..3c222d5060 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/executor/fill/PreviousFill.java
@@ -21,6 +21,8 @@ package org.apache.iotdb.db.query.executor.fill;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.exception.StorageEngineException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
+import org.apache.iotdb.db.metadata.path.AlignedPath;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
import org.apache.iotdb.db.metadata.path.PartialPath;
import org.apache.iotdb.db.qp.utils.DatetimeUtils;
import org.apache.iotdb.db.query.context.QueryContext;
@@ -121,9 +123,24 @@ public class PreviousFill extends IFill {
.getQueryDataSource(seriesPath, context, timeFilter, false);
// update filter by TTL
timeFilter = dataSource.updateFilterUsingTTL(timeFilter);
- LastPointReader lastReader =
- new LastPointReader(
- seriesPath, dataType, allSensors, context, dataSource, queryStartTime, timeFilter);
+
+ LastPointReader lastReader;
+ MeasurementPath measurementPath = (MeasurementPath) seriesPath;
+ if (measurementPath.isUnderAlignedEntity()) {
+ lastReader =
+ new AlignedLastPointReader(
+ new AlignedPath(measurementPath),
+ dataType,
+ allSensors,
+ context,
+ dataSource,
+ queryStartTime,
+ timeFilter);
+ } else {
+ lastReader =
+ new LastPointReader(
+ seriesPath, dataType, allSensors, context, dataSource, queryStartTime, timeFilter);
+ }
return lastReader.readLastPoint();
}