You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by qi...@apache.org on 2020/02/26 13:48:43 UTC
[incubator-iotdb] branch master updated: [IOTDB-520] Result of IBatchReader should not cross partition (#845)
This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 2b8b154 [IOTDB-520] Result of IBatchReader should not cross partition (#845)
2b8b154 is described below
commit 2b8b15418da3431d59302856c65fe93fae61768f
Author: Zesong Sun <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Feb 26 21:48:32 2020 +0800
[IOTDB-520] Result of IBatchReader should not cross partition (#845)
* [IOTDB-520] Result of IBatchReader should not cross partition
---
.../SystemDesign/5-DataQuery/2-SeriesReader.md | 2 +-
.../reader/universal/PriorityMergeReader.java | 26 ++++++++++++++++++----
.../iotdb/db/integration/IoTDBSeriesReaderIT.java | 4 ++++
3 files changed, 27 insertions(+), 5 deletions(-)
diff --git a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
index 32a9964..54b0f38 100644
--- a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
+++ b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
@@ -78,7 +78,7 @@ BatchData nextBatch() throws IOException;
#### 一般使用流程
```
-while (batchReader. hasNextBatch()) {
+while (batchReader.hasNextBatch()) {
BatchData batchData = batchReader.nextBatch();
// use batchData to do some work
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 6901250..e67bd77 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,12 +18,13 @@
*/
package org.apache.iotdb.db.query.reader.universal;
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-
import java.io.IOException;
import java.util.List;
import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
/**
* This class implements {@link IPointReader} for data sources with different priorities.
@@ -33,6 +34,8 @@ public class PriorityMergeReader implements IPointReader {
// largest end time of all added readers
private long currentLargestEndTime;
+ private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
o2.timeValuePair.getTimestamp());
@@ -58,9 +61,24 @@ public class PriorityMergeReader implements IPointReader {
}
public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
+ long partitionInterval = config.getPartitionInterval();
+ switch (config.getTimestampPrecision()) {
+ case "ns":
+ partitionInterval *= 1000_000_000L;
+ break;
+ case "us":
+ partitionInterval *= 1000_000L;
+ break;
+ default:
+ partitionInterval *= 1000;
+ break;
+ }
if (reader.hasNextTimeValuePair()) {
heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
- currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
+ long partition = reader.currentTimeValuePair().getTimestamp() / partitionInterval;
+ // set end time before current partition ends
+ currentLargestEndTime = Math.min((partition + 1) * partitionInterval - 1,
+ Math.max(currentLargestEndTime, endTime));
} else {
reader.close();
}
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index f722651..a90de85 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -80,6 +80,9 @@ public class IoTDBSeriesReaderIT {
tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
+ // test result of IBatchReader should not cross partition
+ IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(2);
+
EnvironmentUtils.envSetUp();
insertData();
@@ -96,6 +99,7 @@ public class IoTDBSeriesReaderIT {
tsFileConfig.setPageSizeInByte(pageSizeInByte);
tsFileConfig.setGroupSizeInByte(groupSizeInByte);
IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+ IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(604800);
EnvironmentUtils.cleanEnv();
}