You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/04 08:12:17 UTC
[iotdb] 02/04: Bug Fix: Overlapped data should be consumed first
(#3270)
This is an automated email from the ASF dual-hosted git repository.
xiangweiwei pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 824b4c6a7b4f51feb8ffefb4f7ea84c9c2a8e11b
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jun 3 10:15:01 2021 +0800
Bug Fix: Overlapped data should be consumed first (#3270)
Overlapped data should be consumed first
(cherry picked from commit 22086d318668c344aa60dae997c6f45a1db5a522)
---
.../iotdb/db/query/reader/series/SeriesReader.java | 5 +-
.../db/integration/IoTDBOverlappedPageIT.java | 86 ++++++++++++++++++----
2 files changed, 75 insertions(+), 16 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 01cb4f1..aa78aeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -480,7 +480,7 @@ public class SeriesReader {
firstPageReader.getStatistics(), unSeqPageReaders.peek().getStatistics())
|| (mergeReader.hasNextTimeValuePair()
&& mergeReader.currentTimeValuePair().getTimestamp()
- > firstPageReader.getStatistics().getStartTime()));
+ >= firstPageReader.getStatistics().getStartTime()));
}
private void unpackAllOverlappedChunkMetadataToPageReaders(long endpointTime, boolean init)
@@ -683,6 +683,9 @@ public class SeriesReader {
unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
+ // update if there are unpacked unSeqPageReaders
+ timeValuePair = mergeReader.currentTimeValuePair();
+
// from now, the unsequence reader is all unpacked, so we don't need to consider it
// we has first page reader now
if (firstPageReader != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
index d62c1fc..8cb9f27 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
@@ -45,37 +45,57 @@ import static org.junit.Assert.fail;
public class IoTDBOverlappedPageIT {
private static int beforeMaxNumberOfPointsInPage;
- private static long beforeMemtableSizeThreshold;
+ private static boolean enableUnseqCompaction;
+
+ private static String[] dataSet1 =
+ new String[] {
+ "SET STORAGE GROUP TO root.sg1",
+ "CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "INSERT INTO root.sg1.d1(time,s1) values(1, 1)",
+ "INSERT INTO root.sg1.d1(time,s1) values(10, 10)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(20, 20)",
+ "INSERT INTO root.sg1.d1(time,s1) values(30, 30)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(110, 110)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(5, 5)",
+ "INSERT INTO root.sg1.d1(time,s1) values(50, 50)",
+ "INSERT INTO root.sg1.d1(time,s1) values(100, 100)",
+ "flush",
+ "INSERT INTO root.sg1.d1(time,s1) values(15, 15)",
+ "INSERT INTO root.sg1.d1(time,s1) values(25, 25)",
+ "flush",
+ };
@BeforeClass
public static void setUp() throws Exception {
+ Class.forName(Config.JDBC_DRIVER_NAME);
EnvironmentUtils.closeStatMonitor();
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
- beforeMemtableSizeThreshold =
- IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
- IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
+ enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+
// max_number_of_points_in_page = 10
beforeMaxNumberOfPointsInPage =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
- insertData();
}
@AfterClass
public static void tearDown() throws Exception {
// recovery value
- TSFileDescriptor.getInstance()
- .getConfig()
- .setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
EnvironmentUtils.cleanEnv();
- IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
IoTDBDescriptor.getInstance()
.getConfig()
.setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(enableUnseqCompaction);
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setMemtableSizeThreshold(beforeMaxNumberOfPointsInPage);
}
@Test
@@ -89,19 +109,55 @@ public class IoTDBOverlappedPageIT {
DriverManager.getConnection(
Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
+ insertData();
+
String sql =
"select s0 from root.vehicle.d0 where time >= 1 and time <= 110 AND root.vehicle.d0.s0 > 110";
- ResultSet resultSet = statement.executeQuery(sql);
- int cnt = 0;
- try {
+ try (ResultSet resultSet = statement.executeQuery(sql)) {
+ int cnt = 0;
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.vehicle.d0.s0");
Assert.assertEquals(res[cnt], ans);
cnt++;
}
- } finally {
- resultSet.close();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ /**
+ * Test to check if timeValuePair is updated when there are unSeqPageReaders are unpacked in
+ * method hasNextOverlappedPage() in SeriesReader.java
+ */
+ @Test
+ public void selectOverlappedPageTest2() {
+ String[] res = {"0,10"};
+
+ try (Connection connection =
+ DriverManager.getConnection(
+ Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ for (String insertSql : dataSet1) {
+ statement.execute(insertSql);
+ }
+
+ boolean hasResultSet = statement.execute("select count(s1) from root.sg1.d1");
+ Assert.assertTrue(hasResultSet);
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString("count(root.sg1.d1.s1)");
+ Assert.assertEquals(res[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(res.length, cnt);
}
} catch (Exception e) {
e.printStackTrace();