You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/01/12 10:02:32 UTC

[iotdb] branch master updated: Revert inplace merge (#2398)

This is an automated email from the ASF dual-hosted git repository.

ejttianyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 06b576e  Revert inplace merge (#2398)
06b576e is described below

commit 06b576eeee4912e751b36e58f52069fe00c4b111
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Jan 12 18:02:06 2021 +0800

    Revert inplace merge (#2398)
    
    * add enable unseq compaction
    
    * revert inplace merge
    
    Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
 .../compaction/CompactionMergeTaskPoolManager.java |   2 +-
 .../db/engine/compaction/TsFileManagement.java     |  11 ++
 .../level/LevelCompactionTsFileManagement.java     |   3 +
 .../iotdb/db/engine/merge/task/MergeFileTask.java  | 136 +++++++++++++++++++--
 .../apache/iotdb/db/integration/IoTDBFillIT.java   |   6 +
 .../db/integration/IoTDBLevelCompactionIT.java     |   4 -
 .../iotdb/db/integration/IoTDBMergeTest.java       |   5 +
 .../iotdb/db/integration/IoTDBSensorUpdateIT.java  |   3 -
 8 files changed, 153 insertions(+), 17 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 893c667..8506d4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -86,7 +86,7 @@ public class CompactionMergeTaskPoolManager implements IService {
           FilePathUtils.regularizePath(IoTDBDescriptor.getInstance().getConfig().getSystemDir())
               + "storage_groups");
       File[] subDirList = sgDir.listFiles();
-      if(subDirList!=null) {
+      if (subDirList != null) {
         for (File subDir : subDirList) {
           while (FSFactoryProducer.getFSFactory().getFile(
               subDir.getAbsoluteFile() + File.separator + subDir.getName() + COMPACTION_LOG_NAME)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 1dd4fa5..3d8e755 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -61,6 +61,7 @@ public abstract class TsFileManagement {
   private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
 
   public volatile boolean isUnseqMerging = false;
+  public volatile boolean isSeqMerging = false;
   /**
    * This is the modification file of the result of the current merge. Because the merged file may
    * be invisible at this moment, without this, deletion/update during merge could be lost.
@@ -202,6 +203,16 @@ public abstract class TsFileManagement {
       }
       return;
     }
+    // wait until seq merge has finished
+    while (isSeqMerging) {
+      try {
+        wait(200);
+      } catch (InterruptedException e) {
+        logger.error("{} [Compaction] shutdown", storageGroupName, e);
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
     isUnseqMerging = true;
 
     if (seqMergeList.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index ed79a0e..940c841 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -526,6 +526,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
         return;
       }
     }
+    isSeqMerging = true;
     long startTimeMillis = System.currentTimeMillis();
     try {
       logger.info("{} start to filter compaction condition", storageGroupName);
@@ -534,6 +535,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
           // level is numbered from 0
           if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
             // do not merge current unseq file level to upper level and just merge all of them to seq file
+            isSeqMerging = false;
             merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
           } else {
             CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
@@ -589,6 +591,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
     } catch (Exception e) {
       logger.error("Error occurred in Compaction Merge thread", e);
     } finally {
+      isSeqMerging = false;
       // reset the merge working state to false
       logger.info("{} [Compaction] merge end time isSeq = {}, consumption: {} ms",
           storageGroupName, sequence,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 3f6370c..673ac34 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.merge.task;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -34,12 +35,15 @@ import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.metadata.PartialPath;
 import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
 import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,13 +90,23 @@ class MergeFileTask {
 
       int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
       int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
-
-      if (logger.isInfoEnabled()) {
-        logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
-                + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
-            unmergedChunkNum);
+      if (mergedChunkNum >= unmergedChunkNum) {
+        // move the unmerged data to the new file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+                  + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+              unmergedChunkNum);
+        }
+        moveUnmergedToNew(seqFile);
+      } else {
+        // move the merged data to the old file
+        if (logger.isInfoEnabled()) {
+          logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
+                  + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+              unmergedChunkNum);
+        }
+        moveMergedToOld(seqFile);
       }
-      moveUnmergedToNew(seqFile);
 
       if (Thread.interrupted()) {
         Thread.currentThread().interrupt();
@@ -120,6 +134,95 @@ class MergeFileTask {
         currentMergeIndex + 1, unmergedFiles.size());
   }
 
+  private void moveMergedToOld(TsFileResource seqFile) throws IOException {
+    int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+    if (mergedChunkNum == 0) {
+      resource.removeFileAndWriter(seqFile);
+      return;
+    }
+
+    seqFile.writeLock();
+    try {
+      ChunkMetadataCache.getInstance().remove(seqFile);
+      FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+
+      resource.removeFileReader(seqFile);
+      TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
+
+      // filter the chunks that have been merged
+      oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile))
+      );
+
+      RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
+      newFileWriter.close();
+      try (TsFileSequenceReader newFileReader =
+          new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
+        Map<String, List<ChunkMetadata>> chunkMetadataListInChunkGroups =
+            newFileWriter.getDeviceChunkMetadataMap();
+        if (logger.isDebugEnabled()) {
+          logger.debug("{} find {} merged chunk groups", taskName,
+              chunkMetadataListInChunkGroups.size());
+        }
+        for (Map.Entry<String, List<ChunkMetadata>> entry : chunkMetadataListInChunkGroups
+            .entrySet()) {
+          String deviceId = entry.getKey();
+          List<ChunkMetadata> chunkMetadataList = entry.getValue();
+          writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter);
+
+          if (Thread.interrupted()) {
+            Thread.currentThread().interrupt();
+            oldFileWriter.close();
+            restoreOldFile(seqFile);
+            return;
+          }
+        }
+      }
+      oldFileWriter.endFile();
+
+      updatePlanIndexes(seqFile);
+      seqFile.serialize();
+      mergeLogger.logFileMergeEnd();
+      logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
+    } catch (Exception e) {
+      restoreOldFile(seqFile);
+      throw e;
+    } finally {
+      seqFile.writeUnlock();
+    }
+  }
+
+  /**
+   * Restore an old seq file that was being appended with new chunks when an exception occurs or
+   * the task is aborted.
+   */
+  private void restoreOldFile(TsFileResource seqFile) throws IOException {
+    RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
+        seqFile.getTsFile());
+    if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) {
+      oldFileRecoverWriter.endFile();
+    } else {
+      oldFileRecoverWriter.close();
+    }
+  }
+
+  /**
+   * Open an appending writer for an old seq file so we can add new chunks to it.
+   */
+  private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
+    TsFileIOWriter oldFileWriter;
+    try {
+      oldFileWriter = new ForceAppendTsFileWriter(seqFile.getTsFile());
+      mergeLogger.logFileMergeStart(seqFile.getTsFile(),
+          ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+      logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
+      ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
+    } catch (TsFileNotCompleteException e) {
+      // this file may already be truncated if this merge is a system reboot merge
+      oldFileWriter = new RestorableTsFileIOWriter(seqFile.getTsFile());
+    }
+    return oldFileWriter;
+  }
+
   private void updatePlanIndexes(TsFileResource seqFile) {
     // as the new file contains data of other files, track their plan indexes in the new file
     // so that we will be able to compare data across different IoTDBs that share the same index
@@ -132,6 +235,21 @@ class MergeFileTask {
     }
   }
 
+  private void writeMergedChunkGroup(List<ChunkMetadata> chunkMetadataList, String device,
+      TsFileSequenceReader reader, TsFileIOWriter fileWriter)
+      throws IOException {
+    fileWriter.startChunkGroup(device);
+    long maxVersion = 0;
+    for (ChunkMetadata chunkMetaData : chunkMetadataList) {
+      Chunk chunk = reader.readMemChunk(chunkMetaData);
+      fileWriter.writeChunk(chunk, chunkMetaData);
+      maxVersion = Math.max(chunkMetaData.getVersion(), maxVersion);
+      context.incTotalPointWritten(chunkMetaData.getNumOfPoints());
+    }
+    fileWriter.writeVersion(maxVersion);
+    fileWriter.endChunkGroup();
+  }
+
   private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
     Map<PartialPath, List<Long>> fileUnmergedChunkStartTimes =
         context.getUnmergedChunkStartTimes().get(seqFile);
@@ -181,7 +299,6 @@ class MergeFileTask {
     try {
       resource.removeFileReader(seqFile);
       ChunkMetadataCache.getInstance().remove(seqFile);
-      FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
 
       File newMergeFile = seqFile.getTsFile();
       newMergeFile.delete();
@@ -212,7 +329,7 @@ class MergeFileTask {
         if (metaData.getStartTime() == startTime) {
           Chunk chunk = reader.readMemChunk(metaData);
           fileWriter.writeChunk(chunk, metaData);
-          maxVersion = Math.max(metaData.getVersion(), maxVersion);
+          maxVersion = metaData.getVersion() > maxVersion ? metaData.getVersion() : maxVersion;
           context.incTotalPointWritten(metaData.getNumOfPoints());
           break;
         }
@@ -225,4 +342,5 @@ class MergeFileTask {
     }
     return maxVersion;
   }
-}
+
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index 284c17b..1410b61 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -25,6 +25,7 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.jdbc.Config;
 import org.junit.After;
@@ -129,9 +130,13 @@ public class IoTDBFillIT {
   private static final String STATUS_STR_2 = "root.ln.wf01.wt02.status";
   private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
 
+  boolean prevEnableUnseqCompaction;
+
   @Before
   public void setUp() throws Exception {
     EnvironmentUtils.closeStatMonitor();
+    prevEnableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
     EnvironmentUtils.envSetUp();
     Class.forName(Config.JDBC_DRIVER_NAME);
     prepareData();
@@ -140,6 +145,7 @@ public class IoTDBFillIT {
   @After
   public void tearDown() throws Exception {
     EnvironmentUtils.cleanEnv();
+    IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(prevEnableUnseqCompaction);
   }
 
   @Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
index 022f2cd..327dbfd 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
@@ -18,7 +18,6 @@
  */
 package org.apache.iotdb.db.integration;
 
-import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
 import static org.junit.Assert.assertEquals;
 
 import java.sql.Connection;
@@ -26,13 +25,10 @@ import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.Random;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FilePathUtils;
 import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 9f67377..3c86a2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -95,6 +95,11 @@ public class IoTDBMergeTest {
         }
         statement.execute("FLUSH");
         statement.execute("MERGE");
+        try{
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
 
         int cnt;
         try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
index c404297..b35afbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -79,9 +79,6 @@ public class IoTDBSensorUpdateIT {
         }
         assertEquals(1, cnt);
       }
-
-      // after merge completes
-      statement.execute("DELETE FROM root.demo.d1");
     }
   }
 }