You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ej...@apache.org on 2021/01/12 10:02:32 UTC
[iotdb] branch master updated: Revert inplace merge (#2398)
This is an automated email from the ASF dual-hosted git repository.
ejttianyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 06b576e Revert inplace merge (#2398)
06b576e is described below
commit 06b576eeee4912e751b36e58f52069fe00c4b111
Author: zhanglingzhe0820 <44...@qq.com>
AuthorDate: Tue Jan 12 18:02:06 2021 +0800
Revert inplace merge (#2398)
* add enable unseq compaction
* revert inplace merge
Co-authored-by: zhanglingzhe <su...@foxmail.com>
---
.../compaction/CompactionMergeTaskPoolManager.java | 2 +-
.../db/engine/compaction/TsFileManagement.java | 11 ++
.../level/LevelCompactionTsFileManagement.java | 3 +
.../iotdb/db/engine/merge/task/MergeFileTask.java | 136 +++++++++++++++++++--
.../apache/iotdb/db/integration/IoTDBFillIT.java | 6 +
.../db/integration/IoTDBLevelCompactionIT.java | 4 -
.../iotdb/db/integration/IoTDBMergeTest.java | 5 +
.../iotdb/db/integration/IoTDBSensorUpdateIT.java | 3 -
8 files changed, 153 insertions(+), 17 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
index 893c667..8506d4e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/CompactionMergeTaskPoolManager.java
@@ -86,7 +86,7 @@ public class CompactionMergeTaskPoolManager implements IService {
FilePathUtils.regularizePath(IoTDBDescriptor.getInstance().getConfig().getSystemDir())
+ "storage_groups");
File[] subDirList = sgDir.listFiles();
- if(subDirList!=null) {
+ if (subDirList != null) {
for (File subDir : subDirList) {
while (FSFactoryProducer.getFSFactory().getFile(
subDir.getAbsoluteFile() + File.separator + subDir.getName() + COMPACTION_LOG_NAME)
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
index 1dd4fa5..3d8e755 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/TsFileManagement.java
@@ -61,6 +61,7 @@ public abstract class TsFileManagement {
private final ReadWriteLock compactionMergeLock = new ReentrantReadWriteLock();
public volatile boolean isUnseqMerging = false;
+ public volatile boolean isSeqMerging = false;
/**
* This is the modification file of the result of the current merge. Because the merged file may
* be invisible at this moment, without this, deletion/update during merge could be lost.
@@ -202,6 +203,16 @@ public abstract class TsFileManagement {
}
return;
}
+ // wait until seq merge has finished
+ while (isSeqMerging) {
+ try {
+ wait(200);
+ } catch (InterruptedException e) {
+ logger.error("{} [Compaction] shutdown", storageGroupName, e);
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
isUnseqMerging = true;
if (seqMergeList.isEmpty()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
index ed79a0e..940c841 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/level/LevelCompactionTsFileManagement.java
@@ -526,6 +526,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
return;
}
}
+ isSeqMerging = true;
long startTimeMillis = System.currentTimeMillis();
try {
logger.info("{} start to filter compaction condition", storageGroupName);
@@ -534,6 +535,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
// level is numbered from 0
if (enableUnseqCompaction && !sequence && i == currMaxLevel - 2) {
// do not merge current unseq file level to upper level and just merge all of them to seq file
+ isSeqMerging = false;
merge(isForceFullMerge, getTsFileList(true), mergeResources.get(i), Long.MAX_VALUE);
} else {
CompactionLogger compactionLogger = new CompactionLogger(storageGroupDir,
@@ -589,6 +591,7 @@ public class LevelCompactionTsFileManagement extends TsFileManagement {
} catch (Exception e) {
logger.error("Error occurred in Compaction Merge thread", e);
} finally {
+ isSeqMerging = false;
// reset the merge working state to false
logger.info("{} [Compaction] merge end time isSeq = {}, consumption: {} ms",
storageGroupName, sequence,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
index 3f6370c..673ac34 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeFileTask.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.engine.merge.task;
import java.io.File;
import java.io.IOException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -34,12 +35,15 @@ import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.query.control.FileReaderManager;
+import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
import org.apache.iotdb.tsfile.read.common.Chunk;
+import org.apache.iotdb.tsfile.write.writer.ForceAppendTsFileWriter;
import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.apache.iotdb.tsfile.write.writer.TsFileIOWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,13 +90,23 @@ class MergeFileTask {
int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
int unmergedChunkNum = context.getUnmergedChunkCnt().getOrDefault(seqFile, 0);
-
- if (logger.isInfoEnabled()) {
- logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
- + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
- unmergedChunkNum);
+ if (mergedChunkNum >= unmergedChunkNum) {
+ // move the unmerged data to the new file
+ if (logger.isInfoEnabled()) {
+ logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+ + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+ unmergedChunkNum);
+ }
+ moveUnmergedToNew(seqFile);
+ } else {
+ // move the merged data to the old file
+ if (logger.isInfoEnabled()) {
+ logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
+ + "unmerged chunks", taskName, seqFile.getTsFile().getName(), mergedChunkNum,
+ unmergedChunkNum);
+ }
+ moveMergedToOld(seqFile);
}
- moveUnmergedToNew(seqFile);
if (Thread.interrupted()) {
Thread.currentThread().interrupt();
@@ -120,6 +134,95 @@ class MergeFileTask {
currentMergeIndex + 1, unmergedFiles.size());
}
+ private void moveMergedToOld(TsFileResource seqFile) throws IOException { // appends this file's merged chunks to the old seq file instead of building a new file
+ int mergedChunkNum = context.getMergedChunkCnt().getOrDefault(seqFile, 0);
+ if (mergedChunkNum == 0) { // no merged chunk recorded for this file: only drop its merge file and writer
+ resource.removeFileAndWriter(seqFile);
+ return;
+ }
+ // take the file's write lock (released in finally), then drop its cached chunk metadata and open readers
+ seqFile.writeLock();
+ try {
+ ChunkMetadataCache.getInstance().remove(seqFile);
+ FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
+ // also release the merge resource's own reader, then open a writer on the old file (see getOldFileWriter)
+ resource.removeFileReader(seqFile);
+ TsFileIOWriter oldFileWriter = getOldFileWriter(seqFile);
+
+ // filter the chunks that have been merged; the argument is a copy of this file's unmerged chunk start times
+ oldFileWriter.filterChunks(new HashMap<>(context.getUnmergedChunkStartTimes().get(seqFile))
+ );
+ // close the merge file's writer, then read its chunks back and append them to the old file device by device
+ RestorableTsFileIOWriter newFileWriter = resource.getMergeFileWriter(seqFile);
+ newFileWriter.close();
+ try (TsFileSequenceReader newFileReader =
+ new TsFileSequenceReader(newFileWriter.getFile().getPath())) {
+ Map<String, List<ChunkMetadata>> chunkMetadataListInChunkGroups =
+ newFileWriter.getDeviceChunkMetadataMap();
+ if (logger.isDebugEnabled()) {
+ logger.debug("{} find {} merged chunk groups", taskName,
+ chunkMetadataListInChunkGroups.size());
+ }
+ for (Map.Entry<String, List<ChunkMetadata>> entry : chunkMetadataListInChunkGroups
+ .entrySet()) {
+ String deviceId = entry.getKey();
+ List<ChunkMetadata> chunkMetadataList = entry.getValue();
+ writeMergedChunkGroup(chunkMetadataList, deviceId, newFileReader, oldFileWriter);
+
+ if (Thread.interrupted()) { // interrupted() clears the flag, so it is set again below before aborting
+ Thread.currentThread().interrupt();
+ oldFileWriter.close();
+ restoreOldFile(seqFile);
+ return;
+ }
+ }
+ }
+ oldFileWriter.endFile();
+ // all merged chunks are appended: update the plan indexes, serialize the resource and log the end of this file's merge
+ updatePlanIndexes(seqFile);
+ seqFile.serialize();
+ mergeLogger.logFileMergeEnd();
+ logger.debug("{} moved merged chunks of {} to the old file", taskName, seqFile);
+ } catch (Exception e) { // NOTE(review): oldFileWriter is not closed on this path (the interrupt path above closes it) - confirm no handle leaks
+ restoreOldFile(seqFile); // NOTE(review): if this throws, the original exception e is not rethrown
+ throw e;
+ } finally {
+ seqFile.writeUnlock();
+ }
+ }
+
+ /**
+ * Restores an old seq file that was being appended with new chunks when an exception occurs or
+ * the task is aborted. The file is re-opened with a RestorableTsFileIOWriter: if that writer
+ * reports both hasCrashed() and canWrite(), endFile() is called on it; otherwise it is only
+ * closed.
+ *
+ * @param seqFile the resource whose TsFile is re-opened and restored
+ * @throws IOException propagated from creating, ending or closing the restorable writer
+ */
+ private void restoreOldFile(TsFileResource seqFile) throws IOException {
+ RestorableTsFileIOWriter oldFileRecoverWriter = new RestorableTsFileIOWriter(
+ seqFile.getTsFile());
+ if (oldFileRecoverWriter.hasCrashed() && oldFileRecoverWriter.canWrite()) { // presumably: the file was left incomplete but can still be finished - TODO confirm
+ oldFileRecoverWriter.endFile();
+ } else {
+ oldFileRecoverWriter.close();
+ }
+ }
+
+ /**
+ * Open an appending writer for an old seq file so we can add new chunks to it. A
+ * ForceAppendTsFileWriter is tried first: its truncate position is written to the merge log
+ * before doTruncate() is called. If a TsFileNotCompleteException is thrown, a
+ * RestorableTsFileIOWriter on the same file is returned instead.
+ *
+ * @param seqFile the resource whose TsFile is opened
+ * @return a ForceAppendTsFileWriter, or a RestorableTsFileIOWriter as the fallback
+ * @throws IOException propagated from creating the writers, logging or truncating
+ */
+ private TsFileIOWriter getOldFileWriter(TsFileResource seqFile) throws IOException {
+ TsFileIOWriter oldFileWriter;
+ try {
+ oldFileWriter = new ForceAppendTsFileWriter(seqFile.getTsFile());
+ mergeLogger.logFileMergeStart(seqFile.getTsFile(), // logged before doTruncate(); presumably so recovery knows the position - TODO confirm
+ ((ForceAppendTsFileWriter) oldFileWriter).getTruncatePosition());
+ logger.debug("{} moving merged chunks of {} to the old file", taskName, seqFile);
+ ((ForceAppendTsFileWriter) oldFileWriter).doTruncate();
+ } catch (TsFileNotCompleteException e) {
+ // this file may already be truncated if this merge is a system reboot merge
+ oldFileWriter = new RestorableTsFileIOWriter(seqFile.getTsFile());
+ }
+ return oldFileWriter;
+ }
+
private void updatePlanIndexes(TsFileResource seqFile) {
// as the new file contains data of other files, track their plan indexes in the new file
// so that we will be able to compare data across different IoTDBs that share the same index
@@ -132,6 +235,21 @@ class MergeFileTask {
}
}
+ private void writeMergedChunkGroup(List<ChunkMetadata> chunkMetadataList, String device,
+ TsFileSequenceReader reader, TsFileIOWriter fileWriter)
+ throws IOException { // copies the listed chunks of one device from reader into fileWriter as a single chunk group
+ fileWriter.startChunkGroup(device);
+ long maxVersion = 0; // stays 0 when chunkMetadataList is empty
+ for (ChunkMetadata chunkMetaData : chunkMetadataList) {
+ Chunk chunk = reader.readMemChunk(chunkMetaData); // read the chunk described by this metadata entry
+ fileWriter.writeChunk(chunk, chunkMetaData);
+ maxVersion = Math.max(chunkMetaData.getVersion(), maxVersion);
+ context.incTotalPointWritten(chunkMetaData.getNumOfPoints()); // add this chunk's point count to the merge context's total
+ }
+ fileWriter.writeVersion(maxVersion); // writes the largest chunk version seen in this group
+ fileWriter.endChunkGroup();
+ }
+
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
Map<PartialPath, List<Long>> fileUnmergedChunkStartTimes =
context.getUnmergedChunkStartTimes().get(seqFile);
@@ -181,7 +299,6 @@ class MergeFileTask {
try {
resource.removeFileReader(seqFile);
ChunkMetadataCache.getInstance().remove(seqFile);
- FileReaderManager.getInstance().closeFileAndRemoveReader(seqFile.getTsFilePath());
File newMergeFile = seqFile.getTsFile();
newMergeFile.delete();
@@ -212,7 +329,7 @@ class MergeFileTask {
if (metaData.getStartTime() == startTime) {
Chunk chunk = reader.readMemChunk(metaData);
fileWriter.writeChunk(chunk, metaData);
- maxVersion = Math.max(metaData.getVersion(), maxVersion);
+ maxVersion = metaData.getVersion() > maxVersion ? metaData.getVersion() : maxVersion;
context.incTotalPointWritten(metaData.getNumOfPoints());
break;
}
@@ -225,4 +342,5 @@ class MergeFileTask {
}
return maxVersion;
}
-}
+
+}
\ No newline at end of file
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
index 284c17b..1410b61 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBFillIT.java
@@ -25,6 +25,7 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.jdbc.Config;
import org.junit.After;
@@ -129,9 +130,13 @@ public class IoTDBFillIT {
private static final String STATUS_STR_2 = "root.ln.wf01.wt02.status";
private static final String HARDWARE_STR = "root.ln.wf01.wt01.hardware";
+ boolean prevEnableUnseqCompaction;
+
@Before
public void setUp() throws Exception {
EnvironmentUtils.closeStatMonitor();
+ prevEnableUnseqCompaction = IoTDBDescriptor.getInstance().getConfig().isEnableUnseqCompaction();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(false);
EnvironmentUtils.envSetUp();
Class.forName(Config.JDBC_DRIVER_NAME);
prepareData();
@@ -140,6 +145,7 @@ public class IoTDBFillIT {
@After
public void tearDown() throws Exception { // cleans the test environment, then restores the enableUnseqCompaction setting
EnvironmentUtils.cleanEnv();
+ IoTDBDescriptor.getInstance().getConfig().setEnableUnseqCompaction(prevEnableUnseqCompaction); // puts back the value saved in setUp; NOTE(review): skipped if cleanEnv() throws
}
@Test
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
index 022f2cd..327dbfd 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLevelCompactionIT.java
@@ -18,7 +18,6 @@
*/
package org.apache.iotdb.db.integration;
-import static org.apache.iotdb.db.engine.compaction.utils.CompactionLogger.COMPACTION_LOG_NAME;
import static org.junit.Assert.assertEquals;
import java.sql.Connection;
@@ -26,13 +25,10 @@ import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Random;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.compaction.CompactionStrategy;
import org.apache.iotdb.db.utils.EnvironmentUtils;
-import org.apache.iotdb.db.utils.FilePathUtils;
import org.apache.iotdb.jdbc.Config;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
index 9f67377..3c86a2f 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBMergeTest.java
@@ -95,6 +95,11 @@ public class IoTDBMergeTest {
}
statement.execute("FLUSH");
statement.execute("MERGE");
+ try{
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
int cnt;
try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.mergeTest")) {
diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
index c404297..b35afbd 100644
--- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
+++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBSensorUpdateIT.java
@@ -79,9 +79,6 @@ public class IoTDBSensorUpdateIT {
}
assertEquals(1, cnt);
}
-
- // after merge completes
- statement.execute("DELETE FROM root.demo.d1");
}
}
}