You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ha...@apache.org on 2021/01/26 07:07:55 UTC

[iotdb] branch rel/0.11 updated: [To rel/0.11]cherry pick fix unseq merge end time bug (#2569)

This is an automated email from the ASF dual-hosted git repository.

haonan pushed a commit to branch rel/0.11
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/0.11 by this push:
     new 8998ff1  [To rel/0.11]cherry pick fix unseq merge end time bug (#2569)
8998ff1 is described below

commit 8998ff14aed9a8ca0572c036008b8e5606e70233
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Jan 26 15:07:16 2021 +0800

    [To rel/0.11]cherry pick fix unseq merge end time bug (#2569)
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 15 ++++-
 .../db/engine/merge/task/MergeMultiChunkTask.java  |  2 +-
 .../java/org/apache/iotdb/db/utils/MergeUtils.java |  2 +-
 .../iotdb/db/engine/merge/MergeTaskTest.java       | 65 +++++++++++++++-------
 .../iotdb/db/integration/IoTDBMergeTest.java       | 51 ++++++++++++++++-
 5 files changed, 108 insertions(+), 27 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 673ac34..367dd6c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -177,6 +177,7 @@ class MergeFileTask {
           }
         }
       }
+      updateStartTimeAndEndTime(seqFile, oldFileWriter);
       oldFileWriter.endFile();
 
       updatePlanIndexes(seqFile);
@@ -191,6 +192,18 @@ class MergeFileTask {
     }
   }
 
+  private void updateStartTimeAndEndTime(TsFileResource seqFile, TsFileIOWriter fileWriter) {
+    //TODO change to get one timeseries block each time
+    for (Entry<String, List<ChunkMetadata>> deviceChunkMetadataEntry : fileWriter
+        .getDeviceChunkMetadataMap().entrySet()) {
+      String device = deviceChunkMetadataEntry.getKey();
+      for (ChunkMetadata chunkMetadata : deviceChunkMetadataEntry.getValue()) {
+        seqFile.updateStartTime(device, chunkMetadata.getStartTime());
+        seqFile.updateEndTime(device, chunkMetadata.getEndTime());
+      }
+    }
+  }
+
   /**
    * Restore an old seq file which is being written new chunks when exceptions occur or the task is
    * aborted.
@@ -287,7 +300,7 @@ class MergeFileTask {
         fileWriter.endChunkGroup();
       }
     }
-
+    updateStartTimeAndEndTime(seqFile, fileWriter);
     fileWriter.endFile();
 
     updatePlanIndexes(seqFile);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
index 5d47170..b1b03d4 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeMultiChunkTask.java
@@ -230,7 +230,7 @@ public class MergeMultiChunkTask {
     // series should also be written into a new chunk
     List<Integer> ret = new ArrayList<>();
     for (int i = 0; i < currMergingPaths.size(); i++) {
-      if (seqChunkMeta[i].isEmpty()
+      if (seqChunkMeta[i] == null || seqChunkMeta[i].isEmpty()
           && !(seqFileIdx + 1 == resource.getSeqFiles().size() && currTimeValuePairs[i] != null)) {
         continue;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
index ce063f4..c9070f4 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/MergeUtils.java
@@ -217,7 +217,7 @@ public class MergeUtils {
 
   public static boolean isChunkOverflowed(TimeValuePair timeValuePair, ChunkMetadata metaData) {
     return timeValuePair != null
-        && timeValuePair.getTimestamp() < metaData.getEndTime();
+        && timeValuePair.getTimestamp() <= metaData.getEndTime();
   }
 
   public static boolean isChunkTooSmall(int ptWritten, ChunkMetadata chunkMetaData,
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index fb73a27..4ecae39 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -50,7 +50,8 @@ public class MergeTaskTest extends MergeTest {
   private File tempSGDir;
 
   @Before
-  public void setUp() throws IOException, WriteProcessException, MetadataException, MetadataException {
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, MetadataException {
     super.setUp();
     tempSGDir = new File(TestConstant.BASE_OUTPUT_PATH.concat("tempSG"));
     tempSGDir.mkdirs();
@@ -71,11 +72,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> list = new ArrayList<>();
     list.add(seqResources.get(0));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
-        list, new ArrayList<>(), null, null, true);
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context, list, new ArrayList<>(), null, null, true);
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
       for (int i = 0; i < batchData.length(); i++) {
@@ -86,6 +88,18 @@ public class MergeTaskTest extends MergeTest {
   }
 
   @Test
+  public void testMergeEndTime() throws Exception {
+    List<TsFileResource> testSeqResources = seqResources.subList(0, 3);
+    List<TsFileResource> testUnseqResource = unseqResources.subList(5, 6);
+    MergeTask mergeTask =
+        new MergeTask(new MergeResource(testSeqResources, testUnseqResource), tempSGDir.getPath(),
+            (k, v, l) -> {
+              assertEquals(499, k.get(2).getEndTime("root.mergeTest.device1"));
+            }, "test", false, 1, MERGE_TEST_SG);
+    mergeTask.call();
+  }
+
+  @Test
   public void testFullMerge() throws Exception {
     MergeTask mergeTask =
         new MergeTask(new MergeResource(seqResources, unseqResources), tempSGDir.getPath(),
@@ -95,10 +109,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> list = new ArrayList<>();
     list.add(seqResources.get(0));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context,
         list, new ArrayList<>(), null, null, true);
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
@@ -120,10 +136,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> resources = new ArrayList<>();
     resources.add(seqResources.get(0));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context,
         resources, new ArrayList<>(), null, null, true);
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
@@ -144,11 +162,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> list = new ArrayList<>();
     list.add(seqResources.get(0));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
-        list, new ArrayList<>(), null, null, true);
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context, list, new ArrayList<>(), null, null, true);
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
       for (int i = 0; i < batchData.length(); i++) {
@@ -172,11 +191,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> list = new ArrayList<>();
     list.add(seqResources.get(0));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
-        list, new ArrayList<>(), null, null, true);
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context, list, new ArrayList<>(), null, null, true);
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
       for (int i = 0; i < batchData.length(); i++) {
@@ -196,11 +216,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> list = new ArrayList<>();
     list.add(seqResources.get(2));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
-        list, new ArrayList<>(), null, null, true);
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context, list, new ArrayList<>(), null, null, true);
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
       for (int i = 0; i < batchData.length(); i++) {
@@ -218,7 +239,8 @@ public class MergeTaskTest extends MergeTest {
   public void mergeWithDeletionTest() throws Exception {
     try {
       PartialPath device = new PartialPath(deviceIds[0]);
-      seqResources.get(0).getModFile().write(new Deletion(device.concatNode(measurementSchemas[0].getMeasurementId()), 10000, 0, 49));
+      seqResources.get(0).getModFile().write(
+          new Deletion(device.concatNode(measurementSchemas[0].getMeasurementId()), 10000, 0, 49));
     } finally {
       seqResources.get(0).getModFile().close();
     }
@@ -236,11 +258,12 @@ public class MergeTaskTest extends MergeTest {
     mergeTask.call();
 
     QueryContext context = new QueryContext();
-    PartialPath path = new PartialPath(deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
+    PartialPath path = new PartialPath(
+        deviceIds[0] + TsFileConstant.PATH_SEPARATOR + measurementSchemas[0].getMeasurementId());
     List<TsFileResource> resources = new ArrayList<>();
     resources.add(seqResources.get(0));
-    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(), context,
-        resources, new ArrayList<>(), null, null, true);
+    IBatchReader tsFilesReader = new SeriesRawDataBatchReader(path, measurementSchemas[0].getType(),
+        context, resources, new ArrayList<>(), null, null, true);
     int count = 0;
     while (tsFilesReader.hasNextBatch()) {
       BatchData batchData = tsFilesReader.nextBatch();
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 3c86a2f..500c9ce 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -51,7 +51,7 @@ public class IoTDBMergeTest {
         .getCompactionStrategy();
     IoTDBDescriptor.getInstance().getConfig().setPartitionInterval(1);
     IoTDBDescriptor.getInstance().getConfig()
-        .setCompactionStrategy(CompactionStrategy.NO_COMPACTION);
+        .setCompactionStrategy(CompactionStrategy.LEVEL_COMPACTION);
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
   }
@@ -65,6 +65,52 @@ public class IoTDBMergeTest {
   }
 
   @Test
+  public void testOverlap() throws SQLException {
+    logger.info("test...");
+    try (Connection connection = DriverManager
+        .getConnection(Config.IOTDB_URL_PREFIX + "127.0.0.1:6667/", "root", "root");
+        Statement statement = connection.createStatement()) {
+      statement.execute("SET STORAGE GROUP TO root.mergeTest");
+      try {
+        statement.execute("CREATE TIMESERIES root.mergeTest.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+      } catch (SQLException e) {
+        // ignore
+      }
+
+      statement.execute(String
+          .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 1, 1));
+      statement.execute(String
+          .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 2, 2));
+      statement.execute("FLUSH");
+      statement.execute(String
+          .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 5, 5));
+      statement.execute(String
+          .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 6, 6));
+      statement.execute("FLUSH");
+      statement.execute(String
+          .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 2, 3));
+      statement.execute(String
+          .format("INSERT INTO root.mergeTest(timestamp,s1) VALUES (%d,%d)", 3, 3));
+      statement.execute("FLUSH");
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          long time = resultSet.getLong("Time");
+          long s1 = resultSet.getLong("root.mergeTest.s1");
+          if (time == 2) {
+            assertEquals(3, s1);
+          } else {
+            assertEquals(time, s1);
+          }
+          cnt++;
+        }
+        assertEquals(5, cnt);
+      }
+    }
+  }
+
+  @Test
   public void test() throws SQLException {
     logger.info("test...");
     try (Connection connection = DriverManager
@@ -94,8 +140,7 @@ public class IoTDBMergeTest {
                   + "%d,%d)", j, j + 10, j + 20, j + 30));
         }
         statement.execute("FLUSH");
-        statement.execute("MERGE");
-        try{
+        try {
           Thread.sleep(1000);
         } catch (InterruptedException e) {
           e.printStackTrace();