You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/16 13:22:01 UTC

[incubator-iotdb] branch dev_timewindow_strategy updated: finish the framework

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_timewindow_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_timewindow_strategy by this push:
     new fd11fb6  finish the framework
fd11fb6 is described below

commit fd11fb6ed4e74e0f50b8f564ce92af4463d4bcc1
Author: jt <jt...@163.com>
AuthorDate: Wed Oct 16 21:06:26 2019 +0800

    finish the framework
---
 .../db/conf/directories/DirectoryManager.java      | 18 ++--
 .../directories/strategy/DirectoryStrategy.java    | 11 ++-
 .../strategy/MaxDiskUsableSpaceFirstStrategy.java  |  2 +-
 .../MinFolderOccupiedSpaceFirstStrategy.java       |  2 +-
 .../db/conf/directories/strategy/NopStrategy.java  |  2 +-
 .../directories/strategy/SequenceStrategy.java     |  2 +-
 .../directories/strategy/TimeWindowStrategy.java   | 96 +++++++++++++++++++---
 .../engine/storagegroup/StorageGroupProcessor.java | 23 +++++-
 .../strategy/DirectoryStrategyTest.java            | 10 +--
 9 files changed, 128 insertions(+), 38 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index 1ec95c6..590ec7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -72,14 +72,8 @@ public class DirectoryManager {
       sequenceStrategy.init(sequenceFileFolders);
       unsequenceStrategy = (DirectoryStrategy) dsClazz.newInstance();
       unsequenceStrategy.init(unsequenceFileFolders);
-      mergeStrategy = (DirectoryStrategy) msClazz.newInstance();
-      mergeStrategy.init(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
+      mergeStrategy = sequenceStrategy;
 
-      if (sequenceStrategy instanceof TimeWindowStrategy) {
-        // sequence strategy is 1 unit ahead of the merge strategy
-        ((TimeWindowStrategy) sequenceStrategy).setIndexOffset(1);
-        usingTimeWindowStrategy = true;
-      }
     } catch (Exception e) {
       logger.error("can't find sequenceStrategy {} for mult-directories.", dataStrategyName, e);
     }
@@ -121,7 +115,7 @@ public class DirectoryManager {
    * @return next folder index
    */
   public int getNextFolderIndexForSequenceFile() throws DiskSpaceInsufficientException {
-    return sequenceStrategy.nextFolderIndex();
+    return sequenceStrategy.nextInsertFolderIndex();
   }
 
   public String getSequenceFileFolder(int index) {
@@ -154,7 +148,7 @@ public class DirectoryManager {
    * @return next folder index
    */
   private int getNextFolderIndexForUnSequenceFile() throws DiskSpaceInsufficientException {
-    return unsequenceStrategy.nextFolderIndex();
+    return unsequenceStrategy.nextInsertFolderIndex();
   }
 
   private String getUnSequenceFileFolder(int index) {
@@ -178,9 +172,7 @@ public class DirectoryManager {
     return usingTimeWindowStrategy;
   }
 
-  public String getNextFolderForMerge() throws DiskSpaceInsufficientException {
-    int index = mergeStrategy.nextFolderIndex();
-    // a return value of null means files in all folders can be merged
-    return index != -1 ? sequenceFileFolders.get(index) : null;
+  public List<String> getMergableFolders() throws DiskSpaceInsufficientException {
+    return sequenceStrategy.getMergableDirs();
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
index 3216f48..01c4029 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
@@ -64,7 +64,7 @@ public abstract class DirectoryStrategy {
    *
    * @return the index of folder that will be allocated
    */
-  public abstract int nextFolderIndex() throws DiskSpaceInsufficientException;
+  public abstract int nextInsertFolderIndex() throws DiskSpaceInsufficientException;
 
   /**
    * Return the actual string value of a folder by its index.
@@ -76,6 +76,15 @@ public abstract class DirectoryStrategy {
     return folders.get(index);
   }
 
+  /**
+   * Lists the dirs in which merge is currently allowed; this default imposes no restriction.
+   * @return a list of dirs where merge is allowed, or null if all dirs are allowed
+   * @throws DiskSpaceInsufficientException never thrown here; declared for overriding strategies
+   */
+  public List<String> getMergableDirs() throws DiskSpaceInsufficientException {
+    return null;
+  }
+
   // only used by test
   public String getFolderForTest() {
     return getTsFileFolder(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java
index 94961c6..4dfdf2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.utils.CommonUtils;
 public class MaxDiskUsableSpaceFirstStrategy extends DirectoryStrategy {
 
   @Override
-  public int nextFolderIndex() throws DiskSpaceInsufficientException {
+  public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
     return getMaxSpaceFolder();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java
index 224948a..39ccc65 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.utils.CommonUtils;
 public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy {
 
   @Override
-  public int nextFolderIndex() throws DiskSpaceInsufficientException {
+  public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
     return getMinOccupiedSpaceFolder();
   }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
index 8c2d803..ce299b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 public class NopStrategy extends DirectoryStrategy {
 
   @Override
-  public int nextFolderIndex() throws DiskSpaceInsufficientException {
+  public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
     return -1;
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java
index 75a7870..590c226 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java
@@ -42,7 +42,7 @@ public class SequenceStrategy extends DirectoryStrategy {
   }
 
   @Override
-  public int nextFolderIndex() throws DiskSpaceInsufficientException {
+  public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
     int index = currentIndex;
     currentIndex = tryGetNextIndex(index);
 
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
index 6c0f8c3..8807e0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
@@ -4,6 +4,9 @@
 
 package org.apache.iotdb.db.conf.directories.strategy;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.db.utils.CommonUtils;
@@ -15,28 +18,99 @@ import org.apache.iotdb.db.utils.CommonUtils;
 public class TimeWindowStrategy extends DirectoryStrategy {
 
   private long timeUnit;
-  private int indexOffset;
+
+  private List<String> nonFullFolders = new ArrayList<>();
+  private List<String> dirsForInsert = null;
+  private List<String> dirsForMerge = null;
+  private int insertDirIdx = 0;
+  private int mergeDirIdx = 0;
+
+  private long lastCheckTimeInstance = 0;
 
   public TimeWindowStrategy() {
-    this.indexOffset = 0;
     this.timeUnit = IoTDBDescriptor.getInstance().getConfig().getWindowDirStrategyTimeUnit();
   }
 
   @Override
-  public int nextFolderIndex() throws DiskSpaceInsufficientException {
-    long currTime = System.currentTimeMillis();
-    int startIndex = (int) ((currTime / timeUnit + indexOffset) % folders.size());
-    for (int i = 0; i < folders.size(); i++) {
-      int index = (startIndex + i) % folders.size();
-      if (CommonUtils.hasSpace(folders.get(index))) {
-        return index;
+  public void init(List<String> folders) throws DiskSpaceInsufficientException {
+    super.init(folders);
+
+
+  }
+
+  /**
+   * If time has moved to the next instance (e.g. time unit is 1 hour and 8:00 turns 9:00), switch
+   * dirs for merge and insert. The instance is recorded only once the switch has succeeded.
+   */
+  private void checkSwitch() throws DiskSpaceInsufficientException {
+    long currentTimeInstance = System.currentTimeMillis() / timeUnit;
+    if (currentTimeInstance == lastCheckTimeInstance) {
+      return;
+    }
+
+    recalculateDir(currentTimeInstance);
+    lastCheckTimeInstance = currentTimeInstance;
+  }
+
+  /** Re-divides the folders that still have space into dirs for insertion and dirs for merge. */
+  private void recalculateDir(long currentTimeInstance) throws DiskSpaceInsufficientException {
+    // re-check the space since some previously full folders may be non-full after a cycle
+    nonFullFolders.clear();
+    for (String folder : folders) {
+      if (CommonUtils.hasSpace(folder)) {
+        nonFullFolders.add(folder);
+      }
+    }
+    if (nonFullFolders.isEmpty()) {
+      throw new DiskSpaceInsufficientException("No available disk");
+    }
+    // divide the folders into ones for insertion and ones for merge
+    if (nonFullFolders.size() == 1) {
+      // we cannot divide if there is only one folder
+      dirsForInsert = Collections.singletonList(nonFullFolders.get(0));
+      dirsForMerge = Collections.singletonList(nonFullFolders.get(0));
+      return;
+    }
+    // copy the halves (nonFullFolders is cleared on every call). TODO: other division strategy
+    int half = nonFullFolders.size() / 2;
+    List<String> firstHalf = new ArrayList<>(nonFullFolders.subList(0, half));
+    List<String> secondHalf = new ArrayList<>(nonFullFolders.subList(half, nonFullFolders.size()));
+    dirsForMerge = currentTimeInstance % 2 == 0 ? firstHalf : secondHalf;
+    dirsForInsert = currentTimeInstance % 2 == 1 ? firstHalf : secondHalf;
+  }
+
+  @Override
+  public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
+    checkSwitch();
+    for (int i = 0; i < dirsForInsert.size(); i++) {
+      insertDirIdx = (insertDirIdx + Math.min(i, 1)) % dirsForInsert.size(); // current first
+      if (CommonUtils.hasSpace(dirsForInsert.get(insertDirIdx))) {
+        return folders.indexOf(dirsForInsert.get(insertDirIdx)); // index in all folders
+      }
+    }
+    // no insert dir has space but some dirs for merge may still have, try re-divide dirs
+    recalculateDir(lastCheckTimeInstance);
+    for (int i = 0; i < dirsForInsert.size(); i++) {
+      insertDirIdx = (insertDirIdx + Math.min(i, 1)) % dirsForInsert.size(); // current first
+      if (CommonUtils.hasSpace(dirsForInsert.get(insertDirIdx))) {
+        return folders.indexOf(dirsForInsert.get(insertDirIdx)); // index in all folders
       }
     }
     throw new DiskSpaceInsufficientException(
         String.format("All disks of folders %s are full, can't proceed.", folders));
   }
 
-  public void setIndexOffset(int indexOffset) {
-    this.indexOffset = indexOffset;
+  @Override
+  public List<String> getMergableDirs() throws DiskSpaceInsufficientException {
+    checkSwitch();
+    if (dirsForMerge.isEmpty()) {
+      // some dirs for insert may still have space, try re-divide dirs
+      recalculateDir(lastCheckTimeInstance);
+      if (dirsForMerge.isEmpty()) {
+        throw new DiskSpaceInsufficientException(
+            String.format("All disks of folders %s are full, can't proceed.", folders));
+      }
+    }
+    return dirsForMerge;
   }
 }
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 45db7d0..6745cca 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -845,6 +845,21 @@ public class StorageGroupProcessor {
     }
   }
 
+  /**
+   * Checks folder membership by path components, so "/data/dir1" does not match "/data/dir10/f".
+   * @param filePath path of the file to test
+   * @param folders candidate parent folders
+   * @return true if filePath belongs to any folder in folders, false otherwise
+   */
+  private boolean containedIn(String filePath, List<String> folders) {
+    for (String folder : folders) {
+      if (java.nio.file.Paths.get(filePath).startsWith(folder)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   public void merge(boolean fullMerge) {
     if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
       return;
@@ -867,11 +882,11 @@ public class StorageGroupProcessor {
 
       List<TsFileResource> cpSeqFileList = new ArrayList<>(sequenceFileList);
       List<TsFileResource> cpUnseqFileList = new ArrayList<>(unSequenceFileList);
-      String allowedMergeDir = DirectoryManager.getInstance().getNextFolderForMerge();
-      if (allowedMergeDir != null) {
+      List<String> allowedMergeDirs = DirectoryManager.getInstance().getMergableFolders();
+      if (allowedMergeDirs != null) {
         // remove files that are not allowed to merge currently
-        cpSeqFileList.removeIf(file -> !file.getFile().getAbsolutePath().startsWith(allowedMergeDir));
-        cpUnseqFileList.removeIf(file -> !file.getFile().getAbsolutePath().startsWith(allowedMergeDir));
+        cpSeqFileList.removeIf(file -> !containedIn(file.getFile().getAbsolutePath(), allowedMergeDirs));
+        cpUnseqFileList.removeIf(file -> !containedIn(file.getFile().getAbsolutePath(), allowedMergeDirs));
       }
 
       MergeResource mergeResource = new MergeResource(cpSeqFileList, cpUnseqFileList);
diff --git a/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java b/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java
index 4158f79..2bb086f 100644
--- a/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java
@@ -82,7 +82,7 @@ public class DirectoryStrategyTest {
       while (fullDirIndexSet.contains(index)) {
         index = (index + 1) % dataDirList.size();
       }
-      assertEquals(index, sequenceStrategy.nextFolderIndex());
+      assertEquals(index, sequenceStrategy.nextInsertFolderIndex());
     }
   }
 
@@ -93,13 +93,13 @@ public class DirectoryStrategyTest {
 
     int maxIndex = getIndexOfMaxSpace();
     for (int i = 0; i < dataDirList.size(); i++) {
-      assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextFolderIndex());
+      assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextInsertFolderIndex());
     }
 
     PowerMockito.when(CommonUtils.getUsableSpace(dataDirList.get(maxIndex))).thenReturn(0L);
     maxIndex = getIndexOfMaxSpace();
     for (int i = 0; i < dataDirList.size(); i++) {
-      assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextFolderIndex());
+      assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextInsertFolderIndex());
     }
   }
 
@@ -124,13 +124,13 @@ public class DirectoryStrategyTest {
 
     int minIndex = getIndexOfMinOccupiedSpace();
     for (int i = 0; i < dataDirList.size(); i++) {
-      assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextFolderIndex());
+      assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextInsertFolderIndex());
     }
 
     PowerMockito.when(CommonUtils.getOccupiedSpace(dataDirList.get(minIndex))).thenReturn(Long.MAX_VALUE);
     minIndex = getIndexOfMinOccupiedSpace();
     for (int i = 0; i < dataDirList.size(); i++) {
-      assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextFolderIndex());
+      assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextInsertFolderIndex());
     }
   }