You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/11 02:13:54 UTC

[incubator-iotdb] branch dev_timewindow_strategy created (now 7385990)

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a change to branch dev_timewindow_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git.


      at 7385990  temp save

This branch includes the following new commits:

     new 7385990  temp save

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-iotdb] 01/01: temp save

Posted by ji...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_timewindow_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git

commit 7385990551858a8796045b2bb8d891fac048db9d
Author: jt <jt...@163.com>
AuthorDate: Fri Oct 11 10:01:22 2019 +0800

    temp save
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 24 +++++++++++++
 .../db/conf/directories/DirectoryManager.java      | 42 +++++++++++++++++-----
 .../directories/strategy/DirectoryStrategy.java    |  4 +--
 .../db/conf/directories/strategy/NopStrategy.java  | 35 ++++++++++++++++++
 .../directories/strategy/TimeWindowStrategy.java   | 42 ++++++++++++++++++++++
 .../db/engine/merge/manage/MergeResource.java      | 30 +++++++---------
 .../iotdb/db/engine/merge/task/MergeTask.java      |  1 +
 .../engine/storagegroup/StorageGroupProcessor.java | 19 +++++++++-
 8 files changed, 169 insertions(+), 28 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index a026da7..7db67ce 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -137,6 +137,12 @@ public class IoTDBConfig {
   private String multiDirStrategyClassName = null;
 
   /**
+   *
+   */
+  private String mergeStrategyClassName = "org.apache.iotdb.db.conf.directories.strategy"
+      + ".TimeWindowStrategy";
+
+  /**
    * Wal directory.
    */
   private String walFolder = "data/wal";
@@ -361,6 +367,12 @@ public class IoTDBConfig {
    */
   private String hdfsPort = "9000";
 
+  /**
+   * default time unit of TimeWindowStrategy
+   */
+  private long windowDirStrategyTimeUnit = 3600 * 1000;
+
+
   public IoTDBConfig() {
     // empty constructor
   }
@@ -992,4 +1004,16 @@ public class IoTDBConfig {
   public void setHdfsPort(String hdfsPort) {
     this.hdfsPort = hdfsPort;
   }
+
+  public long getWindowDirStrategyTimeUnit() {
+    return windowDirStrategyTimeUnit;
+  }
+
+  public void setWindowDirStrategyTimeUnit(long windowDirStrategyTimeUnit) {
+    this.windowDirStrategyTimeUnit = windowDirStrategyTimeUnit;
+  }
+
+  public String getMergeStrategyClassName() {
+    return mergeStrategyClassName;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index f44a2c8..1ec95c6 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy;
+import org.apache.iotdb.db.conf.directories.strategy.TimeWindowStrategy;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.tsfile.fileSystem.TSFileFactory;
 import org.slf4j.Logger;
@@ -40,6 +41,9 @@ public class DirectoryManager {
   private List<String> unsequenceFileFolders;
   private DirectoryStrategy sequenceStrategy;
   private DirectoryStrategy unsequenceStrategy;
+  private DirectoryStrategy mergeStrategy;
+
+  private boolean usingTimeWindowStrategy = false;
 
   private DirectoryManager() {
     sequenceFileFolders =
@@ -56,16 +60,28 @@ public class DirectoryManager {
     }
     mkDataDirs(unsequenceFileFolders);
 
-    String strategyName = "";
+    String dataStrategyName = "";
+    String mergeStrategyName;
     try {
-      strategyName = IoTDBDescriptor.getInstance().getConfig().getMultiDirStrategyClassName();
-      Class<?> clazz = Class.forName(strategyName);
-      sequenceStrategy = (DirectoryStrategy) clazz.newInstance();
+      dataStrategyName = IoTDBDescriptor.getInstance().getConfig().getMultiDirStrategyClassName();
+      mergeStrategyName = IoTDBDescriptor.getInstance().getConfig().getMergeStrategyClassName();
+      Class<?> dsClazz = Class.forName(dataStrategyName);
+      Class<?> msClazz = Class.forName(mergeStrategyName);
+
+      sequenceStrategy = (DirectoryStrategy) dsClazz.newInstance();
       sequenceStrategy.init(sequenceFileFolders);
-      unsequenceStrategy = (DirectoryStrategy) clazz.newInstance();
+      unsequenceStrategy = (DirectoryStrategy) dsClazz.newInstance();
       unsequenceStrategy.init(unsequenceFileFolders);
+      mergeStrategy = (DirectoryStrategy) msClazz.newInstance();
+      mergeStrategy.init(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
+
+      if (sequenceStrategy instanceof TimeWindowStrategy) {
+        // sequence strategy is 1 unit ahead of the merge strategy
+        ((TimeWindowStrategy) sequenceStrategy).setIndexOffset(1);
+        usingTimeWindowStrategy = true;
+      }
     } catch (Exception e) {
-      logger.error("can't find sequenceStrategy {} for mult-directories.", strategyName, e);
+      logger.error("can't find sequenceStrategy {} for mult-directories.", dataStrategyName, e);
     }
   }
 
@@ -137,11 +153,11 @@ public class DirectoryManager {
    *
    * @return next folder index
    */
-  public int getNextFolderIndexForUnSequenceFile() throws DiskSpaceInsufficientException {
+  private int getNextFolderIndexForUnSequenceFile() throws DiskSpaceInsufficientException {
     return unsequenceStrategy.nextFolderIndex();
   }
 
-  public String getUnSequenceFileFolder(int index) {
+  private String getUnSequenceFileFolder(int index) {
     return unsequenceFileFolders.get(index);
   }
 
@@ -157,4 +173,14 @@ public class DirectoryManager {
   public String getUnSequenceFolderForTest() {
     return unsequenceFileFolders.get(0);
   }
+
+  public boolean isUsingTimeWindowStrategy() {
+    return usingTimeWindowStrategy;
+  }
+
+  public String getNextFolderForMerge() throws DiskSpaceInsufficientException {
+    int index = mergeStrategy.nextFolderIndex();
+    // a return value of null means files in all folders can be merged
+    return index != -1 ? sequenceFileFolders.get(index) : null;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
index 085de13..3216f48 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
@@ -35,7 +35,7 @@ public abstract class DirectoryStrategy {
   /**
    * All the folders of data files, should be init once the subclass is created.
    */
-  protected List<String> folders;
+  List<String> folders;
 
   /**
    * To init folders. Do not recommend to overwrite.
@@ -72,7 +72,7 @@ public abstract class DirectoryStrategy {
    * @param index the index of the folder
    * @return the string value of the folder
    */
-  public String getTsFileFolder(int index) {
+  private String getTsFileFolder(int index) {
     return folders.get(index);
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
new file mode 100644
index 0000000..8c2d803
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.conf.directories.strategy;
+
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+
+/**
+ * NopStrategy is only used as a merge strategy and always returns -1, which means files in all dirs
+ * can be merged.
+ */
+public class NopStrategy extends DirectoryStrategy {
+
+  @Override
+  public int nextFolderIndex() throws DiskSpaceInsufficientException {
+    return -1;
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
new file mode 100644
index 0000000..6c0f8c3
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at      http://www.apache.org/licenses/LICENSE-2.0  Unless required by applicable law or ag [...]
+ */
+
+package org.apache.iotdb.db.conf.directories.strategy;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.db.utils.CommonUtils;
+
+/**
+ * TimeWindowStrategy switches the directory periodically, making it possible to merge and
+ * write on different disks.
+ */
+public class TimeWindowStrategy extends DirectoryStrategy {
+
+  private long timeUnit;
+  private int indexOffset;
+
+  public TimeWindowStrategy() {
+    this.indexOffset = 0;
+    this.timeUnit = IoTDBDescriptor.getInstance().getConfig().getWindowDirStrategyTimeUnit();
+  }
+
+  @Override
+  public int nextFolderIndex() throws DiskSpaceInsufficientException {
+    long currTime = System.currentTimeMillis();
+    int startIndex = (int) ((currTime / timeUnit + indexOffset) % folders.size());
+    for (int i = 0; i < folders.size(); i++) {
+      int index = (startIndex + i) % folders.size();
+      if (CommonUtils.hasSpace(folders.get(index))) {
+        return index;
+      }
+    }
+    throw new DiskSpaceInsufficientException(
+        String.format("All disks of folders %s are full, can't proceed.", folders));
+  }
+
+  public void setIndexOffset(int indexOffset) {
+    this.indexOffset = indexOffset;
+  }
+}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
index 54463d9..3b84eca 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/manage/MergeResource.java
@@ -19,12 +19,23 @@
 
 package org.apache.iotdb.db.engine.merge.manage;
 
+import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.query.reader.IPointReader;
 import org.apache.iotdb.db.query.reader.resourceRelated.CachedUnseqResourceMergeReader;
 import org.apache.iotdb.db.utils.MergeUtils;
-import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
 import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetaData;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
@@ -37,16 +48,6 @@ import org.apache.iotdb.tsfile.write.chunk.ChunkWriterImpl;
 import org.apache.iotdb.tsfile.write.chunk.IChunkWriter;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-
-import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
 
 /**
  * MergeResource manages files and caches of readers, writers, MeasurementSchemas and
@@ -54,8 +55,6 @@ import static org.apache.iotdb.db.engine.merge.task.MergeTask.MERGE_SUFFIX;
  */
 public class MergeResource {
 
-  private static final Logger logger = LoggerFactory.getLogger(MergeResource.class);
-
   private List<TsFileResource> seqFiles;
   private List<TsFileResource> unseqFiles;
 
@@ -73,6 +72,7 @@ public class MergeResource {
         unseqFiles.stream().filter(TsFileResource::isClosed).collect(Collectors.toList());
   }
 
+
   public void clear() throws IOException {
     for (TsFileSequenceReader sequenceReader : fileReaderCache.values()) {
       sequenceReader.close();
@@ -231,10 +231,6 @@ public class MergeResource {
     this.unseqFiles = unseqFiles;
   }
 
-  public Map<String, MeasurementSchema> getMeasurementSchemaMap() {
-    return measurementSchemaMap;
-  }
-
   public void removeOutdatedSeqReaders() throws IOException {
     Iterator<Entry<TsFileResource, TsFileSequenceReader>> entryIterator =
         fileReaderCache.entrySet().iterator();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
index 39bbb02..3250ded 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/task/MergeTask.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.db.engine.merge.manage.MergeContext;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.recover.MergeLogger;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.exception.MetadataErrorException;
 import org.apache.iotdb.db.metadata.MManager;
 import org.apache.iotdb.db.utils.MergeUtils;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 70ac9c7..45db7d0 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -39,6 +39,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.commons.io.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.merge.manage.MergeManager;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.merge.selector.IMergeFileSelector;
@@ -845,6 +846,9 @@ public class StorageGroupProcessor {
   }
 
   public void merge(boolean fullMerge) {
+    if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
+      return;
+    }
     writeLock();
     try {
       if (isMerging) {
@@ -860,7 +864,17 @@ public class StorageGroupProcessor {
       }
 
       long budget = IoTDBDescriptor.getInstance().getConfig().getMergeMemoryBudget();
-      MergeResource mergeResource = new MergeResource(sequenceFileList, unSequenceFileList);
+
+      List<TsFileResource> cpSeqFileList = new ArrayList<>(sequenceFileList);
+      List<TsFileResource> cpUnseqFileList = new ArrayList<>(unSequenceFileList);
+      String allowedMergeDir = DirectoryManager.getInstance().getNextFolderForMerge();
+      if (allowedMergeDir != null) {
+        // remove files that are not allowed to merge currently
+        cpSeqFileList.removeIf(file -> !file.getFile().getAbsolutePath().startsWith(allowedMergeDir));
+        cpUnseqFileList.removeIf(file -> !file.getFile().getAbsolutePath().startsWith(allowedMergeDir));
+      }
+
+      MergeResource mergeResource = new MergeResource(cpSeqFileList, cpUnseqFileList);
       IMergeFileSelector fileSelector = getMergeFileSelector(budget, mergeResource);
       try {
         List[] mergeFiles = fileSelector.select();
@@ -890,6 +904,9 @@ public class StorageGroupProcessor {
       } catch (MergeException | IOException e) {
         logger.error("{} cannot select file for merge", storageGroupName, e);
       }
+    } catch (DiskSpaceInsufficientException e) {
+      logger.error("No sufficient space, merge aborted.", e);
+      IoTDBDescriptor.getInstance().getConfig().setReadOnly(true);
     } finally {
       writeUnlock();
     }