You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by ji...@apache.org on 2019/11/22 15:01:00 UTC

[incubator-iotdb] branch dev_new_merge_strategy updated: add merge selection time limit

This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch dev_new_merge_strategy
in repository https://gitbox.apache.org/repos/asf/incubator-iotdb.git


The following commit(s) were added to refs/heads/dev_new_merge_strategy by this push:
     new 0474107  add merge selection time limit
0474107 is described below

commit 0474107a940299c1a54f02dff1761cf56cf51dfa
Author: jt2594838 <jt...@163.com>
AuthorDate: Fri Nov 22 22:58:04 2019 +0800

    add merge selection time limit
---
 .../iotdb/db/engine/merge/BaseFileSelector.java       | 17 ++++++++++-------
 .../db/engine/merge/MaxSeriesMergeFileSelector.java   | 13 +++++++++++--
 .../squeeze/selector/SqueezeMaxFileSelector.java      | 19 ++++++++++++++++---
 3 files changed, 37 insertions(+), 12 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java
index 38ef280..42916d1 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/BaseFileSelector.java
@@ -62,10 +62,18 @@ public abstract class BaseFileSelector implements IMergeFileSelector{
   protected int seqSelectedNum;
 
   protected TmpSelectedSeqIterable tmpSelectedSeqIterable;
+  protected long startTime = System.currentTimeMillis();
+  protected long timeConsumption = 0;
+  protected long timeLimit = 0;
 
   @Override
   public void select() throws MergeException {
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
+    timeConsumption = 0;
+    timeLimit = IoTDBDescriptor.getInstance().getConfig().getMergeFileSelectionTimeBudget();
+    if (timeLimit < 0) {
+      timeLimit = Long.MAX_VALUE;
+    }
     try {
       logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
           resource.getSeqFiles().size(), resource.getUnseqFiles().size());
@@ -101,12 +109,7 @@ public abstract class BaseFileSelector implements IMergeFileSelector{
     totalCost = 0;
 
     int unseqIndex = 0;
-    long startTime = System.currentTimeMillis();
-    long timeConsumption = 0;
-    long timeLimit = IoTDBDescriptor.getInstance().getConfig().getMergeFileSelectionTimeBudget();
-    if (timeLimit < 0) {
-      timeLimit = Long.MAX_VALUE;
-    }
+
     while (unseqIndex < resource.getUnseqFiles().size() && timeConsumption < timeLimit) {
       // select next unseq files
       TsFileResource unseqFile = resource.getUnseqFiles().get(unseqIndex);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java
index f90b3ab..cf539b6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/MaxSeriesMergeFileSelector.java
@@ -22,6 +22,8 @@ package org.apache.iotdb.db.engine.merge;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.merge.manage.MergeResource;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.exception.MergeException;
@@ -48,6 +50,10 @@ public class MaxSeriesMergeFileSelector<T extends IMergeFileSelector> implements
   private int concurrentMergeNum;
   private long totalCost;
 
+  private long startTime;
+  private long timeConsumption;
+  private long timeLimit;
+
 
   public MaxSeriesMergeFileSelector(T baseSelector) {
     this.baseSelector = baseSelector;
@@ -56,7 +62,9 @@ public class MaxSeriesMergeFileSelector<T extends IMergeFileSelector> implements
 
   @Override
   public void select() throws MergeException {
-    long startTime = System.currentTimeMillis();
+    startTime = System.currentTimeMillis();
+    timeLimit = IoTDBDescriptor.getInstance().getConfig().getMergeFileSelectionTimeBudget();
+    timeConsumption = 0;
     try {
       logger.info("Selecting merge candidates from {} seqFile, {} unseqFiles",
           resource.getSeqFiles().size(),
@@ -126,7 +134,7 @@ public class MaxSeriesMergeFileSelector<T extends IMergeFileSelector> implements
   private void binSearch() throws IOException, MergeException {
     int lb = 0;
     int ub = MAX_SERIES_NUM + 1;
-    while (true) {
+    while (timeConsumption < timeLimit) {
       int mid = (lb + ub) / 2;
       if (mid == lb) {
         break;
@@ -151,6 +159,7 @@ public class MaxSeriesMergeFileSelector<T extends IMergeFileSelector> implements
         lastTotalMemoryCost = baseSelector.getTotalCost();
         lb = mid;
       }
+      timeConsumption = System.currentTimeMillis() - startTime;
     }
     concurrentMergeNum = lb;
     totalCost = lastTotalMemoryCost;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java
index c32bfb9..61ad7bb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/merge/squeeze/selector/SqueezeMaxFileSelector.java
@@ -62,14 +62,22 @@ public class SqueezeMaxFileSelector extends BaseFileSelector {
   }
 
   public void select(boolean useTightBound) throws IOException {
-    super.selectByUnseq(useTightBound);
+    firstOverlapIdx = Integer.MAX_VALUE;
+    lastOverlapIdx = Integer.MIN_VALUE;
+
+    tmpFirstOverlapIdx = Integer.MAX_VALUE;
+    tmpLastOverlapIdx = Integer.MIN_VALUE;
 
+    super.selectByUnseq(useTightBound);
+    logger.info("After selecting by unseq, first seq index:{}, last seq index:{}", firstOverlapIdx, lastOverlapIdx);
     if (firstOverlapIdx <= lastOverlapIdx) {
       // selectByUnseq has found candidates, check if we can extend the selection
       extendCurrentSelection(useTightBound);
+      logger.info("After seq extension, first seq index:{}, last seq index:{}", firstOverlapIdx, lastOverlapIdx);
     } else {
       // try selecting only seq files as candidates
       selectBySeq(useTightBound);
+      logger.info("After seq selection, first seq index:{}, last seq index:{}", firstOverlapIdx, lastOverlapIdx);
     }
     for (int i = firstOverlapIdx; i <= lastOverlapIdx; i++) {
       selectedSeqFiles.add(resource.getSeqFiles().get(i));
@@ -77,7 +85,7 @@ public class SqueezeMaxFileSelector extends BaseFileSelector {
   }
 
   private void selectBySeq(boolean useTightBound) throws IOException {
-    for (int i = 0; i < resource.getSeqFiles().size() - 1; i ++) {
+    for (int i = 0; i < resource.getSeqFiles().size() - 1 && timeConsumption < timeLimit; i ++) {
       // try to find candidates starting from i
       TsFileResource seqFile = resource.getSeqFiles().get(i);
       long fileCost = calculateSeqFileCost(seqFile, useTightBound);
@@ -95,24 +103,29 @@ public class SqueezeMaxFileSelector extends BaseFileSelector {
           lastOverlapIdx = Integer.MIN_VALUE;
         }
       }
+      timeConsumption = System.currentTimeMillis() - startTime;
     }
   }
 
   // if we have selected seqFiles[3] to seqFiles[6], check if we can add seqFiles[7] into the
   // selection without exceeding the budget
   private void extendCurrentSelection(boolean useTightBound) throws IOException {
-    for (int i = lastOverlapIdx + 1; i < resource.getSeqFiles().size(); i++) {
+    for (int i = lastOverlapIdx + 1; i < resource.getSeqFiles().size() && timeConsumption < timeLimit; i++) {
       TsFileResource seqFile = resource.getSeqFiles().get(i);
       long fileCost = calculateSeqFileCost(seqFile, useTightBound);
+      logger.debug("Try extending seq file {}", seqFile);
 
       if (fileCost + totalCost < memoryBudget) {
         maxSeqFileCost = tempMaxSeqFileCost;
         totalCost += fileCost;
         lastOverlapIdx++;
+        logger.debug("Extended seq file {}", seqFile);
       } else {
         tempMaxSeqFileCost = maxSeqFileCost;
+        logger.debug("Cannot extend seq file {}", seqFile);
         break;
       }
+      timeConsumption = System.currentTimeMillis() - startTime;
     }
   }