You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/11 12:08:08 UTC
[incubator-iotdb] branch dev_merge updated: add merge in
StorageGroupProcessor and StorageEngine
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_merge by this push:
new dae5889 add merge in StorageGroupProcessor and StorageEngine
dae5889 is described below
commit dae5889d0d73f6951d7cbf6f15e99edeacd9821a
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jul 11 20:05:50 2019 +0800
add merge in StorageGroupProcessor and StorageEngine
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 20 +++++
.../org/apache/iotdb/db/engine/StorageEngine.java | 4 +-
.../iotdb/db/engine/merge/MergeCallback.java | 29 +++++++
.../apache/iotdb/db/engine/merge/MergeManager.java | 48 +++++++++++
.../apache/iotdb/db/engine/merge/MergeTask.java | 20 +++--
.../iotdb/db/engine/merge/RecoverMergeTask.java | 7 +-
.../db/engine/modification/ModificationFile.java | 7 ++
.../iotdb/db/engine/storagegroup/FlushManager.java | 2 +-
.../engine/storagegroup/StorageGroupProcessor.java | 98 +++++++++++++++++++++-
.../db/engine/storagegroup/TsFileResource.java | 5 ++
.../org/apache/iotdb/db/service/TSServiceImpl.java | 5 +-
11 files changed, 225 insertions(+), 20 deletions(-)
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e7543b0..badd42e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -180,6 +180,10 @@ public class IoTDBConfig {
*/
private boolean chunkBufferPoolEnable = false;
+ /**
+  * Memory budget, in bytes, handed to MergeFileSelector when choosing merge candidates.
+  * Defaults to 20% of Runtime.getRuntime().maxMemory().
+  */
+ private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
+
+ /**
+  * Thread count intended for the merge thread pool.
+  * NOTE(review): confirm MergeManager sizes its pool from getMergeThreadNum().
+  */
+ private int mergeThreadNum = 2;
+
public IoTDBConfig() {
// empty constructor
}
@@ -487,4 +491,20 @@ public class IoTDBConfig {
void setChunkBufferPoolEnable(boolean chunkBufferPoolEnable) {
this.chunkBufferPoolEnable = chunkBufferPoolEnable;
}
+
+  /**
+   * Returns the memory budget, in bytes, that merge file selection is allowed to use.
+   *
+   * @return the merge memory budget in bytes
+   */
+  public long getMergeMemoryBudget() {
+    return this.mergeMemoryBudget;
+  }
+
+  /**
+   * Replaces the memory budget, in bytes, that merge file selection is allowed to use.
+   *
+   * @param mergeMemoryBudget the new budget in bytes
+   */
+  public void setMergeMemoryBudget(long mergeMemoryBudget) {
+    this.mergeMemoryBudget = mergeMemoryBudget;
+  }
+
+  /**
+   * Returns the configured number of merge threads.
+   *
+   * @return the merge thread count
+   */
+  public int getMergeThreadNum() {
+    return this.mergeThreadNum;
+  }
+
+  /**
+   * Replaces the configured number of merge threads.
+   *
+   * @param mergeThreadNum the new merge thread count
+   */
+  public void setMergeThreadNum(int mergeThreadNum) {
+    this.mergeThreadNum = mergeThreadNum;
+  }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d27b0a3..ce9d6fb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -307,7 +307,9 @@ public class StorageEngine implements IService {
if (readOnly) {
throw new StorageEngineException("Current system mode is read only, does not support merge");
}
- // TODO
+ for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+ storageGroupProcessor.merge();
+ }
}
/**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeCallback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeCallback.java
new file mode 100644
index 0000000..ad1dcf0
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+/**
+ * Hook that a merge task invokes at the end of its clean-up, handing back the files that took
+ * part in the merge so the owner can update its own bookkeeping.
+ */
+@FunctionalInterface
+public interface MergeCallback {
+
+  /**
+   * Reports that the merge over the given files has ended.
+   *
+   * @param seqFiles the sequence files that were merged
+   * @param unseqFiles the unsequence files that were merged
+   */
+  void call(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles);
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
new file mode 100644
index 0000000..4677e3d
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+/**
+ * Process-wide executor for merge tasks. Tasks run on a fixed-size pool whose threads are named
+ * "mergeThread-N"; the pool size is taken from IoTDBConfig#getMergeThreadNum().
+ */
+public class MergeManager {
+
+  private static final MergeManager INSTANCE = new MergeManager();
+
+  /** Source of the numeric suffix in merge thread names. */
+  private final AtomicInteger threadNum = new AtomicInteger();
+  private final ThreadPoolExecutor pool;
+
+  private MergeManager() {
+    pool =
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum(),
+            r -> new Thread(r, "mergeThread-" + threadNum.getAndIncrement()));
+  }
+
+  public static MergeManager getINSTANCE() {
+    return INSTANCE;
+  }
+
+  /**
+   * Schedules a merge task on the merge pool.
+   *
+   * @param mergeTask the task to run
+   * @return a future completing when the task finishes; an exception thrown by the task is only
+   *     observable through this future, so callers that need to react to failures must keep it
+   */
+  public Future<Void> submit(MergeTask mergeTask) {
+    return pool.submit(mergeTask);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index 4ddbc2e..1c5366e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -99,16 +99,14 @@ public class MergeTask implements Callable<Void> {
private long currDeviceMaxTime;
- protected MergeTask() {
-
- }
+ /**
+  * Invoked with this task's seqFiles and unseqFiles at the end of the merge clean-up.
+  * NOTE(review): that call is guarded by an `executeCallback` flag that is not declared anywhere
+  * in this diff; confirm it exists, otherwise MergeTask does not compile.
+  */
+ protected MergeCallback callback;
+ /**
+  * Creates a merge task. The MergeLogger is no longer created here; doMerge() creates it.
+  *
+  * @param seqFiles sequence files to merge
+  * @param unseqFiles unsequence files to merge
+  * @param storageGroupDir directory passed to the MergeLogger
+  * @param callback notified with seqFiles and unseqFiles when the merge ends
+  */
 public MergeTask(List<TsFileResource> seqFiles,
- List<TsFileResource> unseqFiles, String storageGroupDir) throws IOException {
+ List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback) throws IOException {
 this.seqFiles = seqFiles;
 this.unseqFiles = unseqFiles;
 this.storageGroupDir = storageGroupDir;
- this.mergeLogger = new MergeLogger(storageGroupDir);
+ this.callback = callback;
 }
@Override
@@ -118,7 +116,7 @@ public class MergeTask implements Callable<Void> {
}
private void doMerge() throws MetadataErrorException, IOException {
-
+ this.mergeLogger = new MergeLogger(storageGroupDir);
logFiles();
List<Path> unmergedSeries = collectPathsInUnseqFiles();
@@ -187,6 +185,10 @@ public class MergeTask implements Callable<Void> {
File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
mergeFile.delete();
}
+
+ if (executeCallback) {
+ callback.call(seqFiles, unseqFiles);
+ }
}
private void moveMergedToOld(TsFileResource seqFile) throws IOException {
@@ -249,7 +251,7 @@ public class MergeTask implements Callable<Void> {
maxVersion = maxVersion < chunkMetaData.getVersion() ? chunkMetaData.getVersion() :
maxVersion;
}
- fileWriter.endChunkGroup(maxVersion);
+ fileWriter.endChunkGroup(maxVersion + 1);
}
private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
@@ -276,7 +278,7 @@ public class MergeTask implements Callable<Void> {
fileWriter.startChunkGroup(path.getDevice());
long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
chunkWriter, measurementSchema, fileWriter);
- fileWriter.endChunkGroup(maxVersion);
+ fileWriter.endChunkGroup(maxVersion + 1);
}
}
@@ -365,7 +367,7 @@ public class MergeTask implements Callable<Void> {
mergeFileWriter.startChunkGroup(deviceId);
if (mergeChunks(seqChunkMeta, fileLimitTime, chunkLoader, measurementSchema,
unseqReader, mergeFileWriter, currTsFile, path)) {
- mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion());
+ mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion() + 1);
mergeLogger.logFilePositionUpdate(mergeFileWriter.getFile());
}
currTsFile.updateTime(path.getDevice(), currDeviceMaxTime);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
index 9234443..740e854 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -53,11 +53,8 @@ public class RecoverMergeTask extends MergeTask {
+ /**
+  * Creates a task used to recover a previous merge (see recoverMerge). All state is now set by
+  * the MergeTask constructor instead of being assigned field by field.
+  *
+  * @param allSeqFiles sequence files to consider (presumably all of the storage group's; confirm)
+  * @param allUnseqFiles unsequence files to consider (presumably all of the storage group's)
+  * @param storageGroupDir directory handed to MergeTask
+  * @param callback notified when the recovered merge ends, as for MergeTask
+  */
 public RecoverMergeTask(
 List<TsFileResource> allSeqFiles,
 List<TsFileResource> allUnseqFiles,
- String storageGroupDir) throws IOException {
- super();
- this.seqFiles = allSeqFiles;
- this.unseqFiles = allUnseqFiles;
- this.storageGroupDir = storageGroupDir;
+ String storageGroupDir, MergeCallback callback) throws IOException {
+ super(allSeqFiles, allUnseqFiles, storageGroupDir, callback);
 }
public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 964d89e..26438a8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.engine.modification;
+import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -117,4 +118,10 @@ public class ModificationFile {
public void setFilePath(String filePath) {
this.filePath = filePath;
}
+
+ public void remove() throws IOException {
+ close();
+ new File(filePath).delete();
+ }
+
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
index 864d589..ffbd5fe 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
@@ -43,7 +43,7 @@ public class FlushManager {
}
/**
- * Add BufferWriteProcessor to asyncFlush manager
+ * Add TsFileProcessor to asyncFlush manager
*/
@SuppressWarnings("squid:S2445")
void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2cf59c9..e3876c3 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -36,13 +36,18 @@ import org.apache.commons.io.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.conf.directories.DirectoryManager;
import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.MergeFileSelector;
+import org.apache.iotdb.db.engine.merge.MergeTask;
+import org.apache.iotdb.db.engine.merge.MergeManager;
import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
import org.apache.iotdb.db.engine.modification.ModificationFile;
import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
import org.apache.iotdb.db.engine.version.VersionController;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.MergeException;
import org.apache.iotdb.db.exception.StorageGroupProcessorException;
import org.apache.iotdb.db.exception.ProcessorException;
import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -127,6 +132,8 @@ public class StorageGroupProcessor {
*/
private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
private String storageGroupName;
+ /**
+  * System directory of this storage group, new File(systemInfoDir, storageGroupName); its path
+  * is handed to merge tasks.
+  */
+ private File storageGroupSysDir;
+
/**
* versionController assigns a version for each MemTable and deletion/update such that after
* they are persisted, the order of insertions, deletions and updates can be re-determined.
@@ -140,6 +147,7 @@ public class StorageGroupProcessor {
*/
@SuppressWarnings("unused") // to be used in merge
private ReentrantLock mergeDeleteLock = new ReentrantLock();
+ /**
+  * Coordinates queries with deletions and merge completion: query() takes the read lock, while
+  * delete() and mergeEndAction() take the write lock.
+  */
+ private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
/**
* This is the modification file of the result of the current merge. Because the merged file
@@ -147,6 +155,9 @@ public class StorageGroupProcessor {
*/
private ModificationFile mergingModification;
+ /**
+  * True from the submission of a merge task until its end callback runs; merge() refuses to
+  * start a new merge while it is set.
+  * NOTE(review): nothing resets it if the task fails before invoking the callback.
+  */
+ private volatile boolean isMerging = false;
+ /** System.currentTimeMillis() at which the current merge was submitted; used for logging. */
+ private long mergeStartTime;
+
public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
throws ProcessorException {
@@ -156,7 +167,7 @@ public class StorageGroupProcessor {
this.fileSchema = constructFileSchema(storageGroupName);
try {
- File storageGroupSysDir = new File(systemInfoDir, storageGroupName);
+ storageGroupSysDir = new File(systemInfoDir, storageGroupName);
if (storageGroupSysDir.mkdirs()) {
logger.info("Storage Group system Directory {} doesn't exist, create it",
storageGroupSysDir.getPath());
@@ -444,6 +455,7 @@ public class StorageGroupProcessor {
// TODO need a read lock, please consider the concurrency with flush manager threads.
public QueryDataSource query(String deviceId, String measurementId, QueryContext context) {
insertLock.readLock().lock();
+ mergeLock.readLock().lock();
try {
List<TsFileResource> seqResources = getFileReSourceListForQuery(sequenceFileList,
deviceId, measurementId, context);
@@ -452,6 +464,7 @@ public class StorageGroupProcessor {
return new QueryDataSource(new Path(deviceId, measurementId), seqResources, unseqResources);
} finally {
insertLock.readLock().unlock();
+ mergeLock.readLock().unlock();
}
}
@@ -515,6 +528,8 @@ public class StorageGroupProcessor {
public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
// TODO: how to avoid partial deletion?
writeLock();
+ mergeLock.writeLock().lock();
+ mergeLock.writeLock().lock();
// record files which are updated so that we can roll back them in case of exception
List<ModificationFile> updatedModFiles = new ArrayList<>();
@@ -558,6 +573,7 @@ public class StorageGroupProcessor {
throw new IOException(e);
} finally {
writeUnlock();
+ mergeLock.writeLock().unlock();
}
}
@@ -632,6 +648,86 @@ public class StorageGroupProcessor {
}
}
+  /**
+   * Starts an asynchronous merge of this storage group's sequence and unsequence files, unless a
+   * merge is already running or there is nothing to merge. Candidate selection happens while
+   * holding writeLock(); the merge itself runs on the MergeManager pool and reports back through
+   * mergeEndAction.
+   */
+  public void merge() {
+    writeLock();
+    try {
+      if (isMerging) {
+        if (logger.isInfoEnabled()) {
+          logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName,
+              (System.currentTimeMillis() - mergeStartTime));
+        }
+        return;
+      }
+      if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) {
+        logger.info("{} no files to be merged", storageGroupName);
+        // nothing to merge: do not run the selector or submit a task over empty lists
+        return;
+      }
+
+      long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
+      MergeFileSelector fileSelector = new MergeFileSelector(sequenceFileList, unSequenceFileList,
+          budget);
+      try {
+        List[] mergeFiles = fileSelector.doSelect();
+        if (mergeFiles.length == 0) {
+          logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
+              budget);
+          return;
+        }
+        MergeTask mergeTask = new MergeTask(mergeFiles[0], mergeFiles[1],
+            storageGroupSysDir.getPath(), this::mergeEndAction);
+        // Mark the merge as running BEFORE submitting: a fast task may run mergeEndAction (which
+        // clears isMerging) before submit() returns; setting the flag afterwards would leave it
+        // stuck at true and block every later merge.
+        isMerging = true;
+        mergeStartTime = System.currentTimeMillis();
+        try {
+          MergeManager.getINSTANCE().submit(mergeTask);
+        } catch (RuntimeException e) {
+          // the task was never scheduled, so no callback will ever clear the flag
+          isMerging = false;
+          throw e;
+        }
+        if (logger.isInfoEnabled()) {
+          logger.info("{} submits a merge task, merging {} seqFiles, {} unseqFiles",
+              storageGroupName, mergeFiles[0].size(), mergeFiles[1].size());
+        }
+      } catch (MergeException e) {
+        logger.error("{} cannot select file for merge", storageGroupName, e);
+      } catch (IOException e) {
+        logger.error("{} cannot create a merge task", storageGroupName, e);
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Callback run when a merge task ends. The steps are:
+   * <ol>
+   *   <li>drop the merged unsequence files from unSequenceFileList;</li>
+   *   <li>replace each merged sequence file's modification file with the modifications recorded
+   *   in mergingModification;</li>
+   *   <li>remove mergingModification and clear isMerging so a new merge can start.</li>
+   * </ol>
+   *
+   * @param seqFiles the sequence files that took part in the merge
+   * @param unseqFiles the unsequence files that took part in the merge
+   */
+  private void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
+    mergeLock.writeLock().lock();
+    try {
+      unSequenceFileList.removeAll(unseqFiles);
+    } finally {
+      mergeLock.writeLock().unlock();
+    }
+
+    if (seqFiles.isEmpty()) {
+      // The per-file loop below would not run, so finish here; otherwise isMerging stays true
+      // forever and no later merge can start.
+      mergeLock.writeLock().lock();
+      try {
+        finishMerge();
+      } finally {
+        mergeLock.writeLock().unlock();
+      }
+      return;
+    }
+
+    for (int i = 0; i < seqFiles.size(); i++) {
+      TsFileResource seqFile = seqFiles.get(i);
+      seqFile.getMergeQueryLock().writeLock().lock();
+      mergeLock.writeLock().lock();
+      // release in finally: an unchecked exception must not leave either write lock held
+      try {
+        replaceModificationsAfterMerge(seqFile);
+        if (i == seqFiles.size() - 1) {
+          // finish while still holding the last file's locks
+          finishMerge();
+        }
+      } finally {
+        mergeLock.writeLock().unlock();
+        seqFile.getMergeQueryLock().writeLock().unlock();
+      }
+    }
+  }
+
+  /**
+   * Replaces the modification file of a merged sequence file with the modifications recorded
+   * during the merge. Expects the caller to hold mergeLock's write lock and the file's
+   * merge-query write lock.
+   */
+  private void replaceModificationsAfterMerge(TsFileResource seqFile) {
+    try {
+      // remove old modifications and write modifications generated during merge
+      seqFile.removeModFile();
+      if (mergingModification != null) {
+        for (Modification modification : mergingModification.getModifications()) {
+          seqFile.getModFile().write(modification);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+          seqFile.getFile(), e);
+    }
+  }
+
+  /**
+   * Removes the merge's modification file, if any, and marks the merge as finished. Expects the
+   * caller to hold mergeLock's write lock.
+   */
+  private void finishMerge() {
+    if (mergingModification != null) {
+      try {
+        mergingModification.remove();
+      } catch (IOException e) {
+        logger.error("{} cannot remove merging modification ", storageGroupName, e);
+      }
+    }
+    mergingModification = null;
+    isMerging = false;
+  }
+
public TsFileProcessor getWorkSequenceTsFileProcessor() {
return workSequenceTsFileProcessor;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index df979c8..b00e422 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -235,4 +235,9 @@ public class TsFileResource {
public ReentrantReadWriteLock getMergeQueryLock() {
return mergeQueryLock;
}
+
+  /**
+   * Closes and deletes this resource's modification file, then drops the cached modFile
+   * reference (presumably so the next getModFile() call builds a fresh instance; confirm).
+   *
+   * @throws IOException if the modification file cannot be closed or removed
+   */
+  public void removeModFile() throws IOException {
+    getModFile().remove();
+    this.modFile = null;
+  }
}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 243b3a6..f77c353 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -386,7 +386,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
* @return true if the statement is ADMIN COMMAND
* @throws IOException exception
*/
- private boolean execAdminCommand(String statement) {
+ private boolean execAdminCommand(String statement) throws StorageEngineException {
if (!"root".equals(username.get())) {
return false;
}
@@ -399,8 +399,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
StorageEngine.getInstance().syncCloseAllProcessor();
return true;
case "merge":
- // TODO change to merge!!!
- throw new UnsupportedOperationException("merge not implemented");
+ StorageEngine.getInstance().mergeAll();
default:
return false;
}