You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/07/11 12:08:08 UTC

[incubator-iotdb] branch dev_merge updated: add merge in StorageGroupProcessor and StorageEngine

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_merge
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_merge by this push:
     new dae5889  add merge in StorageGroupProcessor and StorageEngine
dae5889 is described below

commit dae5889d0d73f6951d7cbf6f15e99edeacd9821a
Author: 江天 <jt...@163.com>
AuthorDate: Thu Jul 11 20:05:50 2019 +0800

    add merge in StorageGroupProcessor and StorageEngine
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 20 +++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |  4 +-
 .../iotdb/db/engine/merge/MergeCallback.java       | 29 +++++++
 .../apache/iotdb/db/engine/merge/MergeManager.java | 48 +++++++++++
 .../apache/iotdb/db/engine/merge/MergeTask.java    | 20 +++--
 .../iotdb/db/engine/merge/RecoverMergeTask.java    |  7 +-
 .../db/engine/modification/ModificationFile.java   |  7 ++
 .../iotdb/db/engine/storagegroup/FlushManager.java |  2 +-
 .../engine/storagegroup/StorageGroupProcessor.java | 98 +++++++++++++++++++++-
 .../db/engine/storagegroup/TsFileResource.java     |  5 ++
 .../org/apache/iotdb/db/service/TSServiceImpl.java |  5 +-
 11 files changed, 225 insertions(+), 20 deletions(-)

diff --git a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e7543b0..badd42e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -180,6 +180,10 @@ public class IoTDBConfig {
    */
   private boolean chunkBufferPoolEnable = false;
 
+  /**
+   * Memory budget for merge. Defaults to 20% of the value reported by
+   * {@code Runtime.getRuntime().maxMemory()}, i.e. a byte count.
+   */
+  private long mergeMemoryBudget = (long) (Runtime.getRuntime().maxMemory() * 0.2);
+
+  /**
+   * Merge thread count, 2 by default.
+   * NOTE(review): presumably the size of the merge thread pool - confirm against the pool setup.
+   */
+  private int mergeThreadNum = 2;
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -487,4 +491,20 @@ public class IoTDBConfig {
   void setChunkBufferPoolEnable(boolean chunkBufferPoolEnable) {
     this.chunkBufferPoolEnable = chunkBufferPoolEnable;
   }
+
+  /**
+   * @return the currently configured merge memory budget
+   */
+  public long getMergeMemoryBudget() {
+    return mergeMemoryBudget;
+  }
+
+  /**
+   * Replaces the merge memory budget. The value is stored as given, without validation.
+   *
+   * @param mergeMemoryBudget the new budget
+   */
+  public void setMergeMemoryBudget(long mergeMemoryBudget) {
+    this.mergeMemoryBudget = mergeMemoryBudget;
+  }
+
+  /**
+   * @return the currently configured merge thread count
+   */
+  public int getMergeThreadNum() {
+    return mergeThreadNum;
+  }
+
+  /**
+   * Replaces the merge thread count. The value is stored as given, without validation.
+   *
+   * @param mergeThreadNum the new thread count
+   */
+  public void setMergeThreadNum(int mergeThreadNum) {
+    this.mergeThreadNum = mergeThreadNum;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index d27b0a3..ce9d6fb 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -307,7 +307,9 @@ public class StorageEngine implements IService {
     if (readOnly) {
       throw new StorageEngineException("Current system mode is read only, does not support merge");
     }
-    // TODO
+    for (StorageGroupProcessor storageGroupProcessor : processorMap.values()) {
+      storageGroupProcessor.merge();
+    }
   }
 
   /**
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeCallback.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeCallback.java
new file mode 100644
index 0000000..ad1dcf0
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeCallback.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.util.List;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+/**
+ * Callback handed to a {@code MergeTask} through its constructor. The task invokes it with its
+ * own sequence and unsequence file lists after it has deleted its temporary merge files, when
+ * the task decides the callback should be executed.
+ */
+@FunctionalInterface
+public interface MergeCallback {
+
+  /**
+   * @param seqFiles the sequence files held by the calling task
+   * @param unseqFiles the unsequence files held by the calling task
+   */
+  void call(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles);
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
new file mode 100644
index 0000000..4677e3d
--- /dev/null
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeManager.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.engine.merge;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+/**
+ * Process-wide singleton that runs {@link MergeTask}s on a fixed-size thread pool. The pool size
+ * is read once, at construction, from {@code IoTDBConfig.getMergeThreadNum()}.
+ */
+public class MergeManager {
+
+  private static final MergeManager INSTANCE = new MergeManager();
+
+  // numbers the pool threads so that each one gets a distinct "mergeThread-N" name
+  private final AtomicInteger threadNum = new AtomicInteger();
+  private final ThreadPoolExecutor pool;
+
+  private MergeManager() {
+    // the config option added for merge is mergeThreadNum, so its getter is the one to read
+    pool =
+        (ThreadPoolExecutor) Executors.newFixedThreadPool(
+            IoTDBDescriptor.getInstance().getConfig().getMergeThreadNum(),
+            r -> new Thread(r, "mergeThread-" + threadNum.getAndIncrement()));
+  }
+
+  /**
+   * @return the single MergeManager instance
+   */
+  public static MergeManager getINSTANCE() {
+    return INSTANCE;
+  }
+
+  /**
+   * Queues a merge task on the pool. The Future returned by the pool is discarded, so an
+   * exception thrown by the task is not reported back to the caller of this method.
+   *
+   * @param mergeTask the task to run
+   */
+  public void submit(MergeTask mergeTask) {
+    pool.submit(mergeTask);
+  }
+}
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
index 4ddbc2e..1c5366e 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/MergeTask.java
@@ -99,16 +99,14 @@ public class MergeTask implements Callable<Void> {
 
   private long currDeviceMaxTime;
 
-  protected MergeTask() {
-
-  }
+  protected MergeCallback callback;
 
   public MergeTask(List<TsFileResource> seqFiles,
-      List<TsFileResource> unseqFiles, String storageGroupDir) throws IOException {
+      List<TsFileResource> unseqFiles, String storageGroupDir, MergeCallback callback) throws IOException {
     this.seqFiles = seqFiles;
     this.unseqFiles = unseqFiles;
     this.storageGroupDir = storageGroupDir;
-    this.mergeLogger = new MergeLogger(storageGroupDir);
+    this.callback = callback;
   }
 
   @Override
@@ -118,7 +116,7 @@ public class MergeTask implements Callable<Void> {
   }
 
   private void doMerge() throws MetadataErrorException, IOException {
-
+    this.mergeLogger = new MergeLogger(storageGroupDir);
     logFiles();
 
     List<Path> unmergedSeries = collectPathsInUnseqFiles();
@@ -187,6 +185,10 @@ public class MergeTask implements Callable<Void> {
       File mergeFile = new File(seqFile.getFile().getPath() + MERGE_SUFFIX);
       mergeFile.delete();
     }
+
+    if (executeCallback) {
+      callback.call(seqFiles, unseqFiles);
+    }
   }
 
   private void moveMergedToOld(TsFileResource seqFile) throws IOException {
@@ -249,7 +251,7 @@ public class MergeTask implements Callable<Void> {
       maxVersion = maxVersion < chunkMetaData.getVersion() ? chunkMetaData.getVersion() :
           maxVersion;
     }
-    fileWriter.endChunkGroup(maxVersion);
+    fileWriter.endChunkGroup(maxVersion + 1);
   }
 
   private void moveUnmergedToNew(TsFileResource seqFile) throws IOException {
@@ -276,7 +278,7 @@ public class MergeTask implements Callable<Void> {
         fileWriter.startChunkGroup(path.getDevice());
         long maxVersion = writeUnmergedChunks(chunkStartTimes, chunkMetaDataList, chunkLoader,
             chunkWriter, measurementSchema, fileWriter);
-        fileWriter.endChunkGroup(maxVersion);
+        fileWriter.endChunkGroup(maxVersion + 1);
       }
     }
 
@@ -365,7 +367,7 @@ public class MergeTask implements Callable<Void> {
     mergeFileWriter.startChunkGroup(deviceId);
     if (mergeChunks(seqChunkMeta, fileLimitTime, chunkLoader, measurementSchema,
         unseqReader, mergeFileWriter, currTsFile, path)) {
-      mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion());
+      mergeFileWriter.endChunkGroup(seqChunkMeta.get(seqChunkMeta.size() - 1).getVersion() + 1);
       mergeLogger.logFilePositionUpdate(mergeFileWriter.getFile());
     }
     currTsFile.updateTime(path.getDevice(), currDeviceMaxTime);
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
index 9234443..740e854 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/merge/RecoverMergeTask.java
@@ -53,11 +53,8 @@ public class RecoverMergeTask extends MergeTask {
   public RecoverMergeTask(
       List<TsFileResource> allSeqFiles,
       List<TsFileResource> allUnseqFiles,
-      String storageGroupDir) throws IOException {
-    super();
-    this.seqFiles = allSeqFiles;
-    this.unseqFiles = allUnseqFiles;
-    this.storageGroupDir = storageGroupDir;
+      String storageGroupDir, MergeCallback callback) throws IOException {
+    super(allSeqFiles, allUnseqFiles, storageGroupDir, callback);
   }
 
   public void recoverMerge(boolean continueMerge) throws IOException, MetadataErrorException {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
index 964d89e..26438a8 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/modification/ModificationFile.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.engine.modification;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -117,4 +118,10 @@ public class ModificationFile {
   public void setFilePath(String filePath) {
     this.filePath = filePath;
   }
+
+  /**
+   * Closes this modification file, then deletes the file located at {@code filePath}.
+   * The boolean result of the deletion is not inspected.
+   *
+   * @throws IOException presumably raised by {@code close()}; the deletion itself does not
+   *     throw it
+   */
+  public void remove() throws IOException {
+    close();
+    File fileOnDisk = new File(filePath);
+    fileOnDisk.delete();
+  }
+
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
index 864d589..ffbd5fe 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/FlushManager.java
@@ -43,7 +43,7 @@ public class FlushManager {
   }
 
   /**
-   * Add BufferWriteProcessor to asyncFlush manager
+   * Add TsFileProcessor to asyncFlush manager
    */
   @SuppressWarnings("squid:S2445")
   void registerTsFileProcessor(TsFileProcessor tsFileProcessor) {
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 2cf59c9..e3876c3 100755
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -36,13 +36,18 @@ import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.merge.MergeFileSelector;
+import org.apache.iotdb.db.engine.merge.MergeTask;
+import org.apache.iotdb.db.engine.merge.MergeManager;
 import org.apache.iotdb.db.engine.modification.Deletion;
+import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.version.SimpleFileVersionController;
 import org.apache.iotdb.db.engine.version.VersionController;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.exception.StorageGroupProcessorException;
 import org.apache.iotdb.db.exception.ProcessorException;
 import org.apache.iotdb.db.exception.TsFileProcessorException;
@@ -127,6 +132,8 @@ public class StorageGroupProcessor {
    */
   private Map<String, Long> latestFlushedTimeForEachDevice = new HashMap<>();
   private String storageGroupName;
+  private File storageGroupSysDir;
+
   /**
    * versionController assigns a version for each MemTable and deletion/update such that after
    * they are persisted, the order of insertions, deletions and updates can be re-determined.
@@ -140,6 +147,7 @@ public class StorageGroupProcessor {
    */
   @SuppressWarnings("unused") // to be used in merge
   private ReentrantLock mergeDeleteLock = new ReentrantLock();
+  private ReentrantReadWriteLock mergeLock = new ReentrantReadWriteLock();
 
   /**
    * This is the modification file of the result of the current merge. Because the merged file
@@ -147,6 +155,9 @@ public class StorageGroupProcessor {
    */
   private ModificationFile mergingModification;
 
+  private volatile boolean isMerging = false;
+  private long mergeStartTime;
+
 
   public StorageGroupProcessor(String systemInfoDir, String storageGroupName)
       throws ProcessorException {
@@ -156,7 +167,7 @@ public class StorageGroupProcessor {
     this.fileSchema = constructFileSchema(storageGroupName);
 
     try {
-      File storageGroupSysDir = new File(systemInfoDir, storageGroupName);
+      storageGroupSysDir = new File(systemInfoDir, storageGroupName);
       if (storageGroupSysDir.mkdirs()) {
         logger.info("Storage Group system Directory {} doesn't exist, create it",
             storageGroupSysDir.getPath());
@@ -444,6 +455,7 @@ public class StorageGroupProcessor {
   // TODO need a read lock, please consider the concurrency with flush manager threads.
   public QueryDataSource query(String deviceId, String measurementId, QueryContext context) {
     insertLock.readLock().lock();
+    mergeLock.readLock().lock();
     try {
       List<TsFileResource> seqResources = getFileReSourceListForQuery(sequenceFileList,
           deviceId, measurementId, context);
@@ -452,6 +464,7 @@ public class StorageGroupProcessor {
       return new QueryDataSource(new Path(deviceId, measurementId), seqResources, unseqResources);
     } finally {
       insertLock.readLock().unlock();
+      mergeLock.readLock().unlock();
     }
   }
 
@@ -515,6 +528,8 @@ public class StorageGroupProcessor {
   public void delete(String deviceId, String measurementId, long timestamp) throws IOException {
     // TODO: how to avoid partial deletion?
     writeLock();
+    // acquire exactly once: the finally block of this method releases the merge lock only once
+    mergeLock.writeLock().lock();
 
     // record files which are updated so that we can roll back them in case of exception
     List<ModificationFile> updatedModFiles = new ArrayList<>();
@@ -558,6 +573,7 @@ public class StorageGroupProcessor {
       throw new IOException(e);
     } finally {
       writeUnlock();
+      mergeLock.writeLock().unlock();
     }
   }
 
@@ -632,6 +648,86 @@ public class StorageGroupProcessor {
     }
   }
 
+  /**
+   * Selects merge candidates under the configured memory budget and submits a single merge task
+   * for this storage group. Nothing is submitted when a merge is already in progress, when either
+   * the sequence or the unsequence file list is empty, or when the selector returns no
+   * candidates. Selection and task-creation failures are logged, not thrown.
+   */
+  public void merge() {
+    writeLock();
+    try {
+      if (isMerging) {
+        if (logger.isInfoEnabled()) {
+          logger.info("{} Last merge is ongoing, currently consumed time: {}ms", storageGroupName,
+              (System.currentTimeMillis() - mergeStartTime));
+        }
+        return;
+      }
+      if (unSequenceFileList.isEmpty() || sequenceFileList.isEmpty()) {
+        logger.info("{} no files to be merged", storageGroupName);
+        // nothing to select from, so skip the selector entirely
+        return;
+      }
+
+      long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
+      MergeFileSelector fileSelector = new MergeFileSelector(sequenceFileList, unSequenceFileList,
+           budget);
+      try {
+        List[] mergeFiles = fileSelector.doSelect();
+        if (mergeFiles.length == 0) {
+          logger.info("{} cannot select merge candidates under the budget {}", storageGroupName,
+              budget);
+          return;
+        }
+        MergeTask mergeTask = new MergeTask(mergeFiles[0], mergeFiles[1],
+            storageGroupSysDir.getPath(), this::mergeEndAction);
+        // Mark the merge as running BEFORE handing the task to the pool: the end-of-merge
+        // callback resets isMerging from a pool thread, and a task that finishes quickly must not
+        // have that reset overwritten by a late "isMerging = true" here.
+        isMerging = true;
+        mergeStartTime = System.currentTimeMillis();
+        try {
+          MergeManager.getINSTANCE().submit(mergeTask);
+        } catch (RuntimeException e) {
+          // the task never started, so no callback will ever clear the flag
+          isMerging = false;
+          throw e;
+        }
+        if (logger.isInfoEnabled()) {
+          logger.info("{} submits a merge task, merging {} seqFiles, {} unseqFiles",
+              storageGroupName, mergeFiles[0].size(), mergeFiles[1].size());
+        }
+
+      } catch (MergeException e) {
+        logger.error("{} cannot select file for merge", storageGroupName, e);
+      } catch (IOException e) {
+        logger.error("{} cannot create a merge task", storageGroupName, e);
+      }
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  /**
+   * Callback passed to the merge task. It drops the merged unsequence files from
+   * {@code unSequenceFileList}, then, for every merged sequence file, removes its old
+   * modification file and writes the modifications collected in {@code mergingModification}
+   * into a new one. Finally it removes {@code mergingModification} and clears the merging flag.
+   * Every lock is released in a finally block and the merging flag is always cleared, including
+   * when {@code seqFiles} is empty or an unexpected exception interrupts the loop.
+   *
+   * @param seqFiles the sequence files that took part in the merge
+   * @param unseqFiles the unsequence files that took part in the merge
+   */
+  private void mergeEndAction(List<TsFileResource> seqFiles, List<TsFileResource> unseqFiles) {
+    mergeLock.writeLock().lock();
+    try {
+      unSequenceFileList.removeAll(unseqFiles);
+    } finally {
+      mergeLock.writeLock().unlock();
+    }
+
+    boolean mergeStateCleared = false;
+    try {
+      for (int i = 0; i < seqFiles.size(); i++) {
+        TsFileResource seqFile = seqFiles.get(i);
+        seqFile.getMergeQueryLock().writeLock().lock();
+        mergeLock.writeLock().lock();
+        try {
+          rewriteModFileAfterMerge(seqFile);
+          if (i == seqFiles.size() - 1) {
+            // cleared while still holding the locks taken for the last file
+            clearMergeState();
+            mergeStateCleared = true;
+          }
+        } finally {
+          mergeLock.writeLock().unlock();
+          seqFile.getMergeQueryLock().writeLock().unlock();
+        }
+      }
+    } finally {
+      // reached with the flag unset when seqFiles is empty or the loop was aborted
+      if (!mergeStateCleared) {
+        clearMergeState();
+      }
+    }
+  }
+
+  /**
+   * Removes the old modification file of seqFile and writes the modifications generated during
+   * merge into its new one; an IOException is logged and not rethrown.
+   */
+  private void rewriteModFileAfterMerge(TsFileResource seqFile) {
+    try {
+      seqFile.removeModFile();
+      // no merging modification file means nothing was recorded during the merge
+      if (mergingModification != null) {
+        for (Modification modification : mergingModification.getModifications()) {
+          seqFile.getModFile().write(modification);
+        }
+      }
+    } catch (IOException e) {
+      logger.error("{} cannot clean the ModificationFile of {} after merge", storageGroupName,
+          seqFile.getFile(), e);
+    }
+  }
+
+  /**
+   * Removes the merging modification file (if any) and marks the merge as finished. Takes the
+   * merge write lock itself; the lock is reentrant, so callers may already hold it.
+   */
+  private void clearMergeState() {
+    mergeLock.writeLock().lock();
+    try {
+      if (mergingModification != null) {
+        try {
+          mergingModification.remove();
+        } catch (IOException e) {
+          logger.error("{} cannot remove merging modification ", storageGroupName, e);
+        }
+        mergingModification = null;
+      }
+    } finally {
+      isMerging = false;
+      mergeLock.writeLock().unlock();
+    }
+  }
+
 
   public TsFileProcessor getWorkSequenceTsFileProcessor() {
     return workSequenceTsFileProcessor;
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index df979c8..b00e422 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -235,4 +235,9 @@ public class TsFileResource {
   public ReentrantReadWriteLock getMergeQueryLock() {
     return mergeQueryLock;
   }
+
+  /**
+   * Removes the modification file returned by {@code getModFile()} and then clears the
+   * {@code modFile} reference held by this resource.
+   *
+   * @throws IOException if removing the modification file fails
+   */
+  public void removeModFile() throws IOException {
+    ModificationFile current = getModFile();
+    current.remove();
+    modFile = null;
+  }
 }
diff --git a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
index 243b3a6..f77c353 100644
--- a/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
+++ b/iotdb/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
@@ -386,7 +386,7 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
    * @return true if the statement is ADMIN COMMAND
    * @throws IOException exception
    */
-  private boolean execAdminCommand(String statement) {
+  private boolean execAdminCommand(String statement) throws StorageEngineException {
     if (!"root".equals(username.get())) {
       return false;
     }
@@ -399,8 +399,9 @@ public class TSServiceImpl implements TSIService.Iface, ServerContext {
         StorageEngine.getInstance().syncCloseAllProcessor();
         return true;
       case "merge":
-          // TODO change to merge!!!
-        throw new UnsupportedOperationException("merge not implemented");
+        // merge is an admin command: report it as handled instead of falling into default
+        StorageEngine.getInstance().mergeAll();
+        return true;
       default:
         return false;
     }