You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by su...@apache.org on 2020/02/26 10:57:30 UTC

[incubator-iotdb] 01/01: [IOTDB-520] Result of IBatchReader should not cross partition

This is an automated email from the ASF dual-hosted git repository.

sunzesong pushed a commit to branch jira_520
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit c4fc9c3ee437712e9b5ca4116dc7290d43de44e1
Author: samperson1997 <sz...@mails.tsinghua.edu.cn>
AuthorDate: Wed Feb 26 18:55:04 2020 +0800

    [IOTDB-520] Result of IBatchReader should not cross partition
---
 .../SystemDesign/5-DataQuery/2-SeriesReader.md     |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java |  4 ++--
 .../reader/universal/PriorityMergeReader.java      | 26 ++++++++++++++++++----
 .../iotdb/db/integration/IoTDBSeriesReaderIT.java  |  4 ++++
 4 files changed, 29 insertions(+), 7 deletions(-)

diff --git a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
index 32a9964..54b0f38 100644
--- a/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
+++ b/docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md
@@ -78,7 +78,7 @@ BatchData nextBatch() throws IOException;
 #### 一般使用流程
 
 ```
-while (batchReader. hasNextBatch()) {
+while (batchReader.hasNextBatch()) {
 	BatchData batchData = batchReader.nextBatch();
 	
 	// use batchData to do some work
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 547b9c8..c1f83c3 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -143,8 +143,8 @@ public class StorageGroupProcessor {
   // includes sealed and unsealed sequence TsFiles
   private TreeSet<TsFileResource> sequenceFileTreeSet = new TreeSet<>(
       (o1, o2) -> {
-        int rangeCompare = o1.getFile().getParentFile().getName()
-            .compareTo(o2.getFile().getParentFile().getName());
+        int rangeCompare = Long.compare(Long.parseLong(o1.getFile().getParentFile().getName()),
+            Long.parseLong(o2.getFile().getParentFile().getName()));
         return rangeCompare == 0 ? compareFileName(o1.getFile(), o2.getFile()) : rangeCompare;
       });
 
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index 6901250..33b9fc4 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -18,12 +18,13 @@
  */
 package org.apache.iotdb.db.query.reader.universal;
 
-import org.apache.iotdb.tsfile.read.reader.IPointReader;
-import org.apache.iotdb.tsfile.read.TimeValuePair;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.PriorityQueue;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.read.reader.IPointReader;
 
 /**
  * This class implements {@link IPointReader} for data sources with different priorities.
@@ -33,6 +34,8 @@ public class PriorityMergeReader implements IPointReader {
   // largest end time of all added readers
   private long currentLargestEndTime;
 
+  private final static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+
   PriorityQueue<Element> heap = new PriorityQueue<>((o1, o2) -> {
     int timeCompare = Long.compare(o1.timeValuePair.getTimestamp(),
         o2.timeValuePair.getTimestamp());
@@ -58,9 +61,24 @@ public class PriorityMergeReader implements IPointReader {
   }
 
   public void addReader(IPointReader reader, long priority, long endTime) throws IOException {
+    long partitionInterval = config.getPartitionInterval();
+    switch (config.getTimestampPrecision()) {
+      case "ns":
+        partitionInterval *= 1000_000_000L;
+        break;
+      case "us":
+        partitionInterval *= 1000_000L;
+        break;
+      default:
+        partitionInterval *= 1000;
+        break;
+    }
     if (reader.hasNextTimeValuePair()) {
       heap.add(new Element(reader, reader.nextTimeValuePair(), priority));
-      currentLargestEndTime = Math.max(currentLargestEndTime, endTime);
+      int partition = Math.round(reader.currentTimeValuePair().getTimestamp() / partitionInterval);
+      // set end time before current partition ends
+      currentLargestEndTime = Math.min((partition + 1) * partitionInterval - 1,
+          Math.max(currentLargestEndTime, endTime));
     } else {
       reader.close();
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
index f722651..b63feb7 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
@@ -80,6 +80,9 @@ public class IoTDBSeriesReaderIT {
     tsFileConfig.setGroupSizeInByte(1024 * 1024 * 150);
     IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(1024 * 16);
 
+    // test result of IBatchReader should not cross partition
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(6);
+
     EnvironmentUtils.envSetUp();
 
     insertData();
@@ -96,6 +99,7 @@ public class IoTDBSeriesReaderIT {
     tsFileConfig.setPageSizeInByte(pageSizeInByte);
     tsFileConfig.setGroupSizeInByte(groupSizeInByte);
     IoTDBDescriptor.getInstance().getConfig().setMemtableSizeThreshold(groupSizeInByte);
+    IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(604800);
 
     EnvironmentUtils.cleanEnv();
   }