You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/21 03:30:18 UTC

[incubator-iotdb] 01/01: Fix CachedPriorityMergeReader failing to deduplicate the last element in the cache

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch fix_duplicated_overlap_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 0df35ae8ac4966215ac2451111003b0628784451
Author: 江天 <jt...@163.com>
AuthorDate: Mon Oct 21 11:29:57 2019 +0800

    fix that CachedPriorityMergeReader fails to deduplicate the last element in the cache
---
 .../universal/CachedPriorityMergeReader.java       |  32 +++--
 .../reader/universal/PriorityMergeReader.java      |   2 +-
 .../{MergeTest.java => MergeOverLapTest.java}      | 146 +++++++++------------
 .../apache/iotdb/db/engine/merge/MergeTest.java    |   4 +-
 4 files changed, 81 insertions(+), 103 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
index e28d6f6..61aeea5 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/CachedPriorityMergeReader.java
@@ -34,10 +34,10 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
   private TimeValuePair[] timeValuePairCache = new TimeValuePair[CACHE_SIZE];
   private int cacheLimit = 0;
   private int cacheIdx = 0;
-  private TSDataType dataType;
+
+  private Long lastTimestamp = null;
 
   public CachedPriorityMergeReader(TSDataType dataType) {
-    this.dataType = dataType;
     for (int i = 0; i < CACHE_SIZE; i++) {
       timeValuePairCache[i] = TimeValuePairUtils.getEmptyTimeValuePair(dataType);
     }
@@ -53,22 +53,19 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
     cacheIdx = 0;
     while (!heap.isEmpty() && cacheLimit < CACHE_SIZE) {
       Element top = heap.poll();
-      if (cacheLimit == 0 || top.currTime() != timeValuePairCache[cacheLimit - 1].getTimestamp()) {
+      if (lastTimestamp == null || top.currTime() != lastTimestamp) {
         TimeValuePairUtils.setTimeValuePair(top.timeValuePair, timeValuePairCache[cacheLimit++]);
-        if (top.hasNext()) {
-          top.next();
-          heap.add(top);
-        } else {
-          top.close();
-        }
-      } else if (top.currTime() == timeValuePairCache[cacheLimit - 1].getTimestamp()) {
-        if (top.hasNext()) {
-          top.next();
-          heap.add(top);
-        } else {
-          top.close();
+        lastTimestamp = top.currTime();
+        while (heap.peek() != null && heap.peek().currTime() == lastTimestamp) {
+          heap.poll();
         }
       }
+      if (top.hasNext()) {
+        top.next();
+        heap.add(top);
+      } else {
+        top.close();
+      }
     }
   }
 
@@ -85,11 +82,12 @@ public class CachedPriorityMergeReader extends PriorityMergeReader {
   }
 
   @Override
-  public TimeValuePair current() {
+  public TimeValuePair current() throws IOException {
     if (0 <= cacheIdx && cacheIdx < cacheLimit) {
       return timeValuePairCache[cacheIdx];
     } else {
-      return heap.peek().timeValuePair;
+      fetch();
+      return timeValuePairCache[cacheIdx];
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
index c3d8096..1e1e5d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
+++ b/server/src/main/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReader.java
@@ -76,7 +76,7 @@ public class PriorityMergeReader implements IPointReader {
   }
 
   @Override
-  public TimeValuePair current() {
+  public TimeValuePair current() throws IOException {
     return heap.peek().timeValuePair;
   }
 
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
similarity index 51%
copy from server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
copy to server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
index 4d7978b..fd07863 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
@@ -15,105 +15,56 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
 
 package org.apache.iotdb.db.engine.merge;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.PATH_SEPARATOR;
+import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
-import org.apache.iotdb.db.engine.cache.TsFileMetaDataCache;
-import org.apache.iotdb.db.engine.merge.manage.MergeManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.engine.merge.manage.MergeResource;
+import org.apache.iotdb.db.engine.merge.task.MergeTask;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.exception.StorageEngineException;
-import org.apache.iotdb.db.metadata.MManager;
-import org.apache.iotdb.db.query.control.FileReaderManager;
-import org.apache.iotdb.db.utils.EnvironmentUtils;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.reader.resourceRelated.SeqResourceIterateReader;
 import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
-import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.tsfile.read.common.BatchData;
+import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.write.TsFileWriter;
 import org.apache.iotdb.tsfile.write.record.TSRecord;
 import org.apache.iotdb.tsfile.write.record.datapoint.DataPoint;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Test;
 
-abstract class MergeTest {
-
-  static final String MERGE_TEST_SG = "root.mergeTest";
-
-  int seqFileNum = 5;
-  int unseqFileNum = 5;
-  int measurementNum = 10;
-  int deviceNum = 10;
-  long ptNum = 100;
-  long flushInterval = 20;
-  TSEncoding encoding = TSEncoding.PLAIN;
-
-  String[] deviceIds;
-  MeasurementSchema[] measurementSchemas;
-
-  List<TsFileResource> seqResources = new ArrayList<>();
-  List<TsFileResource> unseqResources = new ArrayList<>();
+public class MergeOverLapTest extends MergeTest {
 
-  private int prevMergeChunkThreshold;
+  private File tempSGDir;
 
   @Before
   public void setUp() throws IOException, WriteProcessException, MetadataErrorException {
-    MManager.getInstance().init();
-    prevMergeChunkThreshold =
-        IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
-    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
-    prepareSeries();
-    prepareFiles(seqFileNum, unseqFileNum);
-    MergeManager.getINSTANCE().start();
+    ptNum = 1000;
+    super.setUp();
+    tempSGDir = new File("tempSG");
+    tempSGDir.mkdirs();
   }
 
   @After
   public void tearDown() throws IOException, StorageEngineException {
-    removeFiles();
-    seqResources.clear();
-    unseqResources.clear();
-    IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold);
-    TsFileMetaDataCache.getInstance().clear();
-    DeviceMetaDataCache.getInstance().clear();
-    MManager.getInstance().clear();
-    EnvironmentUtils.cleanAllDir();
-    MergeManager.getINSTANCE().stop();
-  }
-
-  private void prepareSeries() throws MetadataErrorException {
-    measurementSchemas = new MeasurementSchema[measurementNum];
-    for (int i = 0; i < measurementNum; i++) {
-      measurementSchemas[i] = new MeasurementSchema("sensor" + i, TSDataType.DOUBLE,
-          encoding, CompressionType.UNCOMPRESSED);
-    }
-    deviceIds = new String[deviceNum];
-    for (int i = 0; i < deviceNum; i++) {
-      deviceIds[i] = MERGE_TEST_SG + PATH_SEPARATOR + "device" + i;
-    }
-    MManager.getInstance().setStorageGroupToMTree(MERGE_TEST_SG);
-    for (String device : deviceIds) {
-      for (MeasurementSchema measurementSchema : measurementSchemas) {
-        MManager.getInstance().addPathToMTree(
-            device + PATH_SEPARATOR + measurementSchema.getMeasurementId(), measurementSchema
-            .getType(), measurementSchema.getEncodingType(), measurementSchema.getCompressor(),
-            Collections.emptyMap());
-      }
-    }
+    super.tearDown();
+    FileUtils.deleteDirectory(tempSGDir);
   }
 
-  private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+  @Override
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
@@ -132,18 +83,7 @@ abstract class MergeTest {
     prepareFile(tsFileResource, 0, ptNum * unseqFileNum, 20000);
   }
 
-  private void removeFiles() throws IOException {
-    for (TsFileResource tsFileResource : seqResources) {
-      tsFileResource.remove();
-    }
-    for (TsFileResource tsFileResource : unseqResources) {
-      tsFileResource.remove();
-    }
-    FileReaderManager.getInstance().closeAndRemoveAllOpenedReaders();
-    FileReaderManager.getInstance().stop();
-  }
-
-  private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
+  void prepareUnseqFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
       long valueOffset)
       throws IOException, WriteProcessException {
     TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());
@@ -161,10 +101,50 @@ abstract class MergeTest {
         tsFileResource.updateStartTime(deviceIds[j], i);
         tsFileResource.updateEndTime(deviceIds[j], i);
       }
+      // insert overlapping tuples
+      if ((i + 1) % 100 == 0) {
+        for (int j = 0; j < deviceNum; j++) {
+          TSRecord record = new TSRecord(i, deviceIds[j]);
+          for (int k = 0; k < measurementNum; k++) {
+            record.addTuple(DataPoint.getDataPoint(measurementSchemas[k].getType(),
+                measurementSchemas[k].getMeasurementId(), String.valueOf(i + valueOffset)));
+          }
+          fileWriter.write(record);
+          tsFileResource.updateStartTime(deviceIds[j], i);
+          tsFileResource.updateEndTime(deviceIds[j], i);
+        }
+      }
       if ((i + 1) % flushInterval == 0) {
         fileWriter.flushForTest();
       }
     }
     fileWriter.close();
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testFullMerge() throws Exception {
+    MergeTask mergeTask =
+        new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(), (k, v, l) -> {}, "test",
+            true, 1, MERGE_TEST_SG);
+    mergeTask.call();
+
+    QueryContext context = new QueryContext();
+    Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
+    SeqResourceIterateReader tsFilesReader = new SeqResourceIterateReader(path,
+        Collections.singletonList(seqResources.get(0)),
+        null, context);
+    int cnt = 0;
+    try {
+      while (tsFilesReader.hasNext()) {
+        BatchData batchData = tsFilesReader.nextBatch();
+        for (int i = 0; i < batchData.length(); i++) {
+          cnt ++;
+          assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001);
+        }
+      }
+      assertEquals(1000, cnt);
+    } finally {
+      tsFilesReader.close();
+    }
+  }
+}
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 4d7978b..bc3a7b6 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -113,7 +113,7 @@ abstract class MergeTest {
     }
   }
 
-  private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+  void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
     for (int i = 0; i < seqFileNum; i++) {
       File file = SystemFileFactory.INSTANCE.getFile(i + "seq.tsfile");
       TsFileResource tsFileResource = new TsFileResource(file);
@@ -143,7 +143,7 @@ abstract class MergeTest {
     FileReaderManager.getInstance().stop();
   }
 
-  private void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
+  void prepareFile(TsFileResource tsFileResource, long timeOffset, long ptNum,
       long valueOffset)
       throws IOException, WriteProcessException {
     TsFileWriter fileWriter = new TsFileWriter(tsFileResource.getFile());