You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/16 03:26:11 UTC
[incubator-iotdb] branch dev_merge updated: add
mergeChunkNumberThreshold
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new 0e2b10d add mergeChunkNumberThreshold
0e2b10d is described below
commit 0e2b10d958e5b4d4a26a1e815a081ad305ca3209
Author: 江天 <jt...@163.com>
AuthorDate: Tue Jul 16 11:23:48 2019 +0800
add mergeChunkNumberThreshold
---
iotdb/iotdb/conf/iotdb-engine.properties | 18 +++++++----
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 35 ++++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 2 ++
.../apache/iotdb/db/engine/merge/MergeTask.java | 28 +++++++++--------
.../iotdb/db/engine/merge/MergePerfTest.java | 21 ++++---------
.../iotdb/db/engine/merge/MergeTaskTest.java | 23 ++++++++++++++
.../apache/iotdb/db/engine/merge/MergeTest.java | 13 ++++++--
.../tsfile/file/metadata/ChunkGroupMetaData.java | 2 +-
.../iotdb/tsfile/write/writer/TsFileIOWriter.java | 2 +-
9 files changed, 106 insertions(+), 38 deletions(-)
diff --git a/iotdb/iotdb/conf/iotdb-engine.properties b/iotdb/iotdb/conf/iotdb-engine.properties
index c25bcc1..924e71e 100644
--- a/iotdb/iotdb/conf/iotdb-engine.properties
+++ b/iotdb/iotdb/conf/iotdb-engine.properties
@@ -146,21 +146,27 @@ merge_thread_num=1
# merge_memory_budget=2147483648
# When set to true, if some crashed merges are detected during system rebooting, such merges will
-# be continued, otherwise, the unfinished part of such merges will not be restarted while the
-# finished part still remains as it is.
+# be continued, otherwise, the unfinished parts of such merges will not be continued while the
+# finished parts still remain as they are.
# If you are feeling the rebooting is too slow, set this to false, true by default
continue_merge_after_reboot=true
# A global merge will be performed each such interval, that is, each storage group will be merged
-# (if proper merge candidates can be found). Unit: second, default: 2hours.
+# (if proper merge candidates can be found). Unit: second, default: 1 hour.
# When less than or equal to 0, timed merge is disabled.
-merge_interval_sec=7200
+merge_interval_sec=3600
# When set to true, all merges becomes full merge (the whole SeqFiles are re-written despite how
-# much they are overflowed). This may increase merge overhead significantly, do not open until
-# you need full merge urgently.
+# much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+# are overflowed.
force_full_merge=false
+# During a merge, if a chunk has fewer points than this parameter, the chunk will be
+# merged with its succeeding chunks even if it is not overflowed, until the merged chunk reaches
+# this threshold, at which point the new chunk is flushed.
+# When less than 0, this mechanism is disabled.
+chunk_merge_point_threshold=4096
+
####################
### Statistics Monitor configuration
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 1408dd1..150848b 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -194,16 +194,43 @@ public class IoTDBConfig {
*/
private boolean chunkBufferPoolEnable = false;
+ /**
+ * How much memory (in bytes) can be used by a single merge task.
+ */
private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
+ /**
+ * How many threads will be set up to perform merges.
+ */
private int mergeThreadNum = 1;
+ /**
+ * When set to true, if some crashed merges are detected during system rebooting, such merges will
+ * be continued, otherwise, the unfinished parts of such merges will not be continued while the
+ * finished parts still remain as they are.
+ */
private boolean continueMergeAfterReboot = true;
+ /**
+ * A global merge will be performed each such interval, that is, each storage group will be merged
+ * (if proper merge candidates can be found). Unit: second.
+ */
private long mergeIntervalSec = 2 * 3600L;
+ /**
+ * When set to true, all merges become full merges (the whole SeqFiles are re-written despite how
+ * much they are overflowed). This may increase merge overhead depending on how much the SeqFiles
+ * are overflowed.
+ */
private boolean forceFullMerge = false;
+ /**
+ * During a merge, if a chunk has fewer points than this parameter, the chunk will be
+ * merged with its succeeding chunks even if it is not overflowed, until the merged chunk reaches
+ * this threshold, at which point the new chunk is flushed.
+ */
+ private int chunkMergePointThreshold = 512;
+
public IoTDBConfig() {
// empty constructor
}
@@ -576,4 +603,12 @@ public class IoTDBConfig {
public void setForceFullMerge(boolean forceFullMerge) {
this.forceFullMerge = forceFullMerge;
}
+
+ public int getChunkMergePointThreshold() {
+ return chunkMergePointThreshold;
+ }
+
+ public void setChunkMergePointThreshold(int chunkMergePointThreshold) {
+ this.chunkMergePointThreshold = chunkMergePointThreshold;
+ }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 568f3e1..49cbaba 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -210,6 +210,8 @@ public class IoTDBDescriptor {
Long.toString(conf.getMergeIntervalSec()))));
conf.setForceFullMerge(Boolean.parseBoolean(properties.getProperty("force_full_merge",
Boolean.toString(conf.isForceFullMerge()))));
+ conf.setChunkMergePointThreshold(Integer.parseInt(properties.getProperty(
+ "chunk_merge_point_threshold", Integer.toString(conf.getChunkMergePointThreshold()))));
conf.setEnablePerformanceStat(Boolean
.parseBoolean(properties.getProperty("enable_performance_stat",
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index ab83d2f..ad597aa 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -38,12 +38,14 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.factory.SeriesReaderFactoryImpl;
import org.apache.iotdb.db.query.reader.IPointReader;
import org.apache.iotdb.db.utils.TimeValuePair;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
import org.apache.iotdb.tsfile.exception.write.TsFileNotCompleteException;
import org.apache.iotdb.tsfile.file.metadata.ChunkGroupMetaData;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@ -74,7 +76,7 @@ import org.slf4j.LoggerFactory;
*/
public class MergeTask implements Callable<Void> {
- private static final int MIN_CHUNK_POINT_NUM = 4*1024*1024;
+ private static int minChunkPointNum = IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
public static final String MERGE_SUFFIX = ".merge";
private static final Logger logger = LoggerFactory.getLogger(MergeTask.class);
@@ -167,16 +169,16 @@ public class MergeTask implements Callable<Void> {
int unmergedChunkNum = unmergedChunkCnt.getOrDefault(seqFile, 0);
if (mergedChunkNum >= unmergedChunkNum) {
// move the unmerged data to the new file
- if (logger.isDebugEnabled()) {
- logger.debug("{} moving unmerged data of {} to the merged file", taskName,
- seqFile.getFile().getName());
+ if (logger.isInfoEnabled()) {
+ logger.info("{} moving unmerged data of {} to the merged file, {} merged chunks, {} "
+ + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
}
moveUnmergedToNew(seqFile);
} else {
// move the merged data to the old file
- if (logger.isDebugEnabled()) {
- logger.debug("{} moving merged data of {} to the old file", taskName,
- seqFile.getFile().getName());
+ if (logger.isInfoEnabled()) {
+ logger.info("{} moving merged data of {} to the old file {} merged chunks, {} "
+ + "unmerged chunks", taskName, seqFile.getFile().getName(), mergedChunkNum, unmergedChunkNum);
}
moveMergedToOld(seqFile);
}
@@ -401,7 +403,7 @@ public class MergeTask implements Callable<Void> {
currTimeValuePair = unseqReader.next();
}
for (int i = 0; i < seqFiles.size(); i++) {
- mergeOneFile(path, i, unseqReader, schema);
+ pathMergeOneFile(path, i, unseqReader, schema);
}
} catch (IOException e) {
logger.error("Cannot read unseq data of {} during merge", path, e);
@@ -414,7 +416,7 @@ public class MergeTask implements Callable<Void> {
}
}
- private void mergeOneFile(Path path, int seqFileIdx, IPointReader unseqReader,
+ private void pathMergeOneFile(Path path, int seqFileIdx, IPointReader unseqReader,
MeasurementSchema measurementSchema)
throws IOException {
TsFileResource currTsFile = seqFiles.get(seqFileIdx);
@@ -480,13 +482,13 @@ public class MergeTask implements Callable<Void> {
unmergedChunkStartTimes.get(currFile).get(path).add(currMeta.getStartTime());
}
- if (ptWritten >= MIN_CHUNK_POINT_NUM) {
+ if (minChunkPointNum >= 0 && ptWritten >= minChunkPointNum || ptWritten > 0 && minChunkPointNum < 0) {
// the new chunk's size is large enough and it should be flushed
chunkWriter.writeToFileWriter(mergeFileWriter);
ptWritten = 0;
}
}
- if (ptWritten >= 0) {
+ if (ptWritten > 0) {
// the last merged chunk may still be smaller than the threshold, flush it anyway
chunkWriter.writeToFileWriter(mergeFileWriter);
}
@@ -509,7 +511,7 @@ public class MergeTask implements Callable<Void> {
// a small chunk has been written, this chunk merge with it to create a larger chunk
// or this chunk is too small and it is not the last chunk, merge it with the next chunk
boolean chunkTooSmall =
- ptWritten > 0 || (currMeta.getNumOfPoints() < MIN_CHUNK_POINT_NUM && !isLastChunk);
+ ptWritten > 0 || (minChunkPointNum >= 0 && currMeta.getNumOfPoints() < minChunkPointNum && !isLastChunk);
boolean chunkModified = currMeta.getDeletedAt() > Long.MIN_VALUE;
int newPtWritten = ptWritten;
@@ -662,6 +664,6 @@ public class MergeTask implements Callable<Void> {
private IChunkWriter getChunkWriter(MeasurementSchema measurementSchema) {
return new ChunkWriterImpl(measurementSchema,
- new ChunkBuffer(measurementSchema), MIN_CHUNK_POINT_NUM);
+ new ChunkBuffer(measurementSchema), TSFileConfig.pageCheckSizeThreshold);
}
}
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
index 8f1bc59..1909a98 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergePerfTest.java
@@ -24,14 +24,12 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
-import org.junit.Before;
public class MergePerfTest extends MergeTaskTest{
- private int seqFileNum;
- private int unseqFileNum;
private double unseqRatio;
private Random random = new Random(System.currentTimeMillis());
@@ -39,16 +37,8 @@ public class MergePerfTest extends MergeTaskTest{
private long timeConsumption;
private boolean fullMerge;
- @Before
@Override
- public void setUp() throws IOException, WriteProcessException {
- tempSGDir = new File("tempSG");
- tempSGDir.mkdirs();
- prepareSeries();
- prepareFiles();
- }
-
- private void prepareFiles() throws IOException, WriteProcessException {
+ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
File file = new File(i + "seq.tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
@@ -59,7 +49,7 @@ public class MergePerfTest extends MergeTaskTest{
long unseqLength = (long) (timeRange * unseqRatio);
for (int i = 0; i < unseqFileNum; i++) {
long unseqOffset = (long) ((1.0 - unseqRatio) * random.nextDouble() * timeRange);
- System.out.println(unseqOffset + " " + unseqLength);
+ //System.out.println(unseqOffset + " " + unseqLength);
File file = new File(i + "unseq.tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
unseqResources.add(tsFileResource);
@@ -79,6 +69,7 @@ public class MergePerfTest extends MergeTaskTest{
}
public static void main(String[] args) throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
List<Long> timeConsumptions = new ArrayList<>();
MergePerfTest perfTest = new MergePerfTest();
@@ -90,7 +81,7 @@ public class MergePerfTest extends MergeTaskTest{
perfTest.unseqRatio = 0.2;
perfTest.ptNum = 10000;
perfTest.flushInterval = 1000;
- perfTest.fullMerge = true;
+ perfTest.fullMerge = false;
for (int i = 0; i < 3; i++) {
// cache warm-up
@@ -117,7 +108,7 @@ public class MergePerfTest extends MergeTaskTest{
// }
// double[] doubleParameters = new double[10];
// for (int i = 1; i <= 10; i++) {
-// doubleParameters[i-1] = 0.1 * i;
+// doubleParameters[i-1] = 0.01 * i;
// }
// for (double param : doubleParameters) {
// perfTest.unseqRatio = param;
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
index d9b3377..fa10f1e 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTaskTest.java
@@ -25,6 +25,7 @@ import java.io.File;
import java.io.IOException;
import java.util.Collections;
import org.apache.commons.io.FileUtils;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.query.context.QueryContext;
import org.apache.iotdb.db.query.reader.sequence.SequenceSeriesReader;
@@ -96,6 +97,28 @@ public class MergeTaskTest extends MergeTest {
}
@Test
+ public void testChunkNumThreshold() throws Exception {
+ IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(Integer.MAX_VALUE);
+ MergeTask mergeTask =
+ new MergeTask(seqResources, unseqResources, tempSGDir.getPath(), (k, v, l) -> {}, "test",
+ false);
+ mergeTask.call();
+
+ QueryContext context = new QueryContext();
+ Path path = new Path(deviceIds[0], measurementSchemas[0].getMeasurementId());
+ SequenceSeriesReader tsFilesReader = new SequenceSeriesReader(path,
+ Collections.singletonList(seqResources.get(0)),
+ null, context);
+ while (tsFilesReader.hasNext()) {
+ BatchData batchData = tsFilesReader.nextBatch();
+ for (int i = 0; i < batchData.length(); i++) {
+ assertEquals(batchData.getTimeByIndex(i) + 20000.0, batchData.getDoubleByIndex(i), 0.001);
+ }
+ }
+ tsFilesReader.close();
+ }
+
+ @Test
public void testPartialMerge1() throws Exception {
MergeTask mergeTask =
new MergeTask(seqResources, unseqResources.subList(0, 1), tempSGDir.getPath(),
diff --git a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
index 8972d9f..4f5fe00 100644
--- a/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/engine/merge/MergeTest.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
import org.apache.iotdb.db.query.control.FileReaderManager;
import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
@@ -38,6 +39,8 @@ import org.junit.Before;
abstract class MergeTest {
+ int seqFileNum = 5;
+ int unseqFileNum = 5;
int measurementNum = 10;
int deviceNum = 10;
long ptNum = 100;
@@ -49,10 +52,15 @@ abstract class MergeTest {
List<TsFileResource> seqResources = new ArrayList<>();
List<TsFileResource> unseqResources = new ArrayList<>();
+ private int prevMergeChunkThreshold;
+
@Before
public void setUp() throws IOException, WriteProcessException {
+ prevMergeChunkThreshold =
+ IoTDBDescriptor.getInstance().getConfig().getChunkMergePointThreshold();
+ IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(-1);
prepareSeries();
- prepareFiles(5, 5);
+ prepareFiles(seqFileNum, unseqFileNum);
}
@After
@@ -60,6 +68,7 @@ abstract class MergeTest {
removeFiles();
seqResources.clear();
unseqResources.clear();
+ IoTDBDescriptor.getInstance().getConfig().setChunkMergePointThreshold(prevMergeChunkThreshold);
}
void prepareSeries() {
@@ -74,7 +83,7 @@ abstract class MergeTest {
}
}
- private void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
+ void prepareFiles(int seqFileNum, int unseqFileNum) throws IOException, WriteProcessException {
for (int i = 0; i < seqFileNum; i++) {
File file = new File(i + "seq.tsfile");
TsFileResource tsFileResource = new TsFileResource(file);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
index 01cc0ab..dc17641 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/ChunkGroupMetaData.java
@@ -173,7 +173,7 @@ public class ChunkGroupMetaData {
}
public List<ChunkMetaData> getChunkMetaDataList() {
- return chunkMetaDataList == null ? null : Collections.unmodifiableList(chunkMetaDataList);
+ return chunkMetaDataList;
}
@Override
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
index f4750a3..84e1311 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/TsFileIOWriter.java
@@ -414,7 +414,7 @@ public class TsFileIOWriter {
boolean chunkValid = startTimeIdx < pathChunkStartTimes.size()
&& pathChunkStartTimes.get(startTimeIdx) == chunkMetaData.getStartTime();
if (!chunkValid) {
- chunkGroupMetaDataIterator.remove();
+ chunkMetaDataIterator.remove();
chunkNum--;
invalidChunkNum++;
} else {