You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/04 08:12:17 UTC

[iotdb] 02/04: Bug Fix: Overlapped data should be consumed first (#3270)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch rel/0.12
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 824b4c6a7b4f51feb8ffefb4f7ea84c9c2a8e11b
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jun 3 10:15:01 2021 +0800

    Bug Fix: Overlapped data should be consumed first (#3270)
    
    Overlapped data should be consumed first
    
    (cherry picked from commit 22086d318668c344aa60dae997c6f45a1db5a522)
---
 .../iotdb/db/query/reader/series/SeriesReader.java |  5 +-
 .../db/integration/IoTDBOverlappedPageIT.java      | 86 ++++++++++++++++++----
 2 files changed, 75 insertions(+), 16 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index 01cb4f1..aa78aeb 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -480,7 +480,7 @@ public class SeriesReader {
                     firstPageReader.getStatistics(), unSeqPageReaders.peek().getStatistics())
             || (mergeReader.hasNextTimeValuePair()
                 && mergeReader.currentTimeValuePair().getTimestamp()
-                    > firstPageReader.getStatistics().getStartTime()));
+                    >= firstPageReader.getStatistics().getStartTime()));
   }
 
   private void unpackAllOverlappedChunkMetadataToPageReaders(long endpointTime, boolean init)
@@ -683,6 +683,9 @@ public class SeriesReader {
           unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false);
           unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp());
 
+          // update if there are unpacked unSeqPageReaders
+          timeValuePair = mergeReader.currentTimeValuePair();
+
           // from now, the unsequence reader is all unpacked, so we don't need to consider it
           // we has first page reader now
           if (firstPageReader != null) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
index d62c1fc..8cb9f27 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBOverlappedPageIT.java
@@ -45,37 +45,57 @@ import static org.junit.Assert.fail;
 public class IoTDBOverlappedPageIT {
 
   private static int beforeMaxNumberOfPointsInPage;
-  private static long beforeMemtableSizeThreshold;
+  private static boolean enableUnseqCompaction;
+
+  private static String[] dataSet1 =
+      new String[] {
+        "SET STORAGE GROUP TO root.sg1",
+        "CREATE TIMESERIES root.sg1.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.sg1.d1(time,s1) values(1, 1)",
+        "INSERT INTO root.sg1.d1(time,s1) values(10, 10)",
+        "flush",
+        "INSERT INTO root.sg1.d1(time,s1) values(20, 20)",
+        "INSERT INTO root.sg1.d1(time,s1) values(30, 30)",
+        "flush",
+        "INSERT INTO root.sg1.d1(time,s1) values(110, 110)",
+        "flush",
+        "INSERT INTO root.sg1.d1(time,s1) values(5, 5)",
+        "INSERT INTO root.sg1.d1(time,s1) values(50, 50)",
+        "INSERT INTO root.sg1.d1(time,s1) values(100, 100)",
+        "flush",
+        "INSERT INTO root.sg1.d1(time,s1) values(15, 15)",
+        "INSERT INTO root.sg1.d1(time,s1) values(25, 25)",
+        "flush",
+      };
 
   @BeforeClass
   public static void setUp() throws Exception {
+    Class.forName(Config.JDBC_DRIVER_NAME);
     EnvironmentUtils.closeStatMonitor();
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
-    beforeMemtableSizeThreshold =
-        IoTDBDescriptor.getInstance().getConfig().getMemtableSizeThreshold();
-    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
+    enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
+
     // max_number_of_points_in_page = 10
     beforeMaxNumberOfPointsInPage =
         TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(10);
     EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    insertData();
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
     // recovery value
-    TSFileDescriptor.getInstance()
-        .getConfig()
-        .setMaxNumberOfPointsInPage(beforeMaxNumberOfPointsInPage);
     EnvironmentUtils.cleanEnv();
-    IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(beforeMemtableSizeThreshold);
     IoTDBDescriptor.getInstance()
         .getConfig()
         .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(enableUnseqCompaction);
+    IoTDBDescriptor.getInstance()
+        .getConfig()
+        .setMemtableSizeThreshold(beforeMaxNumberOfPointsInPage);
   }
 
   @Test
@@ -89,19 +109,55 @@ public class IoTDBOverlappedPageIT {
             DriverManager.getConnection(
                 Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
+      insertData();
+
       String sql =
           "select s0 from root.vehicle.d0 where time >= 1 and time <= 110 AND root.vehicle.d0.s0 > 110";
-      ResultSet resultSet = statement.executeQuery(sql);
-      int cnt = 0;
-      try {
+      try (ResultSet resultSet = statement.executeQuery(sql)) {
+        int cnt = 0;
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TIMESTAMP_STR) + "," + resultSet.getString("root.vehicle.d0.s0");
           Assert.assertEquals(res[cnt], ans);
           cnt++;
         }
-      } finally {
-        resultSet.close();
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  /**
+   * Test to check if timeValuePair is updated when there are unSeqPageReaders are unpacked in
+   * method hasNextOverlappedPage() in SeriesReader.java
+   */
+  @Test
+  public void selectOverlappedPageTest2() {
+    String[] res = {"0,10"};
+
+    try (Connection connection =
+            DriverManager.getConnection(
+                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      for (String insertSql : dataSet1) {
+        statement.execute(insertSql);
+      }
+
+      boolean hasResultSet = statement.execute("select count(s1) from root.sg1.d1");
+      Assert.assertTrue(hasResultSet);
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString("count(root.sg1.d1.s1)");
+          Assert.assertEquals(res[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(res.length, cnt);
       }
     } catch (Exception e) {
       e.printStackTrace();