You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/12 05:36:19 UTC
[incubator-iotdb] 01/01: Merge branch 'master' into dev_merge
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
commit 4eca66e31e626846d505daefe0a3a1ac2e04cd6f
Merge: ca9e047 1e9449b
Author: 江天 <jt...@163.com>
AuthorDate: Fri Jul 12 13:33:57 2019 +0800
Merge branch 'master' into dev_merge
# Conflicts:
# iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
# iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
# iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
# iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
# iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
# tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
.../Frequently asked questions.md} | 4 +-
docs/Documentation-CHN/OtherMaterial-Examples.md | 83 ++++
docs/Documentation-CHN/OtherMaterial-Reference.md | 31 ++
.../OtherMaterial-ReleaseNotesV0.7.0.md} | 4 +-
docs/Documentation-CHN/QuickStart.md | 290 ++++++++++++
.../UserGuideV0.7.0/1-Overview/1-What is IoTDB.md} | 6 +-
.../UserGuideV0.7.0/1-Overview/2-Architecture.md | 36 ++
.../UserGuideV0.7.0/1-Overview/3-Scenario.md | 78 ++++
.../UserGuideV0.7.0/1-Overview/4-Features.md | 51 +++
.../1-Key Concepts and Terminology.md | 179 ++++++++
.../2-Data Type.md | 34 ++
.../3-Encoding.md | 63 +++
.../4-Compression.md | 28 ++
.../3-Operation Manual/1-Sample Data.md} | 8 +-
.../3-Operation Manual/2-Data Model Selection.md | 118 +++++
.../3-Operation Manual/3-Data Import.md | 85 ++++
.../3-Operation Manual/4-Data Query.md | 508 +++++++++++++++++++++
.../3-Operation Manual/5-Data Maintenance.md | 83 ++++
.../3-Operation Manual/6-Priviledge Management.md | 134 ++++++
.../4-Deployment and Management/1-Deployment.md | 99 ++++
.../4-Deployment and Management/2-Configuration.md | 414 +++++++++++++++++
.../3-System Monitor.md | 152 ++++++
.../4-Performance Monitor.md | 78 ++++
.../4-Deployment and Management/5-System log.md | 64 +++
.../6-Data Management.md | 74 +++
.../7-Build and use IoTDB by Dockerfile.md} | 4 +-
.../1-IoTDB Query Statement.md | 477 +++++++++++++++++++
.../5-IoTDB SQL Documentation/2-Reference.md | 137 ++++++
.../UserGuideV0.7.0/6-JDBC API/1-JDBC API.md | 4 +-
.../UserGuideV0.7.0/7-Tools-Cli.md | 77 ++++
.../UserGuideV0.7.0/7-Tools-Grafana.md | 118 +++++
.../UserGuideV0.7.0/7-Tools-Hadoop.md} | 4 +-
.../UserGuideV0.7.0/7-Tools-spark.md} | 4 +-
.../1-Key Concepts and Terminology.md | 2 +-
.../2-Data Type.md | 2 +-
.../3-Encoding.md | 4 +-
.../4-Compression.md | 2 +-
.../3-Operation Manual/1-Sample Data.md | 4 +-
.../3-Operation Manual/2-Data Model Selection.md | 10 +-
.../3-Operation Manual/3-Data Import.md | 6 +-
.../3-Operation Manual/4-Data Query.md | 12 +-
.../3-Operation Manual/5-Data Maintenance.md | 8 +-
.../3-Operation Manual/6-Priviledge Management.md | 4 +-
.../4-Deployment and Management/2-Configuration.md | 2 +-
.../3-System Monitor.md | 6 +-
.../4-Performance Monitor.md | 79 ++++
.../{4-System log.md => 5-System log.md} | 2 +-
.../{5-Data Management.md => 6-Data Management.md} | 8 +-
...e.md => 7-Build and use IoTDB by Dockerfile.md} | 0
.../UserGuideV0.7.0/6-JDBC API/1-JDBC API.md | 4 +-
.../Documentation/UserGuideV0.7.0/7-Tools-spark.md | 2 +-
iotdb/iotdb/conf/iotdb-engine.properties | 12 +
iotdb/iotdb/conf/logback.xml | 20 +
iotdb/pom.xml | 19 +
.../org/apache/iotdb/db/concurrent/ThreadName.java | 3 +-
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 39 ++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 10 +
.../directories/strategy/DirectoryStrategy.java | 31 +-
.../strategy/MaxDiskUsableSpaceFirstStrategy.java | 8 +-
.../MinFolderOccupiedSpaceFirstStrategy.java | 35 +-
.../directories/strategy/SequenceStrategy.java | 12 +-
.../db/cost/statistic/ConcurrentCircularArray.java | 69 +++
.../iotdb/db/cost/statistic/Measurement.java | 428 +++++++++++++++++
.../iotdb/db/cost/statistic/MeasurementMBean.java | 62 +++
.../statistic/Operation.java} | 28 +-
.../apache/iotdb/db/engine/merge/MergeManager.java | 5 +-
.../iotdb/db/engine/merge/RecoverMergeTask.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 11 +-
.../db/engine/storagegroup/TsFileProcessor.java | 6 +-
.../db/engine/storagegroup/TsFileResource.java | 22 +
.../org/apache/iotdb/db/qp/QueryProcessor.java | 3 +-
.../qp/executor/AbstractQueryProcessExecutor.java | 13 -
.../db/qp/executor/IQueryProcessExecutor.java | 4 -
.../iotdb/db/qp/strategy/LogicalGenerator.java | 6 +-
.../org/apache/iotdb/db/rescon/MemTablePool.java | 2 +-
.../java/org/apache/iotdb/db/service/IoTDB.java | 2 +
.../org/apache/iotdb/db/service/ServiceType.java | 3 +-
.../org/apache/iotdb/db/service/TSServiceImpl.java | 35 +-
.../org/apache/iotdb/db/utils/CommonUtils.java | 20 +
.../writelog/recover/TsFileRecoverPerformer.java | 35 +-
.../strategy/DirectoryStrategyTest.java | 177 +++++++
.../db/cost/statistic/PerformanceStatTest.java | 71 +++
.../db/engine/memtable/MemTableFlushTaskTest.java | 6 +-
.../engine/storagegroup/TsFileProcessorTest.java | 84 +++-
.../iotdb/db/qp/plan/LogicalPlanSmallTest.java | 10 +-
.../apache/iotdb/db/qp/plan/PhysicalPlanTest.java | 10 +-
.../apache/iotdb/db/qp/utils/MemIntQpExecutor.java | 4 -
.../db/query/control/FileReaderManagerTest.java | 19 +-
.../apache/iotdb/db/utils/EnvironmentUtils.java | 2 +-
.../java/org/apache/iotdb/jdbc/IoTDBStatement.java | 14 +-
pom.xml | 2 +-
spark/README.md | 2 +-
.../scala/org/apache/iotdb/tsfile/Converter.scala | 24 +-
.../apache/iotdb/tsfile/TsFileOutputWriter.scala | 6 +-
.../apache/iotdb/tsfile/TsFileWriterFactory.scala | 5 +-
.../org/apache/iotdb/tsfile/ConverterTest.scala | 7 +-
tsfile/example/readme.md | 21 +-
.../java/org/apache/iotdb/tsfile/TsFileWrite.java | 11 +-
.../iotdb/tsfile/common/conf/TSFileDescriptor.java | 6 +-
.../{SystemConstant.java => TsFileConstant.java} | 4 +-
.../iotdb/tsfile/file/metadata/ChunkMetaData.java | 26 +-
.../iotdb/tsfile/file/metadata/TsDigest.java | 24 +
.../file/metadata/statistics/Statistics.java | 14 +-
.../iotdb/tsfile/read/TsFileSequenceReader.java | 17 +
.../org/apache/iotdb/tsfile/read/common/Path.java | 30 +-
.../java/org/apache/iotdb/tsfile/utils/Binary.java | 2 +-
.../write/writer/RestorableTsFileIOWriter.java | 17 +-
.../tsfile/file/metadata/utils/TestHelper.java | 5 +-
108 files changed, 4979 insertions(+), 272 deletions(-)
diff --cc iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 79e1c25,fa402d3..fe1f9a2
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@@ -496,35 -502,27 +510,60 @@@ public class IoTDBConfig
this.chunkBufferPoolEnable = chunkBufferPoolEnable;
}
++
+ public long getMergeMemoryBudget() {
+ return mergeMemoryBudget;
+ }
+
+ public void setMergeMemoryBudget(long mergeMemoryBudget) {
+ this.mergeMemoryBudget = mergeMemoryBudget;
+ }
+
+ public int getMergeThreadNum() {
+ return mergeThreadNum;
+ }
+
+ public void setMergeThreadNum(int mergeThreadNum) {
+ this.mergeThreadNum = mergeThreadNum;
+ }
+
+ public boolean isContinueMergeAfterReboot() {
+ return continueMergeAfterReboot;
+ }
+
+ public void setContinueMergeAfterReboot(boolean continueMergeAfterReboot) {
+ this.continueMergeAfterReboot = continueMergeAfterReboot;
+ }
+
+ public long getMergeIntervalSec() {
+ return mergeIntervalSec;
+ }
+
+ public void setMergeIntervalSec(long mergeIntervalSec) {
+ this.mergeIntervalSec = mergeIntervalSec;
+ }
++
+ public boolean isEnablePerformanceStat() {
+ return enablePerformanceStat;
+ }
+
+ public void setEnablePerformanceStat(boolean enablePerformanceStat) {
+ this.enablePerformanceStat = enablePerformanceStat;
+ }
+
+ public long getPerformanceStatDisplayInterval() {
+ return performanceStatDisplayInterval;
+ }
+
+ public void setPerformanceStatDisplayInterval(long performanceStatDisplayInterval) {
+ this.performanceStatDisplayInterval = performanceStatDisplayInterval;
+ }
+
+ public int getPerformance_stat_memory_in_kb() {
+ return performance_stat_memory_in_kb;
+ }
+
+ public void setPerformance_stat_memory_in_kb(int performance_stat_memory_in_kb) {
+ this.performance_stat_memory_in_kb = performance_stat_memory_in_kb;
+ }
}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 589a2f7,4f7e208..b0a2dd2
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@@ -200,15 -200,16 +200,25 @@@ public class IoTDBDescriptor
conf.setZoneID(ZoneId.of(tmpTimeZone.trim()));
logger.info("Time zone has been set to {}", conf.getZoneID());
+ conf.setMergeMemoryBudget(Long.parseLong(properties.getProperty("merge_memory_budget",
+ Long.toString(conf.getMergeMemoryBudget()))));
+ conf.setMergeThreadNum(Integer.parseInt(properties.getProperty("merge_thread_num",
+ Integer.toString(conf.getMergeThreadNum()))));
+ conf.setContinueMergeAfterReboot(Boolean.parseBoolean(properties.getProperty(
+ "continue_merge_after_reboot", Boolean.toString(conf.isContinueMergeAfterReboot()))));
+ conf.setMergeIntervalSec(Long.parseLong(properties.getProperty("merge_interval_sec",
+ Long.toString(conf.getMergeIntervalSec()))));
+
+ conf.setEnablePerformanceStat(Boolean
+ .parseBoolean(properties.getProperty("enable_performance_stat",
+ Boolean.toString(conf.isEnablePerformanceStat())).trim()));
+
+ conf.setPerformanceStatDisplayInterval(Long
+ .parseLong(properties.getProperty("performance_stat_display_interval",
+ Long.toString(conf.getPerformanceStatDisplayInterval())).trim()));
+ conf.setPerformance_stat_memory_in_kb(Integer
+ .parseInt(properties.getProperty("performance_stat_memory_in_kb",
+ Integer.toString(conf.getPerformance_stat_memory_in_kb())).trim()));
} catch (IOException e) {
logger.warn("Cannot load config file because, use default configuration", e);
} catch (Exception e) {
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
index 4e9a949,0000000..e488b72
mode 100644,000000..100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
@@@ -1,105 -1,0 +1,106 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.service.IService;
+import org.apache.iotdb.db.service.ServiceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeManager implements IService {
+
+ private static final Logger logger = LoggerFactory.getLogger(MergeManager.class);
+ private static final MergeManager INSTANCE = new MergeManager();
+
+ private AtomicInteger threadCnt = new AtomicInteger();
+ private ThreadPoolExecutor mergeTaskPool;
+ private ScheduledExecutorService timedMergeThreadPool;
+
+ private MergeManager() {
+ start();
+ }
+
+ public static MergeManager getINSTANCE() {
+ return INSTANCE;
+ }
+
+ public void submit(MergeTask mergeTask) {
+ mergeTaskPool.submit(mergeTask);
+ }
+
+ @Override
+ public void start() { // creates the merge task pool and, if configured, the timed merge trigger
+ if (mergeTaskPool == null) { // nothing is re-created when the pool already exists
+ int threadNum = IoTDBDescriptor.getInstance().getConfig().getMergeConcurrentThreads();
+ if (threadNum <= 0) { // non-positive settings fall back to a single merge thread
+ threadNum = 1;
+ }
+ mergeTaskPool =
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(threadNum,
- r -> new Thread(r, "mergeThread-" + threadCnt.getAndIncrement()));
++ r -> new Thread(r, "MergeThread-" + threadCnt.getAndIncrement()));
+ long mergeInterval = IoTDBDescriptor.getInstance().getConfig().getMergeIntervalSec();
+ if (mergeInterval > 0) { // a non-positive interval leaves timed merges disabled
- timedMergeThreadPool = (ScheduledExecutorService) Executors.newSingleThreadExecutor();
++ timedMergeThreadPool = Executors.newSingleThreadScheduledExecutor( r -> new Thread(r,
++ "TimedMergeThread"));
+ timedMergeThreadPool.scheduleAtFixedRate(this::flushAll, 0,
+ mergeInterval, TimeUnit.SECONDS); // zero initial delay, then every mergeInterval seconds
+ }
+ }
+ logger.info("MergeManager started");
+ }
+
+ /**
+  * Stops the timed merge trigger (if any) and the merge task pool, then blocks until the
+  * tasks that were still running have terminated. When the pool does not exist (never started
+  * or already stopped) only the final message is logged.
+  */
+ @Override
+ public void stop() {
+   if (mergeTaskPool != null) {
+     if (timedMergeThreadPool != null) {
+       timedMergeThreadPool.shutdownNow();
+       timedMergeThreadPool = null;
+     }
+     mergeTaskPool.shutdownNow();
+     logger.info("Waiting for task pool to shut down");
+     try {
+       // isShutdown() is already true right after shutdownNow(), so polling it never waits;
+       // awaitTermination() blocks, without spinning, until the running tasks have exited
+       while (!mergeTaskPool.awaitTermination(1, TimeUnit.SECONDS)) {
+         logger.info("Still waiting for task pool to shut down");
+       }
+     } catch (InterruptedException e) {
+       logger.warn("Interrupted while waiting for task pool to shut down", e);
+       // restore the interrupt flag so callers can observe the interruption
+       Thread.currentThread().interrupt();
+     }
+     mergeTaskPool = null;
+   }
+   logger.info("MergeManager stopped");
+ }
+
+ @Override
+ public ServiceType getID() {
+ return ServiceType.MERGE_SERVICE;
+ }
+
+ /**
+  * Asks the storage engine to merge everything. A {@link StorageEngineException} is logged
+  * and not propagated to the caller.
+  */
+ private void flushAll() {
+   try {
+     StorageEngine engine = StorageEngine.getInstance();
+     engine.mergeAll();
+   } catch (StorageEngineException mergeFailure) {
+     logger.error("Cannot perform a global merge because", mergeFailure);
+   }
+ }
+}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
index 0132308,0000000..0cfe876
mode 100644,000000..100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@@ -1,330 -1,0 +1,330 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.MetadataErrorException;
+import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RecoverMergeTask extends MergeTask {
+
+ private static final Logger logger = LoggerFactory.getLogger(RecoverMergeTask.class);
+
+ private String currLine;
+ private List<TsFileResource> mergeSeqFiles = new ArrayList<>();
+ private List<TsFileResource> mergeUnseqFiles = new ArrayList<>();
+
+ private Map<File, Long> fileLastPositions = new HashMap<>();
+ private Map<File, Long> tempFileLastPositions = new HashMap<>();
+
+ private List<Path> unmergedPaths;
+ private List<Path> mergedPaths = new ArrayList<>();
+ private List<TsFileResource> unmergedFiles;
+
+ public RecoverMergeTask(
+ List<TsFileResource> allSeqFiles,
+ List<TsFileResource> allUnseqFiles,
+ String storageGroupDir, MergeCallback callback, String taskName) throws IOException {
+ super(allSeqFiles, allUnseqFiles, storageGroupDir, callback, taskName);
+ }
+
+ /**
+  * Recovers from an interrupted merge using the merge log in the storage group directory.
+  * Returns immediately when no merge log exists. Otherwise the log is analyzed to determine
+  * how far the previous merge got, and depending on that status the merge is resumed and/or
+  * cleaned up.
+  *
+  * @param continueMerge whether the unfinished part of the merge should be carried out; it is
+  *     also passed on to the clean-up step
+  * @throws IOException propagated from log analysis, truncation or merging
+  * @throws MetadataErrorException propagated from log analysis
+  */
+ public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
+   File logFile = new File(storageGroupDir, MergeLogger.MERGE_LOG_NAME);
+   if (!logFile.exists()) {
+     logger.info("{} no merge.log, merge recovery ends", taskName);
+     return;
+   }
+   mergeSeqFiles = new ArrayList<>();
+   mergeUnseqFiles = new ArrayList<>();
+   long startTime = System.currentTimeMillis();
+
+   Status status = determineStatus(logFile);
+   if (logger.isInfoEnabled()) {
+     logger.info("{} merge recovery status determined: {} after {}ms", taskName, status,
+         (System.currentTimeMillis() - startTime));
+   }
+   switch (status) {
+     case NONE:
+       // nothing useful was logged, so the log itself is simply discarded
+       logFile.delete();
+       break;
+     case FILES_LOGGED:
+       if (continueMerge) {
+         mergeLogger = new MergeLogger(storageGroupDir);
+         truncateFiles();
+         recoverChunkCounts();
+         mergeSeries(unmergedPaths);
+
+         mergeFiles(seqFiles);
+       }
+       cleanUp(continueMerge);
+       break;
+     case ALL_TS_MERGED:
+       if (continueMerge) {
+         mergeLogger = new MergeLogger(storageGroupDir);
+         truncateFiles();
+         recoverChunkCounts();
+         mergeFiles(unmergedFiles);
+       } else {
+         // NOTICE: although some of the seqFiles may have been truncated, later TsFile recovery
+         // will recover them, so they are not a concern here
+         truncateFiles();
+       }
+       cleanUp(continueMerge);
+       break;
+     case MERGE_END:
+       cleanUp(continueMerge);
+       break;
+   }
+   if (logger.isInfoEnabled()) {
+     // the elapsed time needs its own placeholder, otherwise the argument is silently dropped
+     logger.info("{} merge recovery ends after {}ms", taskName,
+         (System.currentTimeMillis() - startTime));
+   }
+ }
+
+
+ /**
+  * Scans metadata to compute how many chunks are merged and unmerged, so that it can be
+  * decided at last whether to move the merged chunks or the unmerged chunks. For every
+  * sequence file, the metadata of its merge file writer is made visible and every already
+  * merged path is examined.
+  *
+  * @throws IOException propagated from the calls made while scanning
+  */
+ private void recoverChunkCounts() throws IOException {
+   logger.debug("{} recovering chunk counts", taskName);
+   for (TsFileResource seqFile : seqFiles) {
+     RestorableTsFileIOWriter writer = getMergeFileWriter(seqFile);
+     writer.makeMetadataVisible();
+     unmergedChunkStartTimes.put(seqFile, new HashMap<>());
+     Iterator<Path> pathIterator = mergedPaths.iterator();
+     while (pathIterator.hasNext()) {
+       recoverChunkCounts(pathIterator.next(), seqFile, writer);
+     }
+   }
+ }
+
+ private void recoverChunkCounts(Path path, TsFileResource tsFileResource, // counts merged/unmerged chunks of one series in one seq file
+ RestorableTsFileIOWriter mergeFileWriter) throws IOException {
+ unmergedChunkStartTimes.get(tsFileResource).put(path, new ArrayList<>()); // start with no unmerged chunks recorded for this series
+
+ List<ChunkMetaData> seqFileChunks = queryChunkMetadata(path, tsFileResource);
+ List<ChunkMetaData> mergeFileChunks =
- mergeFileWriter.getVisibleMetadatas(path.getDevice(), path.getMeasurement(), null);
++ mergeFileWriter.getVisibleMetadataList(path.getDevice(), path.getMeasurement(), null);
+ mergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? mergeFileChunks.size() : // add this series' merge-file chunk count to the file's total
+ v + mergeFileChunks.size());
+ int seqIndex = 0;
+ int mergeIndex = 0;
+ int unmergedCnt = 0;
+ while (seqIndex < seqFileChunks.size() && mergeIndex < mergeFileChunks.size()) { // walks both lists in step; assumes both are ordered by time - TODO confirm
+ ChunkMetaData seqChunk = seqFileChunks.get(seqIndex);
+ ChunkMetaData mergedChunk = mergeFileChunks.get(mergeIndex);
+ if (seqChunk.getStartTime() < mergedChunk.getStartTime()) {
+ // this seqChunk is unmerged
+ unmergedCnt ++;
+ seqIndex ++;
+ unmergedChunkStartTimes.get(tsFileResource).get(path).add(seqChunk.getStartTime());
+ } else if (mergedChunk.getStartTime() <= seqChunk.getStartTime() &&
+ seqChunk.getStartTime() <= mergedChunk.getEndTime()) {
+ // this seqChunk is merged
+ seqIndex ++;
+ } else {
+ // seqChunk.startTime > mergeChunk.endTime, find next mergedChunk that may cover the
+ // seqChunk
+ mergeIndex ++;
+ }
+ } // NOTE(review): seq chunks left over once the merged chunks run out are not counted here - confirm this is intended
+ int finalUnmergedCnt = unmergedCnt; // effectively-final copy so the lambda below can capture it
+ unmergedChunkCnt.compute(tsFileResource, (k, v) -> v == null ? finalUnmergedCnt :
+ v + finalUnmergedCnt);
+ }
+
+ /**
+  * Truncates every file recorded in {@code fileLastPositions} back to its recorded position.
+  * Files that do not exist, or whose length already equals that position, are left untouched.
+  *
+  * @throws IOException if a file cannot be opened for writing or cannot be truncated
+  */
+ private void truncateFiles() throws IOException {
+   logger.debug("{} truncating {} files", taskName, fileLastPositions.size());
+   for (Entry<File, Long> entry : fileLastPositions.entrySet()) {
+     File file = entry.getKey();
+     long lastPosition = entry.getValue();
+     if (file.exists() && file.length() != lastPosition) {
+       // the channel of a FileInputStream is read-only and truncate() on it throws
+       // NonWritableChannelException, so a writable channel has to be opened instead
+       try (FileChannel channel = FileChannel.open(file.toPath(),
+           java.nio.file.StandardOpenOption.WRITE)) {
+         channel.truncate(lastPosition);
+       }
+     }
+   }
+ }
+
+ /**
+  * Reads the merge log and determines how far the interrupted merge had progressed. As a side
+  * effect, the sections found in the log are analyzed, which fills the file lists, the
+  * recorded file positions and the merged/unmerged paths used by the later recovery steps.
+  *
+  * @param logFile the merge log to read
+  * @return the status matching the last section marker reached, or {@code Status.NONE} when
+  *     none of the merge-start, all-series-end or merge-end markers is reached
+  * @throws IOException if the log cannot be read
+  * @throws MetadataErrorException propagated from the analysis of the logged files
+  */
+ private Status determineStatus(File logFile) throws IOException, MetadataErrorException {
+   Status status = Status.NONE;
+   try (BufferedReader bufferedReader = new BufferedReader(new FileReader(logFile))) {
+     currLine = bufferedReader.readLine();
+     // every analyze* call advances currLine to the next section marker, or to null when the
+     // log ends early (e.g. it was cut short by a crash); the constant-first equals() calls
+     // below tolerate that null instead of throwing a NullPointerException
+     if (MergeLogger.STR_SEQ_FILES.equals(currLine)) {
+       analyzeSeqFiles(bufferedReader);
+     }
+     if (MergeLogger.STR_UNSEQ_FILES.equals(currLine)) {
+       analyzeUnseqFiles(bufferedReader);
+     }
+     if (MergeLogger.STR_MERGE_START.equals(currLine)) {
+       status = Status.FILES_LOGGED;
+       seqFiles = mergeSeqFiles;
+       unseqFiles = mergeUnseqFiles;
+       // until a series is logged as finished, each merge file is to be truncated to empty
+       for (TsFileResource seqFile : seqFiles) {
+         File mergeFile = new File(seqFile.getFile().getPath() + MergeTask.MERGE_SUFFIX);
+         fileLastPositions.put(mergeFile, 0L);
+       }
+       unmergedPaths = collectPathsInUnseqFiles();
+       analyzeMergedSeries(bufferedReader, unmergedPaths);
+     }
+     if (MergeLogger.STR_ALL_TS_END.equals(currLine)) {
+       status = Status.ALL_TS_MERGED;
+       unmergedFiles = seqFiles;
+       analyzeMergedFiles(bufferedReader);
+     }
+     if (MergeLogger.STR_MERGE_END.equals(currLine)) {
+       status = Status.MERGE_END;
+     }
+   }
+   return status;
+ }
+
+ /**
+  * Consumes the sequence-file section of the merge log. Each line is compared with the
+  * absolute paths of the files in {@code seqFiles}; a matching file is moved from
+  * {@code seqFiles} to {@code mergeSeqFiles}. Reading stops at the unseq-files marker or at the
+  * end of the log, and {@code currLine} is left at that marker (or null).
+  *
+  * @param bufferedReader reader positioned right after the seq-files marker
+  * @throws IOException if the log cannot be read
+  */
+ private void analyzeSeqFiles(BufferedReader bufferedReader) throws IOException {
+   long analysisStart = System.currentTimeMillis();
+   for (currLine = bufferedReader.readLine();
+       currLine != null && !currLine.equals(MergeLogger.STR_UNSEQ_FILES);
+       currLine = bufferedReader.readLine()) {
+     for (Iterator<TsFileResource> candidates = seqFiles.iterator(); candidates.hasNext(); ) {
+       TsFileResource candidate = candidates.next();
+       if (candidate.getFile().getAbsolutePath().equals(currLine)) {
+         mergeSeqFiles.add(candidate);
+         candidates.remove();
+         break;
+       }
+     }
+   }
+   if (logger.isDebugEnabled()) {
+     long elapsed = System.currentTimeMillis() - analysisStart;
+     logger.debug("{} found {} seq files after {}ms", taskName, mergeSeqFiles.size(), elapsed);
+   }
+ }
+
+ /**
+  * Consumes the unsequence-file section of the merge log. Each line is compared with the
+  * absolute paths of the files in {@code unseqFiles}; a matching file is moved from
+  * {@code unseqFiles} to {@code mergeUnseqFiles}. Reading stops at the merge-start marker or at
+  * the end of the log, and {@code currLine} is left at that marker (or null).
+  *
+  * @param bufferedReader reader positioned right after the unseq-files marker
+  * @throws IOException if the log cannot be read
+  */
+ private void analyzeUnseqFiles(BufferedReader bufferedReader) throws IOException {
+   long startTime = System.currentTimeMillis();
+   while ((currLine = bufferedReader.readLine()) != null) {
+     if (currLine.equals(MergeLogger.STR_MERGE_START)) {
+       break;
+     }
+     Iterator<TsFileResource> iterator = unseqFiles.iterator();
+     while (iterator.hasNext()) {
+       TsFileResource unseqFile = iterator.next();
+       if (unseqFile.getFile().getAbsolutePath().equals(currLine)) {
+         mergeUnseqFiles.add(unseqFile);
+         iterator.remove();
+         break;
+       }
+     }
+   }
+   if (logger.isDebugEnabled()) {
+     // this method counts unseq files; the message previously said "seq files" (copy-paste)
+     logger.debug("{} found {} unseq files after {}ms", taskName, mergeUnseqFiles.size(),
+         (System.currentTimeMillis() - startTime));
+   }
+ }
+
+ /**
+  * Consumes the per-series section of the merge log. A line containing the start marker opens
+  * a series, following lines without the end marker are read as "file position" pairs, and a
+  * line containing the end marker closes the series: the series is removed from
+  * {@code unmergedPaths}, added to {@code mergedPaths}, and the positions collected for it are
+  * committed to {@code fileLastPositions}. Positions of a series that never reaches its end
+  * marker are not committed. Reading stops at the all-series-end marker or at the end of the
+  * log.
+  *
+  * @param bufferedReader reader positioned right after the merge-start marker
+  * @param unmergedPaths paths still to be merged; finished series are removed from it
+  * @throws IOException if the log cannot be read
+  */
+ private void analyzeMergedSeries(BufferedReader bufferedReader, List<Path> unmergedPaths) throws IOException {
+   Path currTS = null;
+   long startTime = System.currentTimeMillis();
+   while ((currLine = bufferedReader.readLine()) != null) {
+     if (currLine.equals(MergeLogger.STR_ALL_TS_END)) {
+       break;
+     }
+     if (currLine.contains(MergeLogger.STR_START)) {
+       // a TS starts to merge
+       String[] splits = currLine.split(" ");
+       currTS = new Path(splits[0]);
+       tempFileLastPositions.clear();
+     } else if (!currLine.contains(MergeLogger.STR_END)) {
+       // file position
+       String[] splits = currLine.split(" ");
+       File file = new File(splits[0]);
+       long position = Long.parseLong(splits[1]);
+       tempFileLastPositions.put(file, position);
+     } else {
+       // a TS ends merging
+       unmergedPaths.remove(currTS);
+       fileLastPositions.putAll(tempFileLastPositions);
+       mergedPaths.add(currTS);
+     }
+   }
+   if (logger.isDebugEnabled()) {
+     // report the number of merged series, not the number of seq files
+     logger.debug("{} found {} series have already been merged after {}ms", taskName,
+         mergedPaths.size(), (System.currentTimeMillis() - startTime));
+   }
+ }
+
+ /**
+  * Consumes the per-file section of the merge log. A line containing the start marker names
+  * the file being processed and its last position, which is recorded in
+  * {@code fileLastPositions}. A line containing the end marker means that file is done: its
+  * recorded position is dropped and the corresponding sequence file (same absolute path
+  * without the merge suffix) is removed from {@code unmergedFiles}. Reading stops at the
+  * merge-end marker or at the end of the log.
+  *
+  * @param bufferedReader reader positioned right after the all-series-end marker
+  * @throws IOException if the log cannot be read
+  */
+ private void analyzeMergedFiles(BufferedReader bufferedReader) throws IOException {
+   long analysisStart = System.currentTimeMillis();
+   File fileInProgress = null;
+   int finishedFileCnt = 0;
+   for (currLine = bufferedReader.readLine();
+       currLine != null && !currLine.equals(MergeLogger.STR_MERGE_END);
+       currLine = bufferedReader.readLine()) {
+     if (currLine.contains(MergeLogger.STR_START)) {
+       String[] tokens = currLine.split(" ");
+       fileInProgress = new File(tokens[0]);
+       fileLastPositions.put(fileInProgress, Long.parseLong(tokens[2]));
+       continue;
+     }
+     if (!currLine.contains(MergeLogger.STR_END)) {
+       // neither a start nor an end line: nothing to do
+       continue;
+     }
+     fileLastPositions.remove(fileInProgress);
+     String seqFilePath = fileInProgress.getAbsolutePath().replace(MergeTask.MERGE_SUFFIX, "");
+     for (Iterator<TsFileResource> pending = unmergedFiles.iterator(); pending.hasNext(); ) {
+       TsFileResource seqFile = pending.next();
+       if (seqFile.getFile().getAbsolutePath().equals(seqFilePath)) {
+         finishedFileCnt++;
+         pending.remove();
+         break;
+       }
+     }
+   }
+   if (logger.isDebugEnabled()) {
+     long elapsed = System.currentTimeMillis() - analysisStart;
+     logger.debug("{} found {} files have already been merged after {}ms", taskName,
+         finishedFileCnt, elapsed);
+   }
+ }
+
+ /** How far an interrupted merge had progressed, as determined from the merge log. */
+ enum Status {
+   /** No merge-start marker was reached in the log. */
+   NONE,
+   /** The merge-start marker was reached, i.e. the participating files were logged. */
+   FILES_LOGGED,
+   /** The all-series-end marker was reached. */
+   ALL_TS_MERGED,
+   /** The merge-end marker was reached. */
+   MERGE_END
+ }
+}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 0aac9a7,a3d0e9d..9a38315
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@@ -18,9 -18,7 +18,9 @@@
*/
package org.apache.iotdb.db.engine.storagegroup;
+import static org.apache.iotdb.db.engine.merge.MergeTask.MERGE_SUFFIX;
+import static org.apache.iotdb.db.engine.storagegroup.TsFileResource.TEMP_SUFFIX;
- import static org.apache.iotdb.tsfile.common.constant.SystemConstant.TSFILE_SUFFIX;
+ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFFIX;
import java.io.File;
import java.io.IOException;
@@@ -38,22 -36,16 +38,22 @@@ import org.apache.commons.io.FileUtils
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.MergeFileSelector;
- import org.apache.iotdb.db.engine.merge.MergeTask;
+import org.apache.iotdb.db.engine.merge.MergeManager;
++import org.apache.iotdb.db.engine.merge.MergeTask;
+import org.apache.iotdb.db.engine.merge.RecoverMergeTask;
import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.StorageGroupProcessorException;
+import org.apache.iotdb.db.exception.MergeException;
+import org.apache.iotdb.db.exception.MetadataErrorException;
- import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.ProcessorException;
++import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
@@@ -564,8 -516,6 +564,7 @@@ public class StorageGroupProcessor
public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
// TODO: how to avoid partial deletion?
writeLock();
+ mergeLock.writeLock().lock();
- mergeLock.writeLock().lock();
// record files which are updated so that we can roll back them in case of exception
List<ModificationFile> updatedModFiles = new ArrayList<>();
diff --cc iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index ad922d9,fae36f2..df47afd
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@@ -30,9 -30,6 +30,10 @@@ import java.util.HashMap
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
++import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
@@@ -233,23 -228,4 +238,40 @@@ public class TsFileResource
endTimeMap.put(deviceId, time);
}
}
+
+ public ReentrantReadWriteLock getMergeQueryLock() {
+ return mergeQueryLock;
+ }
+
+ public void removeModFile() throws IOException {
+ getModFile().remove();
+ modFile = null;
+ }
+
+ public void remove() {
+ file.delete();
+ new File(file.getPath() + RESOURCE_SUFFIX).delete();
+ }
+
+ @Override
+ public String toString() {
+ return file.toString();
+ }
++
++ @Override
++ public boolean equals(Object o) {
++ if (this == o) {
++ return true;
++ }
++ if (o == null || getClass() != o.getClass()) {
++ return false;
++ }
++ TsFileResource that = (TsFileResource) o;
++ return Objects.equals(file, that.file);
++ }
++
++ @Override
++ public int hashCode() {
++ return Objects.hash(file);
++ }
}
diff --cc iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
index ba48c4b,4139d8e..f87cf6c
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/IoTDB.java
@@@ -21,8 -21,8 +21,9 @@@ package org.apache.iotdb.db.service
import org.apache.iotdb.db.concurrent.IoTDBDefaultThreadExceptionHandler;
import org.apache.iotdb.db.conf.IoTDBConstant;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+ import org.apache.iotdb.db.cost.statistic.Measurement;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.MergeManager;
import org.apache.iotdb.db.exception.StartupException;
import org.apache.iotdb.db.exception.builder.ExceptionBuilder;
import org.apache.iotdb.db.monitor.StatMonitor;
@@@ -92,9 -92,9 +93,10 @@@ public class IoTDB implements IoTDBMBea
registerManager.register(JDBCService.getInstance());
registerManager.register(Monitor.getInstance());
registerManager.register(StatMonitor.getInstance());
+ registerManager.register(Measurement.INSTANCE);
registerManager.register(SyncServerManager.getInstance());
registerManager.register(TVListAllocator.getInstance());
+ registerManager.register(MergeManager.getINSTANCE());
JMXService.registerMBean(getInstance(), mbeanName);
diff --cc iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
index 44a1064,4f872fd..1da0071
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/ServiceType.java
@@@ -31,8 -31,8 +31,9 @@@ public enum ServiceType
AUTHORIZATION_SERVICE("Authorization ServerService", ""),
FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
SYNC_SERVICE("SYNC ServerService", ""),
- PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE"),
- TVLIST_ALLOCATOR_SERVICE("TVList Allocator", "");
+ TVLIST_ALLOCATOR_SERVICE("TVList Allocator", ""),
- MERGE_SERVICE("Merge Manager", "");
++ MERGE_SERVICE("Merge Manager", ""),
++ PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE","PERFORMANCE_STATISTIC_SERVICE");
private String name;
private String jmxName;
diff --cc iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
index 5bcc436,970489d..227f2ab
--- a/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
+++ b/iotdb/src/test/java/org/apache/iotdb/db/query/control/FileReaderManagerTest.java
@@@ -58,9 -57,9 +58,12 @@@ public class FileReaderManagerTest
FileReaderManager manager = FileReaderManager.getInstance();
JobFileManager testManager = new JobFileManager();
++ TsFileResource[] tsFileResources = new TsFileResource[MAX_FILE_SIZE + 1];
++
for (int i = 1; i <= MAX_FILE_SIZE; i++) {
File file = new File(filePath + i);
file.createNewFile();
++ tsFileResources[i] = new TsFileResource(file);
}
Thread t1 = new Thread(() -> {
@@@ -68,12 -67,11 +71,16 @@@
testManager.addJobId(1L);
for (int i = 1; i <= 6; i++) {
- TsFileResource tsFile = new TsFileResource(new File(filePath + i));
- testManager.addFilePathToMap(1L, filePath + i,
++ TsFileResource tsFile = tsFileResources[i];
+ testManager.addFilePathToMap(1L, tsFile,
false);
- manager.get(filePath + i, false);
- Assert.assertTrue(manager.contains(filePath + i, false));
+ manager.get(tsFile, false);
+ Assert.assertTrue(manager.contains(tsFile, false));
+ }
++ for (int i = 1; i <= 6; i++) {
++ TsFileResource tsFile = tsFileResources[i];
++ manager.decreaseFileReaderReference(tsFile, false);
+ }
} catch (IOException e) {
e.printStackTrace();
@@@ -87,12 -85,11 +94,16 @@@
testManager.addJobId(2L);
for (int i = 4; i <= MAX_FILE_SIZE; i++) {
- TsFileResource tsFile = new TsFileResource(new File(filePath + i));
- testManager.addFilePathToMap(2L, filePath + i,
++ TsFileResource tsFile = tsFileResources[i];
+ testManager.addFilePathToMap(2L, tsFile,
false);
- manager.get(filePath + i, false);
- Assert.assertTrue(manager.contains(filePath + i, false));
+ manager.get(tsFile, false);
+ Assert.assertTrue(manager.contains(tsFile, false));
+ }
++ for (int i = 4; i <= MAX_FILE_SIZE; i++) {
++ TsFileResource tsFile = tsFileResources[i];
++ manager.decreaseFileReaderReference(tsFile, false);
+ }
} catch (IOException e) {
e.printStackTrace();
@@@ -105,14 -102,12 +116,10 @@@
t2.join();
for (int i = 1; i <= MAX_FILE_SIZE; i++) {
- Assert.assertTrue(manager.contains(filePath + i, false));
+ TsFileResource tsFile = new TsFileResource(new File(filePath + i));
+ Assert.assertTrue(manager.contains(tsFile, false));
}
-- for (int i = 1; i <= MAX_FILE_SIZE; i++) {
- TsFileResource tsFile = new TsFileResource(new File(filePath + i));
- manager.decreaseFileReaderReference(tsFile, true);
- manager.decreaseFileReaderReference(filePath + i, true);
-- }
// the code below is not valid because the cacheFileReaderClearPeriod config in this class is not valid
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
index 8ba77e4,8dc058c..d1ddaa0
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/TsFileSequenceReader.java
@@@ -567,9 -568,17 +575,18 @@@ public class TsFileSequenceReader imple
currentChunk = new ChunkMetaData(measurementID, dataType, fileOffsetOfChunk,
startTimeOfChunk, endTimeOfChunk);
currentChunk.setNumOfPoints(numOfPoints);
+ Map<String, ByteBuffer> statisticsMap = new HashMap<>();
+ statisticsMap.put(StatisticConstant.MAX_VALUE, ByteBuffer.wrap(chunkStatistics.getMaxBytes()));
+ statisticsMap.put(StatisticConstant.MIN_VALUE, ByteBuffer.wrap(chunkStatistics.getMinBytes()));
+ statisticsMap.put(StatisticConstant.FIRST, ByteBuffer.wrap(chunkStatistics.getFirstBytes()));
+ statisticsMap.put(StatisticConstant.SUM, ByteBuffer.wrap(chunkStatistics.getSumBytes()));
+ statisticsMap.put(StatisticConstant.LAST, ByteBuffer.wrap(chunkStatistics.getLastBytes()));
+ TsDigest tsDigest = new TsDigest();
+ tsDigest.setStatistics(statisticsMap);
+ currentChunk.setDigest(tsDigest);
chunks.add(currentChunk);
numOfPoints = 0;
+ chunkCnt ++;
break;
case MetaMarker.CHUNK_GROUP_FOOTER:
//this is a chunk group
diff --cc tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
index 2c2399a,d5bb0e0..c08404d
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/write/writer/RestorableTsFileIOWriter.java
@@@ -128,8 -124,8 +127,8 @@@ public class RestorableTsFileIOWriter e
// filter: if a device's sensor is defined as float type, and data has been persisted.
// Then someone deletes the timeseries and recreates it with Int type. We have to ignore
// all the stale data.
- if (dataType.equals(chunkMetaData.getTsDataType())) {
+ if (dataType == null || dataType.equals(chunkMetaData.getTsDataType())) {
- chunkMetaDatas.add(chunkMetaData);
+ chunkMetaDataList.add(chunkMetaData);
}
}
}