You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ja...@apache.org on 2021/06/03 02:22:58 UTC
[iotdb] branch master updated: Fix group by data inconsistence bug
(#3317)
This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9883106 Fix group by data inconsistence bug (#3317)
9883106 is described below
commit 9883106d9324067bcc5c302cd2fd901809cac695
Author: Xiangwei Wei <34...@users.noreply.github.com>
AuthorDate: Thu Jun 3 10:22:35 2021 +0800
Fix group by data inconsistence bug (#3317)
Fix group by data inconsistence bug
---
.../db/query/aggregation/impl/CountAggrResult.java | 24 ++---
.../iotdb/db/query/reader/series/SeriesReader.java | 3 +-
.../iotdb/db/integration/IoTDBGroupByUnseqIT.java | 103 ++++++++++++++++-----
3 files changed, 87 insertions(+), 43 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
index 7b40fda..00bfcad 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/CountAggrResult.java
@@ -45,29 +45,25 @@ public class CountAggrResult extends AggregateResult {
@Override
public void updateResultFromStatistics(Statistics statistics) {
- long preValue = getLongValue();
- preValue += statistics.getCount();
- setLongValue(preValue);
+ setLongValue(getLongValue() + statistics.getCount());
}
@Override
public void updateResultFromPageData(BatchData dataInThisPage) {
- int cnt = dataInThisPage.length();
- long preValue = getLongValue();
- preValue += cnt;
- setLongValue(preValue);
+ setLongValue(getLongValue() + dataInThisPage.length());
}
@Override
public void updateResultFromPageData(BatchData dataInThisPage, long minBound, long maxBound) {
+ int cnt = 0;
while (dataInThisPage.hasCurrent()) {
if (dataInThisPage.currentTime() >= maxBound || dataInThisPage.currentTime() < minBound) {
break;
}
- long preValue = getLongValue();
- setLongValue(++preValue);
+ cnt++;
dataInThisPage.next();
}
+ setLongValue(getLongValue() + cnt);
}
@Override
@@ -80,10 +76,7 @@ public class CountAggrResult extends AggregateResult {
cnt++;
}
}
-
- long preValue = getLongValue();
- preValue += cnt;
- setLongValue(preValue);
+ setLongValue(getLongValue() + cnt);
}
@Override
@@ -94,10 +87,7 @@ public class CountAggrResult extends AggregateResult {
cnt++;
}
}
-
- long preValue = getLongValue();
- preValue += cnt;
- setLongValue(preValue);
+ setLongValue(getLongValue() + cnt);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
index f03f835..e4e0894 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/series/SeriesReader.java
@@ -297,11 +297,12 @@ public class SeriesReader {
* first time series metadata is already unpacked, consume cached ChunkMetadata
*/
if (!cachedChunkMetadata.isEmpty()) {
- firstChunkMetadata = cachedChunkMetadata.poll();
+ firstChunkMetadata = cachedChunkMetadata.peek();
unpackAllOverlappedTsFilesToTimeSeriesMetadata(
orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()));
unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata(
orderUtils.getOverlapCheckTime(firstChunkMetadata.getStatistics()), false);
+ firstChunkMetadata = cachedChunkMetadata.poll();
}
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
index c8c69b3..e515612 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBGroupByUnseqIT.java
@@ -33,14 +33,10 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
+import static org.apache.iotdb.db.constant.TestConstant.TIMESTAMP_STR;
import static org.apache.iotdb.db.constant.TestConstant.count;
import static org.junit.Assert.fail;
-/**
- * This test contains one seq file and one unseq file. In the seq file, it contains two pages:
- * 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page: 7,
- * 9. The unseq page is overlapped with the second seq page.
- */
public class IoTDBGroupByUnseqIT {
private static String[] dataSet1 =
@@ -61,13 +57,29 @@ public class IoTDBGroupByUnseqIT {
"flush"
};
- private static final String TIMESTAMP_STR = "Time";
+ private static String[] dataSet2 =
+ new String[] {
+ "SET STORAGE GROUP TO root.sg2",
+ "CREATE TIMESERIES root.sg2.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "INSERT INTO root.sg2.d1(time,s1) values(1, 1)",
+ "INSERT INTO root.sg2.d1(time,s1) values(10, 10)",
+ "flush",
+ "INSERT INTO root.sg2.d1(time,s1) values(19, 19)",
+ "INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
+ "flush",
+ "INSERT INTO root.sg2.d1(time,s1) values(5, 5)",
+ "INSERT INTO root.sg2.d1(time,s1) values(15, 15)",
+ "INSERT INTO root.sg2.d1(time,s1) values(26, 26)",
+ "INSERT INTO root.sg2.d1(time,s1) values(30, 30)",
+ "flush"
+ };
private boolean enableUnseqCompaction;
private int maxNumberOfPointsInPage;
@Before
public void setUp() throws Exception {
+ Class.forName(Config.JDBC_DRIVER_NAME);
EnvironmentUtils.closeStatMonitor();
enableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
@@ -75,23 +87,6 @@ public class IoTDBGroupByUnseqIT {
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(4);
EnvironmentUtils.envSetUp();
- Class.forName(Config.JDBC_DRIVER_NAME);
- prepareData();
- }
-
- private void prepareData() {
- try (Connection connection =
- DriverManager.getConnection(
- Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
- Statement statement = connection.createStatement(); ) {
-
- for (String sql : dataSet1) {
- statement.execute(sql);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- }
}
@After
@@ -101,6 +96,11 @@ public class IoTDBGroupByUnseqIT {
TSFileDescriptor.getInstance().getConfig().setMaxNumberOfPointsInPage(maxNumberOfPointsInPage);
}
+ /**
+ * This test contains one seq file and one unseq file. In the seq file, it contains two pages:
+ * 1,2,3,4 in one page, 8,10,11,12 in another page. In the unseq file, it only contains one page:
+ * 7, 9. The unseq page is overlapped with the second seq page.
+ */
@Test
public void test1() {
String[] retArray1 =
@@ -111,13 +111,17 @@ public class IoTDBGroupByUnseqIT {
try (Connection connection =
DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
Statement statement = connection.createStatement()) {
+
+ for (String sql : dataSet1) {
+ statement.execute(sql);
+ }
+
boolean hasResultSet =
statement.execute("select count(s1) from root.sg1.d1 group by ([1, 13), 3ms)");
Assert.assertTrue(hasResultSet);
- int cnt;
+ int cnt = 0;
try (ResultSet resultSet = statement.getResultSet()) {
- cnt = 0;
while (resultSet.next()) {
String ans =
resultSet.getString(TIMESTAMP_STR)
@@ -133,4 +137,53 @@ public class IoTDBGroupByUnseqIT {
fail(e.getMessage());
}
}
+
+ /**
+ * This test contains two seq files and one unseq file. In the first seq file, it contains two
+ * points: [1, 10]. In the second seq file, it contains two points: [19, 30]. In the unseq file,
+ * it contains two CHUNKS: [5, 15], [26, 30]. The unseq file is overlapped with two seq files.
+ * While the chunk [19,30] in the second seq file is unpacked, it should replace [26,30] as the
+ * first chunk.
+ */
+ @Test
+ public void test2() {
+ String[] retArray1 = new String[] {"5,1", "10,1", "15,2", "20,0", "25,1"};
+
+ int preAvgSeriesPointNumberThreshold =
+ IoTDBDescriptor.getInstance().getConfig().getAvgSeriesPointNumberThreshold();
+ try (Connection connection =
+ DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
+ Statement statement = connection.createStatement()) {
+
+ IoTDBDescriptor.getInstance().getConfig().setAvgSeriesPointNumberThreshold(2);
+
+ for (String sql : dataSet2) {
+ statement.execute(sql);
+ }
+
+ boolean hasResultSet =
+ statement.execute("select count(s1) from root.sg2.d1 group by ([5, 30), 5ms)");
+
+ Assert.assertTrue(hasResultSet);
+ int cnt = 0;
+ try (ResultSet resultSet = statement.getResultSet()) {
+ while (resultSet.next()) {
+ String ans =
+ resultSet.getString(TIMESTAMP_STR)
+ + ","
+ + resultSet.getString(count("root.sg2.d1.s1"));
+ Assert.assertEquals(retArray1[cnt], ans);
+ cnt++;
+ }
+ Assert.assertEquals(retArray1.length, cnt);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ } finally {
+ IoTDBDescriptor.getInstance()
+ .getConfig()
+ .setAvgSeriesPointNumberThreshold(preAvgSeriesPointNumberThreshold);
+ }
+ }
}