You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/06/02 09:14:53 UTC
[iotdb] branch rel/0.11 updated: [To rel/0.11] Overlapped data
should be consumed first (#3303)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rel/0.11 by this push:
new e77d06d [To rel/0.11] Overlapped data should be consumed first (#3303)
e77d06d is described below
commit e77d06daf1dc38edf3bd242e465129c6bc2eb8e5
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Wed Jun 2 17:14:19 2021 +0800
[To rel/0.11] Overlapped data should be consumed first (#3303)
Overlapped data should be consumed first
---
.../iotdb/db/query/reader/series/SeriesReader.java | 3 +
.../db/integration/IoTDBOverlappedPageIT.java | 82 ++++++++++++++++++----
2 files changed, 71 insertions(+), 14 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 46b8c1c..31b2369 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -617,6 +617,9 @@ public class SeriesReader {
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+ // update if there are unSeqPageReaders unpacked
+ timeValuePair = mergeReader.currentTimeValuePair();
+
if (firstPageReader != null) {
if ((orderUtils.getAscending() && timeValuePair.getTimestamp() > firstPageReader
.getStatistics().getEndTime()) || (!orderUtils.getAscending()
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
index 76131ad..7f70d67 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
@@ -44,31 +44,50 @@ import static org.junit.Assert.fail;
public class IoTDBOverlappedPageIT {
private static int beforeMaxNumberOfPointsInPage;
- private static long beforeMemtableSizeThreshold;
+ private static boolean enableUnseqCompaction;
+
+ private static String[] dataSet1 = new String[]{
+ "SET STORAGE GROUP TO root.sg1",
+ "CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "INSERT INTO root.sg1.d1(time,s1) values(1, 1)",
+ "INSERT INTO root.sg1.d1(time,s1) values(10, 10)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(20, 20)",
+ "INSERT INTO root.sg1.d1(time,s1) values(30, 30)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(110, 110)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(5, 5)",
+ "INSERT INTO root.sg1.d1(time,s1) values(50, 50)",
+ "INSERT INTO root.sg1.d1(time,s1) values(100, 100)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(15, 15)",
+ "INSERT INTO root.sg1.d1(time,s1) values(25, 25)",
+ "flush",
+ };
@BeforeClass
public static void setUp() throws Exception {
+ Class.forName(Config.JDBC_DRIVER_NAME);
EnvironmentUtils.closeStatMonitor();
IoTDBDescriptor.getInstance().getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
- beforeMemtableSizeThreshold = IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
- IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
+ enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+
// max_number_of_points_in_page = 10
beforeMaxNumberOfPointsInPage = TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
- insertData();
}
@AfterClass
public static void tearDown() throws Exception {
// recovery value
- TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
- IoTDBDescriptor.getInstance().getConfig()
- .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(enableUnseqCompaction);
+ IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMaxNumberOfPointsInPage);
}
@Test
@@ -89,17 +108,52 @@ public class IoTDBOverlappedPageIT {
try (Connection connection = DriverManager
.getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
+ insertData();
+
String sql = "select s0 from root.vehicle.d0 where time >= 1 and time <= 110 AND root.vehicle.d0.s0 > 110";
- ResultSet resultSet = statement.executeQuery(sql);
- int cnt = 0;
- try {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int cnt = 0;
while (resultSet.next()) {
String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.vehicle.d0.s0");
Assert.assertEquals(res[cnt], ans);
cnt++;
}
- } finally {
- resultSet.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test to check if timeValuePair is updated when there are unSeqPageReaders are unpacked
+ * in method hasNextOverlappedPage() in SeriesReader.java
+ */
+ @Test
+ public void selectOverlappedPageTest2() {
+ String[] res = {
+ "0,10"
+ };
+
+ try (Connection connection = DriverManager
+ .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String insertSql : dataSet1) {
+ statement.execute(insertSql);
+ }
+
+ boolean hasResultSet = statement.execute("select count(s1) from root.sg1.d1");
+ Assert.assertTrue(hasResultSet);
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans = resultSet.getString(TIMESTAMP_STR) + "," + resultSet
+ .getString("count(root.sg1.d1.s1)");
+ Assert.assertEquals(res[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(res.length, cnt);
}
} catch (Exception e) {
e.printStackTrace();