You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2021/06/02 11:17:54 UTC

[iotdb] branch groupbybug created (now ef46fdd)

This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a change to branch groupbybug
in repository https://gitbox.apache.org/repos/asf/iotdb.git.


      at ef46fdd  Fix group by data inconsistence bug

This branch includes the following new commits:

     new ef46fdd  Fix group by data inconsistence bug

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[iotdb] 01/01: Fix group by data inconsistence bug

Posted by xi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

xiangweiwei pushed a commit to branch groupbybug
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit ef46fdd96844b3932a6c390e5472b5a738f0e579
Author: Alima777 <wx...@gmail.com>
AuthorDate: Wed Jun 2 19:16:56 2021 +0800

    Fix group by data inconsistence bug
---
 .../db/query/aggregation/impl/CountAggrResult.java |  24 ++---
 .../iotdb/db/query/reader/series/SeriesReader.java |   3 +-
 .../iotdb/db/integration/IoTDBGroupByUnseqIT.java  | 103 ++++++++++++++++-----
 3 files changed, 87 insertions(+), 43 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 7b40fda..00bfcad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -45,29 +45,25 @@ public class CountAggrResult extends AggregateResult {
 
   @Override
   public void updateResultFromStatistics(Statistics statistics) {
-    long preValue = getLongValue();
-    preValue += statistics.getCount();
-    setLongValue(preValue);
+    setLongValue(getLongValue() + statistics.getCount());
   }
 
   @Override
   public void updateResultFromPageData(BatchData dataInThisPage) {
-    int cnt = dataInThisPage.length();
-    long preValue = getLongValue();
-    preValue += cnt;
-    setLongValue(preValue);
+    setLongValue(getLongValue() + dataInThisPage.length());
   }
 
   @Override
   public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+    int cnt = 0;
     while (dataInThisPage.hasCurrent()) {
       if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
         break;
       }
-      long preValue = getLongValue();
-      setLongValue(++preValue);
+      cnt++;
       dataInThisPage.next();
     }
+    setLongValue(getLongValue() + cnt);
   }
 
   @Override
@@ -80,10 +76,7 @@ public class CountAggrResult extends AggregateResult {
         cnt++;
       }
     }
-
-    long preValue = getLongValue();
-    preValue += cnt;
-    setLongValue(preValue);
+    setLongValue(getLongValue() + cnt);
   }
 
   @Override
@@ -94,10 +87,7 @@ public class CountAggrResult extends AggregateResult {
         cnt++;
       }
     }
-
-    long preValue = getLongValue();
-    preValue += cnt;
-    setLongValue(preValue);
+    setLongValue(getLongValue() + cnt);
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index f6ac803..21a2c08 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -297,11 +297,12 @@ public class SeriesReader {
        * first time series metadata is already unpacked, consume cached ChunkMetadata
        */
       if (!cachedChunkMetadata.isEmpty()) {
-        firstChunkMetadata = cachedChunkMetadata.poll();
+        firstChunkMetadata = cachedChunkMetadata.peek();
         unpackAllOverlappedTsFilesToTimeSeriesMetadata(
             orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
         unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
             orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
+        firstChunkMetadata = cachedChunkMetadata.poll();
       }
     }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
index c8c69b3..e515612 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
@@ -33,14 +33,10 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.Statement;
 
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
 import static org.apache.iotdb.db.constant.TestConstant.count;
 import static org.junit.Assert.fail;
 
-/**
- * This test contains one seq file and one unseq file. In the seq file, it contains two pages:
- * 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page: 7,
- * 9. The unseq page is overlapped with the second seq page.
- */
 public class IoTDBGroupByUnseqIT {
 
   private static String[] dataSet1 =
@@ -61,13 +57,29 @@ public class IoTDBGroupByUnseqIT {
         "flush"
       };
 
-  private static final String TIMESTAMP_STR = "Time";
+  private static String[] dataSet2 =
+      new String[] {
+        "SET STORAGE GROUP TO root.sg2",
+        "CREATE TIMESERIES root.sg2.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+        "INSERT INTO root.sg2.d1(time,s1) values(1, 1)",
+        "INSERT INTO root.sg2.d1(time,s1) values(10, 10)",
+        "flush",
+        "INSERT INTO root.sg2.d1(time,s1) values(19, 19)",
+        "INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
+        "flush",
+        "INSERT INTO root.sg2.d1(time,s1) values(5, 5)",
+        "INSERT INTO root.sg2.d1(time,s1) values(15, 15)",
+        "INSERT INTO root.sg2.d1(time,s1) values(26, 26)",
+        "INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
+        "flush"
+      };
 
   private boolean enableUnseqCompaction;
   private int maxNumberOfPointsInPage;
 
   @Before
   public void setUp() throws Exception {
+    Class.forName(Config.JDBC_DRIVER_NAME);
     EnvironmentUtils.closeStatMonitor();
     enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
     IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
@@ -75,23 +87,6 @@ public class IoTDBGroupByUnseqIT {
         TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(4);
     EnvironmentUtils.envSetUp();
-    Class.forName(Config.JDBC_DRIVER_NAME);
-    prepareData();
-  }
-
-  private void prepareData() {
-    try (Connection connection =
-            DriverManager.getConnection(
-                Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
-        Statement statement = connection.createStatement(); ) {
-
-      for (String sql : dataSet1) {
-        statement.execute(sql);
-      }
-
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
   }
 
   @After
@@ -101,6 +96,11 @@ public class IoTDBGroupByUnseqIT {
     TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
   }
 
+  /**
+   * This test contains one seq file and one unseq file. In the seq file, it contains two pages:
+   * 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page:
+   * 7, 9. The unseq page is overlapped with the second seq page.
+   */
   @Test
   public void test1() {
     String[] retArray1 =
@@ -111,13 +111,17 @@ public class IoTDBGroupByUnseqIT {
     try (Connection connection =
             DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
         Statement statement = connection.createStatement()) {
+
+      for (String sql : dataSet1) {
+        statement.execute(sql);
+      }
+
       boolean hasResultSet =
           statement.execute("select count(s1) from root.sg1.d1 group by ([1, 13), 3ms)");
 
       Assert.assertTrue(hasResultSet);
-      int cnt;
+      int cnt = 0;
       try (ResultSet resultSet = statement.getResultSet()) {
-        cnt = 0;
         while (resultSet.next()) {
           String ans =
               resultSet.getString(TIMESTAMP_STR)
@@ -133,4 +137,53 @@ public class IoTDBGroupByUnseqIT {
       fail(e.getMessage());
     }
   }
+
+  /**
+   * This test contains two seq files and one unseq file. In the first seq file, it contains two
+   * points: [1, 10]. In the second seq file, it contains two points: [19, 30]. In the unseq file,
+   * it contains two CHUNKS: [5, 15], [26, 30]. The unseq file is overlapped with two seq files.
+   * When the chunk [19,30] in the second seq file is unpacked, it should replace [26,30] as the
+   * first chunk.
+   */
+  @Test
+  public void test2() {
+    String[] retArray1 = new String[] {"5,1", "10,1", "15,2", "20,0", "25,1"};
+
+    int preAvgSeriesPointNumberThreshold =
+        IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
+    try (Connection connection =
+            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+
+      IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
+
+      for (String sql : dataSet2) {
+        statement.execute(sql);
+      }
+
+      boolean hasResultSet =
+          statement.execute("select count(s1) from root.sg2.d1 group by ([5, 30), 5ms)");
+
+      Assert.assertTrue(hasResultSet);
+      int cnt = 0;
+      try (ResultSet resultSet = statement.getResultSet()) {
+        while (resultSet.next()) {
+          String ans =
+              resultSet.getString(TIMESTAMP_STR)
+                  + ","
+                  + resultSet.getString(count("root.sg2.d1.s1"));
+          Assert.assertEquals(retArray1[cnt], ans);
+          cnt++;
+        }
+        Assert.assertEquals(retArray1.length, cnt);
+      }
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    } finally {
+      IoTDBDescriptor.getInstance()
+          .getConfig()
+          .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
+    }
+  }
 }