You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/02 11:17:55 UTC

[iotdb] 01/01: Fix group by data inconsistence bug

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch groupbybug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ef46fdd96844b3932a6c390e5472b5a738f0e579
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Jun 2 19:16:56 2021 +0800

    Fix group by data inconsistence bug
---
 .../db/query/aggregation/impl/CountAggrResult.java |  24 ++---
 .../iotdb/db/query/reader/series/SeriesReader.java |   3 +-
 .../iotdb/db/integration/IoTDBGroupByUnseqIT.java  | 103 ++++++++++++++++-----
 3 files changed, 87 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 7b40fda..00bfcad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -45,29 +45,25 @@ public class CountAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromStatistics(Statistics statistics) {
-    long preValue = getLongValue();
-    preValue += statistics.getCount();
-    setLongValue(preValue);
+    setLongValue(getLongValue() + statistics.getCount());
   }
 
   @Override
   public void updateResultFromPageData(BatchData dataInThisPage) {
-    int cnt = dataInThisPage.length();
-    long preValue = getLongValue();
-    preValue += cnt;
-    setLongValue(preValue);
+    setLongValue(getLongValue() + dataInThisPage.length());
   }
 
   @Override
   public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+    int cnt = 0;
     while (dataInThisPage.hasCurrent()) {
       if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
         break;
       }
-      long preValue = getLongValue();
-      setLongValue(++preValue);
+      cnt++;
       dataInThisPage.next();
     }
+    setLongValue(getLongValue() + cnt);
   }
 
   @Override
@@ -80,10 +76,7 @@ public class CountAggrResult extends AggregateResult {
         cnt++;
       }
     }
-
-    long preValue = getLongValue();
-    preValue += cnt;
-    setLongValue(preValue);
+    setLongValue(getLongValue() + cnt);
   }
 
   @Override
@@ -94,10 +87,7 @@ public class CountAggrResult extends AggregateResult {
         cnt++;
       }
     }
-
-    long preValue = getLongValue();
-    preValue += cnt;
-    setLongValue(preValue);
+    setLongValue(getLongValue() + cnt);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index f6ac803..21a2c08 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -297,11 +297,12 @@ public class SeriesReader {
        * first time series metadata is already unpacked, consume cached ChunkMetadata
        */
       if (!cachedChunkMetadata.isEmpty()) {
-        firstChunkMetadata = cachedChunkMetadata.poll();
+        firstChunkMetadata = cachedChunkMetadata.peek();
         unpackAllOverlappedTsFilesToTimeSeriesMetadata(
             orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
         unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
             orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
+        firstChunkMetadata = cachedChunkMetadata.poll();
       }
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
index c8c69b3..e515612 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
@@ -33,14 +33,10 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
 
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
 import static org.apache.iotdb.db.constant.TestConstant.count;
 import static org.junit.Assert.fail;
 
-/**
- * This test contains one seq file and one unseq file. In the seq file, it contains two pages:
- * 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page: 7,
- * 9. The unseq page is overlapped with the second seq page.
- */
 public class IoTDBGroupByUnseqIT {
 
   private static String[] dataSet1 =
@@ -61,13 +57,29 @@ public class IoTDBGroupByUnseqIT {
         "flush"
       };
 
-  private static final String TIMESTAMP_STR = "Time";
+  private static String[] dataSet2 =
+      new String[] {
+        "SET STORAGE GROUP TO root.sg2",
+        "CREATE TIMESERIES root.sg2.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.sg2.d1(time,s1) values(1, 1)",
+        "INSERT INTO root.sg2.d1(time,s1) values(10, 10)",
+        "flush",
+        "INSERT INTO root.sg2.d1(time,s1) values(19, 19)",
+        "INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
+        "flush",
+        "INSERT INTO root.sg2.d1(time,s1) values(5, 5)",
+        "INSERT INTO root.sg2.d1(time,s1) values(15, 15)",
+        "INSERT INTO root.sg2.d1(time,s1) values(26, 26)",
+        "INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
+        "flush"
+      };
 
   private boolean enableUnseqCompaction;
   private int maxNumberOfPointsInPage;
 
   @Before
   public void setUp() throws Exception {
+    Class.forName(Config.JDBC_DRIVER_NAME);
     EnvironmentUtils.closeStatMonitor();
     enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
     IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
@@ -75,23 +87,6 @@ public class IoTDBGroupByUnseqIT {
         TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(4);
     EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    prepareData();
-  }
-
-  private void prepareData() {
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement(); ) {
-
-      for (String sql : dataSet1) {
-        statement.execute(sql);
-      }
-
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
   }
 
   @After
@@ -101,6 +96,11 @@ public class IoTDBGroupByUnseqIT {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
   }
 
+  /**
+   * This test contains one seq file and one unseq file. In the seq file, it contains two pages:
+   * 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page:
+   * 7, 9. The unseq page is overlapped with the second seq page.
+   */
   @Test
   public void test1() {
     String[] retArray1 =
@@ -111,13 +111,17 @@ public class IoTDBGroupByUnseqIT {
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
+
+      for (String sql : dataSet1) {
+        statement.execute(sql);
+      }
+
       boolean hasResultSet =
           statement.execute("select count(s1) from root.sg1.d1 group by ([1, 13), 3ms)");
 
       Assert.assertTrue(hasResultSet);
-      int cnt;
+      int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
-        cnt = 0;
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TIMESTAMP_STR)
@@ -133,4 +137,53 @@ public class IoTDBGroupByUnseqIT {
       fail(e.getMessage());
     }
   }
+
+  /**
+   * This test contains two seq files and one unseq file. In the first seq file, it contains two
+   * points: [1, 10]. In the second seq file, it contains two points: [19, 30]. In the unseq file,
+   * it contains two CHUNKS: [5, 15], [26, 30]. The unseq file is overlapped with two seq files.
+   * While the chunk [19,30] in the second seq file is unpacked, it should replace [26,30] as the
+   * first chunk.
+   */
+  @Test
+  public void test2() {
+    String[] retArray1 = new String[] {"5,1", "10,1", "15,2", "20,0", "25,1"};
+
+    int preAvgSeriesPointNumberThreshold =
+        IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
+
+      for (String sql : dataSet2) {
+        statement.execute(sql);
+      }
+
+      boolean hasResultSet =
+          statement.execute("select count(s1) from root.sg2.d1 group by ([5, 30), 5ms)");
+
+      Assert.assertTrue(hasResultSet);
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(count("root.sg2.d1.s1"));
+          Assert.assertEquals(retArray1[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(retArray1.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
+    }
+  }
 }