You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/10/16 13:22:01 UTC
[incubator-iotdb] branch dev_timewindow_strategy updated: finish
the framework
This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev_timewindow_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git
The following commit(s) were added to refs/heads/dev_timewindow_strategy by this push:
new fd11fb6 finish the framework
fd11fb6 is described below
commit fd11fb6ed4e74e0f50b8f564ce92af4463d4bcc1
Author: jt <jt...@163.com>
AuthorDate: Wed Oct 16 21:06:26 2019 +0800
finish the framework
---
.../db/conf/directories/DirectoryManager.java | 18 ++--
.../directories/strategy/DirectoryStrategy.java | 11 ++-
.../strategy/MaxDiskUsableSpaceFirstStrategy.java | 2 +-
.../MinFolderOccupiedSpaceFirstStrategy.java | 2 +-
.../db/conf/directories/strategy/NopStrategy.java | 2 +-
.../directories/strategy/SequenceStrategy.java | 2 +-
.../directories/strategy/TimeWindowStrategy.java | 96 +++++++++++++++++++---
.../engine/storagegroup/StorageGroupProcessor.java | 23 +++++-
.../strategy/DirectoryStrategyTest.java | 10 +--
9 files changed, 128 insertions(+), 38 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
index 1ec95c6..590ec7b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
@@ -72,14 +72,8 @@ public class DirectoryManager {
sequenceStrategy.init(sequenceFileFolders);
unsequenceStrategy = (DirectoryStrategy) dsClazz.newInstance();
unsequenceStrategy.init(unsequenceFileFolders);
- mergeStrategy = (DirectoryStrategy) msClazz.newInstance();
- mergeStrategy.init(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
+ mergeStrategy = sequenceStrategy;
- if (sequenceStrategy instanceof TimeWindowStrategy) {
- // sequence strategy is 1 unit ahead of the merge strategy
- ((TimeWindowStrategy) sequenceStrategy).setIndexOffset(1);
- usingTimeWindowStrategy = true;
- }
} catch (Exception e) {
logger.error("can't find sequenceStrategy {} for mult-directories.", dataStrategyName, e);
}
@@ -121,7 +115,7 @@ public class DirectoryManager {
* @return next folder index
*/
public int getNextFolderIndexForSequenceFile() throws DiskSpaceInsufficientException {
- return sequenceStrategy.nextFolderIndex();
+ return sequenceStrategy.nextInsertFolderIndex();
}
public String getSequenceFileFolder(int index) {
@@ -154,7 +148,7 @@ public class DirectoryManager {
* @return next folder index
*/
private int getNextFolderIndexForUnSequenceFile() throws DiskSpaceInsufficientException {
- return unsequenceStrategy.nextFolderIndex();
+ return unsequenceStrategy.nextInsertFolderIndex();
}
private String getUnSequenceFileFolder(int index) {
@@ -178,9 +172,7 @@ public class DirectoryManager {
return usingTimeWindowStrategy;
}
- public String getNextFolderForMerge() throws DiskSpaceInsufficientException {
- int index = mergeStrategy.nextFolderIndex();
- // a return value of null means files in all folders can be merged
- return index != -1 ? sequenceFileFolders.get(index) : null;
+ public List<String> getMergableFolders() throws DiskSpaceInsufficientException {
+ return sequenceStrategy.getMergableDirs();
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
index 3216f48..01c4029 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategy.java
@@ -64,7 +64,7 @@ public abstract class DirectoryStrategy {
*
* @return the index of folder that will be allocated
*/
- public abstract int nextFolderIndex() throws DiskSpaceInsufficientException;
+ public abstract int nextInsertFolderIndex() throws DiskSpaceInsufficientException;
/**
* Return the actual string value of a folder by its index.
@@ -76,6 +76,15 @@ public abstract class DirectoryStrategy {
return folders.get(index);
}
+ /**
+ *
+ * @return a list of dirs where merge is allowed, return null if all dirs are allowed
+ * @throws DiskSpaceInsufficientException
+ */
+ public List<String> getMergableDirs() throws DiskSpaceInsufficientException {
+ return null;
+ }
+
// only used by test
public String getFolderForTest() {
return getTsFileFolder(0);
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java
index 94961c6..4dfdf2c 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MaxDiskUsableSpaceFirstStrategy.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.db.utils.CommonUtils;
public class MaxDiskUsableSpaceFirstStrategy extends DirectoryStrategy {
@Override
- public int nextFolderIndex() throws DiskSpaceInsufficientException {
+ public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
return getMaxSpaceFolder();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java
index 224948a..39ccc65 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/MinFolderOccupiedSpaceFirstStrategy.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.db.utils.CommonUtils;
public class MinFolderOccupiedSpaceFirstStrategy extends DirectoryStrategy {
@Override
- public int nextFolderIndex() throws DiskSpaceInsufficientException {
+ public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
return getMinOccupiedSpaceFolder();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
index 8c2d803..ce299b4 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/NopStrategy.java
@@ -29,7 +29,7 @@ import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
public class NopStrategy extends DirectoryStrategy {
@Override
- public int nextFolderIndex() throws DiskSpaceInsufficientException {
+ public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
return -1;
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java
index 75a7870..590c226 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/SequenceStrategy.java
@@ -42,7 +42,7 @@ public class SequenceStrategy extends DirectoryStrategy {
}
@Override
- public int nextFolderIndex() throws DiskSpaceInsufficientException {
+ public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
int index = currentIndex;
currentIndex = tryGetNextIndex(index);
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
index 6c0f8c3..8807e0b 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/strategy/TimeWindowStrategy.java
@@ -4,6 +4,9 @@
package org.apache.iotdb.db.conf.directories.strategy;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import org.apache.iotdb.db.utils.CommonUtils;
@@ -15,28 +18,99 @@ import org.apache.iotdb.db.utils.CommonUtils;
public class TimeWindowStrategy extends DirectoryStrategy {
private long timeUnit;
- private int indexOffset;
+
+ private List<String> nonFullFolders = new ArrayList<>();
+ private List<String> dirsForInsert = null;
+ private List<String> dirsForMerge = null;
+ private int insertDirIdx = 0;
+ private int mergeDirIdx = 0;
+
+ private long lastCheckTimeInstance = 0;
public TimeWindowStrategy() {
- this.indexOffset = 0;
this.timeUnit = IoTDBDescriptor.getInstance().getConfig().getWindowDirStrategyTimeUnit();
}
@Override
- public int nextFolderIndex() throws DiskSpaceInsufficientException {
- long currTime = System.currentTimeMillis();
- int startIndex = (int) ((currTime / timeUnit + indexOffset) % folders.size());
- for (int i = 0; i < folders.size(); i++) {
- int index = (startIndex + i) % folders.size();
- if (CommonUtils.hasSpace(folders.get(index))) {
- return index;
+ public void init(List<String> folders) throws DiskSpaceInsufficientException {
+ super.init(folders);
+
+
+ }
+
+ /**
+ * If time has changed to next instance (time unit is 1 hour and time has changed from 8:00 to
+ * 9:00), switch dirs for merge and insert.
+ */
+ private void checkSwitch() throws DiskSpaceInsufficientException {
+ long currentTimeInstance = System.currentTimeMillis() / timeUnit;
+ if (currentTimeInstance == lastCheckTimeInstance) {
+ return;
+ }
+
+ lastCheckTimeInstance = currentTimeInstance;
+ recalculateDir(currentTimeInstance);
+ }
+
+ private void recalculateDir(long currentTimeInstance) throws DiskSpaceInsufficientException {
+ // re-calculate non-full folders since some previously full folders may be non-full after
+ // a cycle
+ nonFullFolders.clear();
+ for (String folder : folders) {
+ if (CommonUtils.hasSpace(folder)) {
+ nonFullFolders.add(folder);
+ }
+ }
+ if (nonFullFolders.isEmpty()) {
+ throw new DiskSpaceInsufficientException("No available disk");
+ }
+ // divide the folders into ones for insertion and ones for merge
+ if (nonFullFolders.size() == 1) {
+ // we cannot divide if there is only one folder
+ dirsForInsert = Collections.singletonList(nonFullFolders.get(0));
+ dirsForMerge = Collections.singletonList(nonFullFolders.get(0));
+ return;
+ }
+ // TODO: other division strategy
+ int half = folders.size() / 2;
+ List<String> firstHalf = folders.subList(0, half);
+ List<String> secondHalf = folders.subList(half, folders.size());
+ dirsForMerge = currentTimeInstance % 2 == 0 ? firstHalf : secondHalf;
+ dirsForInsert = currentTimeInstance % 2 == 1 ? firstHalf : secondHalf;
+ }
+
+ @Override
+ public int nextInsertFolderIndex() throws DiskSpaceInsufficientException {
+ checkSwitch();
+ for (int i = 0; i < dirsForInsert.size(); i++) {
+ insertDirIdx = (insertDirIdx + i) % dirsForInsert.size();
+ if (CommonUtils.hasSpace(dirsForInsert.get(insertDirIdx))) {
+ return insertDirIdx;
+ }
+ }
+ // some dirs for merge may still have space, try re-divide dirs
+ recalculateDir(lastCheckTimeInstance);
+ for (int i = 0; i < dirsForInsert.size(); i++) {
+ insertDirIdx = (insertDirIdx + i) % dirsForInsert.size();
+ if (CommonUtils.hasSpace(dirsForInsert.get(insertDirIdx))) {
+ return insertDirIdx;
}
}
throw new DiskSpaceInsufficientException(
String.format("All disks of folders %s are full, can't proceed.", folders));
}
- public void setIndexOffset(int indexOffset) {
- this.indexOffset = indexOffset;
+ @Override
+ List<String> getMergableDirs() throws DiskSpaceInsufficientException {
+ checkSwitch();
+ if (dirsForMerge.isEmpty()) {
+ // some dirs for insert may still have space, try re-divide dirs
+ recalculateDir(lastCheckTimeInstance);
+ if (dirsForMerge.isEmpty()) {
+ throw new DiskSpaceInsufficientException(
+ String.format("All disks of folders %s are full, can't proceed.", folders));
+ }
+ }
+ return dirsForMerge;
}
}
\ No newline at end of file
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
index 45db7d0..6745cca 100755
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
@@ -845,6 +845,21 @@ public class StorageGroupProcessor {
}
}
+ /**
+ *
+ * @param filePath
+ * @param folders
+ * @return true if filePath belongs to any folder in folders, false otherwise
+ */
+ private boolean containedIn(String filePath, List<String> folders) {
+ for (String folder : folders) {
+ if (filePath.startsWith(folder)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void merge(boolean fullMerge) {
if (IoTDBDescriptor.getInstance().getConfig().isReadOnly()) {
return;
@@ -867,11 +882,11 @@ public class StorageGroupProcessor {
List<TsFileResource> cpSeqFileList = new ArrayList<>(sequenceFileList);
List<TsFileResource> cpUnseqFileList = new ArrayList<>(unSequenceFileList);
- String allowedMergeDir = DirectoryManager.getInstance().getNextFolderForMerge();
- if (allowedMergeDir != null) {
+ List<String> allowedMergeDirs = DirectoryManager.getInstance().getMergableFolders();
+ if (allowedMergeDirs != null) {
// remove files that are not allowed to merge currently
- cpSeqFileList.removeIf(file -> !file.getFile().getAbsolutePath().startsWith(allowedMergeDir));
- cpUnseqFileList.removeIf(file -> !file.getFile().getAbsolutePath().startsWith(allowedMergeDir));
+ cpSeqFileList.removeIf(file -> !containedIn(file.getFile().getAbsolutePath(), allowedMergeDirs));
+ cpUnseqFileList.removeIf(file -> !containedIn(file.getFile().getAbsolutePath(), allowedMergeDirs));
}
MergeResource mergeResource = new MergeResource(cpSeqFileList, cpUnseqFileList);
diff --git a/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java b/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java
index 4158f79..2bb086f 100644
--- a/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/conf/directories/strategy/DirectoryStrategyTest.java
@@ -82,7 +82,7 @@ public class DirectoryStrategyTest {
while (fullDirIndexSet.contains(index)) {
index = (index + 1) % dataDirList.size();
}
- assertEquals(index, sequenceStrategy.nextFolderIndex());
+ assertEquals(index, sequenceStrategy.nextInsertFolderIndex());
}
}
@@ -93,13 +93,13 @@ public class DirectoryStrategyTest {
int maxIndex = getIndexOfMaxSpace();
for (int i = 0; i < dataDirList.size(); i++) {
- assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextFolderIndex());
+ assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextInsertFolderIndex());
}
PowerMockito.when(CommonUtils.getUsableSpace(dataDirList.get(maxIndex))).thenReturn(0L);
maxIndex = getIndexOfMaxSpace();
for (int i = 0; i < dataDirList.size(); i++) {
- assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextFolderIndex());
+ assertEquals(maxIndex, maxDiskUsableSpaceFirstStrategy.nextInsertFolderIndex());
}
}
@@ -124,13 +124,13 @@ public class DirectoryStrategyTest {
int minIndex = getIndexOfMinOccupiedSpace();
for (int i = 0; i < dataDirList.size(); i++) {
- assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextFolderIndex());
+ assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextInsertFolderIndex());
}
PowerMockito.when(CommonUtils.getOccupiedSpace(dataDirList.get(minIndex))).thenReturn(Long.MAX_VALUE);
minIndex = getIndexOfMinOccupiedSpace();
for (int i = 0; i < dataDirList.size(); i++) {
- assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextFolderIndex());
+ assertEquals(minIndex, minFolderOccupiedSpaceFirstStrategy.nextInsertFolderIndex());
}
}