You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this rendering.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/21 03:30:17 UTC

[incubator-iotdb] branch fix_duplicated_overlap_merge created (now 0df35ae)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch fix_duplicated_overlap_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 0df35ae  fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache

This branch includes the following new commits:

     new 0df35ae  fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Any revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_duplicated_overlap_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0df35ae8ac4966215ac2451111003b0628784451
Author: 江天 <jt...@163.com>
AuthorDate: Mon Oct 21 11:29:57 2019 +0800

    fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache
---
 .../universal/CachedPriorityMergeReader.java       |  32 +++--
 .../reader/universal/PriorityMergeReader.java      |   2 +-
 .../{MergeTest.java => MergeOverLapTest.java}      | 146 +++++++++------------
 .../apache/iotdb/db/engine/merge/MergeTest.java    |   4 +-
 4 files changed, 81 insertions(+), 103 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
index e28d6f6..61aeea5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
@@ -34,10 +34,10 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
   private TimeValuePair[] timeValuePairCache = new TimeValuePair[CACHE_SIZE];
   private int cacheLimit = 0;
   private int cacheIdx = 0;
-  private TSDataType dataType;
+
+  private Long lastTimestamp = null;
 
   public CachedPriorityMergeReader(TSDataType dataType) {
-    this.dataType = dataType;
     for (int i = 0; i < CACHE_SIZE; i++) {
       timeValuePairCache[i] = TimeValuePairUtils.getEmptyTimeValuePair(dataType);
     }
@@ -53,22 +53,19 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
     cacheIdx = 0;
     while (!heap.isEmpty() && cacheLimit < CACHE_SIZE) {
       Element top = heap.poll();
-      if (cacheLimit == 0 || top.currTime() != timeValuePairCache[cacheLimit - 1].getTimestamp()) {
+      if (lastTimestamp == null || top.currTime() != lastTimestamp) {
         TimeValuePairUtils.setTimeValuePair(top.timeValuePair, timeValuePairCache[cacheLimit++]);
-        if (top.hasNext()) {
-          top.next();
-          heap.add(top);
-        } else {
-          top.close();
-        }
-      } else if (top.currTime() == timeValuePairCache[cacheLimit - 1].getTimestamp()) {
-        if (top.hasNext()) {
-          top.next();
-          heap.add(top);
-        } else {
-          top.close();
+        lastTimestamp = top.currTime();
+        while (heap.peek() != null && heap.peek().currTime() == lastTimestamp) {
+          heap.poll();
         }
       }
+      if (top.hasNext()) {
+        top.next();
+        heap.add(top);
+      } else {
+        top.close();
+      }
     }
   }
 
@@ -85,11 +82,12 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
   }
 
   @Override
-  public TimeValuePair current() {
+  public TimeValuePair current() throws IOException {
     if (0 <= cacheIdx && cacheIdx < cacheLimit) {
       return timeValuePairCache[cacheIdx];
     } else {
-      return heap.peek().timeValuePair;
+      fetch();
+      return timeValuePairCache[cacheIdx];
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index c3d8096..1e1e5d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -76,7 +76,7 @@ public class PriorityMergeReader implements IPointReader {
   }
 
   @Override
-  public TimeValuePair current() {
+  public TimeValuePair current() throws IOException {
     return heap.peek().timeValuePair;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
similarity index 51%
copy from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
copy to server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 4d7978b..fd07863 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -15,105 +15,56 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
 
 package org.apache.iotdb.db.engine.merge;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
-import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
-abstract class MergeTest {
-
-  static final String MERGE_TEST_SG = "root.mergeTest";
-
-  int seqFileNum = 5;
-  int unseqFileNum = 5;
-  int measurementNum = 10;
-  int deviceNum = 10;
-  long ptNum = 100;
-  long flushInterval = 20;
-  TSEncoding encoding = TSEncoding.PLAIN;
-
-  String[] deviceIds;
-  MeasurementSchema[] measurementSchemas;
-
-  List<TsFileResource> seqResources = new ArrayList<>();
-  List<TsFileResource> unseqResources = new ArrayList<>();
+public class MergeOverLapTest extends MergeTest {
 
-  private int prevMergeChunkThreshold;
+  private File tempSGDir;
 
   @Before
   public void setUp() throws IOException, WriteProcessException, MetadataErrorException {
-    MManager.getInstance().init();
-    prevMergeChunkThreshold =
-        IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
-    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
-    prepareSeries();
-    prepareFiles(seqFileNum, unseqFileNum);
-    MergeManager.getINSTANCE().start();
+    ptNum = 1000;
+    super.setUp();
+    tempSGDir = new File("tempSG");
+    tempSGDir.mkdirs();
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
-    removeFiles();
-    seqResources.clear();
-    unseqResources.clear();
-    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold);
-    TsFileMetaDataCache.getInstance().clear();
-    DeviceMetaDataCache.getInstance().clear();
-    MManager.getInstance().clear();
-    EnvironmentUtils.cleanAllDir();
-    MergeManager.getINSTANCE().stop();
-  }
-
-  private void prepareSeries() throws MetadataErrorException {
-    measurementSchemas = new MeasurementSchema[measurementNum];
-    for (int i = 0; i < measurementNum; i++) {
-      measurementSchemas[i] = new MeasurementSchema("sensor" + i, TSDataType.DOUBLE,
-          encoding, CompressionType.UNCOMPRESSED);
-    }
-    deviceIds = new String[deviceNum];
-    for (int i = 0; i < deviceNum; i++) {
-      deviceIds[i] = MERGE_TEST_SG + PATH_SEPARATOR + "device" + i;
-    }
-    MManager.getInstance().setStorageGroupToMTree(MERGE_TEST_SG);
-    for (String device : deviceIds) {
-      for (MeasurementSchema measurementSchema : measurementSchemas) {
-        MManager.getInstance().addPathToMTree(
-            device + PATH_SEPARATOR + measurementSchema.getMeasurementId(), measurementSchema
-            .getType(), measurementSchema.getEncodingType(), measurementSchema.getCompressor(),
-            Collections.emptyMap());
-      }
-    }
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
   }
 
-  private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+  @Override
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
@@ -132,18 +83,7 @@ abstract class MergeTest {
     prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
   }
 
-  private void removeFiles() throws IOException {
-    for (TsFileResource tsFileResource : seqResources) {
-      tsFileResource.remove();
-    }
-    for (TsFileResource tsFileResource : unseqResources) {
-      tsFileResource.remove();
-    }
-    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
-  }
-
-  private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
+  void prepareUnseqFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
       long valueOffset)
       throws IOException, WriteProcessException {
     TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());
@@ -161,10 +101,50 @@ abstract class MergeTest {
         tsFileResource.updateStartTime(deviceIds[j], i);
         tsFileResource.updateEndTime(deviceIds[j], i);
       }
+      // insert overlapping tuples
+      if ((i + 1) % 100 == 0) {
+        for (int j = 0; j < deviceNum; j++) {
+          TSRecord record = new TSRecord(i, deviceIds[j]);
+          for (int k = 0; k < measurementNum; k++) {
+            record.addTuple(DataPoint.getDataPoint(measurementSchemas[k].getType(),
+                measurementSchemas[k].getMeasurementId(), String.valueOf(i + valueOffset)));
+          }
+          fileWriter.write(record);
+          tsFileResource.updateStartTime(deviceIds[j], i);
+          tsFileResource.updateEndTime(deviceIds[j], i);
+        }
+      }
       if ((i + 1) % flushInterval == 0) {
         fileWriter.flushForTest();
       }
     }
     fileWriter.close();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testFullMerge() throws Exception {
+    MergeTask mergeTask =
+        new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+            true, 1, MERGE_TEST_SG);
+    mergeTask.call();
+
+    QueryContext context = new QueryContext();
+    Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
+    SeqResourceIterateReader tsFilesReader = new SeqResourceIterateReader(path,
+        Collections.singletonList(seqResources.get(0)),
+        null, context);
+    int cnt = 0;
+    try {
+      while (tsFilesReader.hasNext()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        for (int i = 0; i < batchData.length(); i++) {
+          cnt ++;
+          assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001);
+        }
+      }
+      assertEquals(1000, cnt);
+    } finally {
+      tsFilesReader.close();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 4d7978b..bc3a7b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -113,7 +113,7 @@ abstract class MergeTest {
     }
   }
 
-  private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
@@ -143,7 +143,7 @@ abstract class MergeTest {
     FileReaderManager.getInstance().stop();
   }
 
-  private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
       long valueOffset)
       throws IOException, WriteProcessException {
     TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());