You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/02 05:40:58 UTC

carbondata git commit: [CARBONDATA-2043] Configurable wait time for requesting executors and minimum registered executors ratio to continue the block distribution - carbon.dynamicallocation.schedulertimeout : to configure wait time. Default 5 sec, min 5 sec

Repository: carbondata
Updated Branches:
  refs/heads/master 54a381c27 -> 473bd3197


[CARBONDATA-2043] Configurable wait time for requesting executors and minimum registered executors ratio to continue the block distribution
- carbon.dynamicallocation.schedulertimeout : configures the wait time in seconds. Default 5 sec, min 5 sec and max 15 sec.
- carbon.scheduler.minregisteredresourcesratio : configures the minimum registered executors ratio. Default 0.8, min 0.1 and max 1.0.

This closes #1822


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/473bd319
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/473bd319
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/473bd319

Branch: refs/heads/master
Commit: 473bd3197a69e3c0574f8c07f04c29e43f7a023d
Parents: 54a381c
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Fri Dec 22 17:30:31 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Feb 2 11:10:23 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   | 71 ++++++++++-----
 .../carbondata/core/util/CarbonProperties.java  | 90 +++++++++++++++-----
 .../core/CarbonPropertiesValidationTest.java    | 42 +++++++++
 .../spark/sql/hive/DistributionUtil.scala       | 67 ++++++++++-----
 4 files changed, 205 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7ae3034..87eec8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1149,29 +1149,6 @@ public final class CarbonCommonConstants {
    */
   public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS = 20000;
 
-  /**
-   * Maximum waiting time (in seconds) for a query for requested executors to be started
-   */
-  @CarbonProperty
-  public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT =
-      "carbon.max.executor.startup.timeout";
-
-  /**
-   * default value for executor start up waiting time out
-   */
-  public static final String CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT = "5";
-
-  /**
-   * Max value. If value configured by user is more than this than this value will value will be
-   * considered
-   */
-  public static final int CARBON_EXECUTOR_WAITING_TIMEOUT_MAX = 60;
-
-  /**
-   * time for which thread will sleep and check again if the requested number of executors
-   * have been started
-   */
-  public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250;
 
   /**
    * to enable unsafe column page in write step
@@ -1537,6 +1514,54 @@ public final class CarbonCommonConstants {
   public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
 
   /**
+   * minimum required registered resource for starting block distribution
+   */
+  @CarbonProperty
+  public static final String CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO =
+      "carbon.scheduler.minregisteredresourcesratio";
+  /**
+   * default minimum required registered resource for starting block distribution
+   */
+  public static final String CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT = "0.8d";
+  /**
+   * minimum required registered resource for starting block distribution
+   */
+  public static final double CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN = 0.1d;
+  /**
+   * max minimum required registered resource for starting block distribution
+   */
+  public static final double CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX = 1.0d;
+
+  /**
+   * To define how much time scheduler should wait for the
+   * resource in dynamic allocation.
+   */
+  public static final String CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT =
+      "carbon.dynamicallocation.schedulertimeout";
+
+  /**
+   * default scheduler wait time
+   */
+  public static final String CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT = "5";
+
+  /**
+   * minimum value (in seconds) allowed for the scheduler wait time
+   */
+  public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MIN = 5;
+
+  /**
+   * Max value (in seconds). If the value configured by the user is more than this, the default
+   * value will be used instead
+   */
+  public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MAX = 15;
+
+  /**
+   * time for which thread will sleep and check again if the requested number of executors
+   * have been started
+   */
+  public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME = 250;
+
+  /**
    * It allows queries on hive metastore directly along with filter information, otherwise first
    * fetches all partitions from hive and apply filters on it.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index fd78efc..39a0b80 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -35,7 +35,33 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.*;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.BLOCKLET_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATA_FILE_VERSION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATE_FORMAT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_VECTOR_READER;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.HANDOFF_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.LOCK_TYPE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_BLOCK_SORT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_SIZE;
 import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB;
 import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO;
 
@@ -106,8 +132,8 @@ public final class CarbonProperties {
       case CARBON_DATA_FILE_VERSION:
         validateCarbonDataFileVersion();
         break;
-      case CARBON_EXECUTOR_STARTUP_TIMEOUT:
-        validateExecutorStartUpTime();
+      case CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT:
+        validateDynamicSchedulerTimeOut();
         break;
       case CARBON_PREFETCH_BUFFERSIZE:
         validatePrefetchBufferSize();
@@ -156,6 +182,9 @@ public final class CarbonProperties {
       case ENABLE_AUTO_HANDOFF:
         validateHandoffSize();
         break;
+      case CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO:
+        validateSchedulerMinRegisteredRatio();
+        break;
       // TODO : Validation for carbon.lock.type should be handled for addProperty flow
       default:
         // none
@@ -171,7 +200,7 @@ public final class CarbonProperties {
     validateNumCoresBlockSort();
     validateSortSize();
     validateCarbonDataFileVersion();
-    validateExecutorStartUpTime();
+    validateDynamicSchedulerTimeOut();
     validatePrefetchBufferSize();
     validateBlockletGroupSizeInMB();
     validateNumberOfColumnPerIORead();
@@ -193,6 +222,7 @@ public final class CarbonProperties {
     validateSortFileWriteBufferSize();
     validateSortIntermediateFilesLimit();
     validateEnableAutoHandoff();
+    validateSchedulerMinRegisteredRatio();
   }
 
   /**
@@ -253,6 +283,36 @@ public final class CarbonProperties {
   }
 
   /**
+   * minimum required registered resource for starting block distribution
+   */
+  private void validateSchedulerMinRegisteredRatio() {
+    String value = carbonProperties
+        .getProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+            CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+    try {
+      double minRegisteredResourceRatio = java.lang.Double.parseDouble(value);
+      if (minRegisteredResourceRatio < CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN
+          || minRegisteredResourceRatio > CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX) {
+        LOGGER.warn("The value \"" + value
+            + "\" configured for key " + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
+            + "\" is not in range. Valid range is (byte) \""
+            + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN + " to \""
+            + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX + ". Using the default value \""
+            + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+        carbonProperties.setProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+            CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+      }
+    } catch (NumberFormatException e) {
+      LOGGER.warn("The value \"" + value
+          + "\" configured for key " + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
+          + "\" is invalid. Using the default value \""
+          + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+      carbonProperties.setProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+          CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+    }
+  }
+
+  /**
    * The method validate the validity of configured carbon.date.format value
    * and reset to default value if validation fail
    */
@@ -984,23 +1044,11 @@ public final class CarbonProperties {
   /**
    * This method will validate and set the value for executor start up waiting time out
    */
-  private void validateExecutorStartUpTime() {
-    int executorStartUpTimeOut = 0;
-    try {
-      executorStartUpTimeOut = Integer.parseInt(carbonProperties
-          .getProperty(CARBON_EXECUTOR_STARTUP_TIMEOUT,
-              CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT));
-      // If value configured by user is more than max value of time out then consider the max value
-      if (executorStartUpTimeOut > CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX) {
-        executorStartUpTimeOut = CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX;
-      }
-    } catch (NumberFormatException ne) {
-      executorStartUpTimeOut =
-          Integer.parseInt(CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT);
-    }
-    carbonProperties.setProperty(CARBON_EXECUTOR_STARTUP_TIMEOUT,
-        String.valueOf(executorStartUpTimeOut));
-    LOGGER.info("Executor start up wait time: " + executorStartUpTimeOut);
+  private void validateDynamicSchedulerTimeOut() {
+    validateRange(CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT,
+        CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT,
+        CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MIN,
+        CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MAX);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
index daf6db0..bbfe26c 100644
--- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -205,4 +205,46 @@ public class CarbonPropertiesValidationTest extends TestCase {
     assertTrue(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE
         .equalsIgnoreCase(valueAfterValidation));
   }
+
+  @Test public void testValidateDynamicSchedulerTimeOut() {
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "2");
+    String valueAfterValidation = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT);
+    assertTrue(valueAfterValidation
+        .equals(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT));
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "16");
+    valueAfterValidation = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT);
+    assertTrue(valueAfterValidation
+        .equals(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT));
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "15");
+    valueAfterValidation = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT);
+    assertTrue(valueAfterValidation
+        .equals("15"));
+
+  }
+  @Test public void testValidateSchedulerMinRegisteredRatio() {
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "0.0");
+    String valueAfterValidation = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO);
+    assertTrue(valueAfterValidation
+        .equals(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT));
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "-0.1");
+    valueAfterValidation = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO);
+    assertTrue(valueAfterValidation
+        .equals(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT));
+    carbonProperties
+        .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "0.1");
+    valueAfterValidation = carbonProperties
+        .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO);
+    assertTrue(valueAfterValidation.equals("0.1"));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 37b722f..1958d61 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
 
 import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
 
 import org.apache.spark.SparkContext
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -33,6 +34,26 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
 object DistributionUtil {
   @transient
   val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  /*
+   *  minimum required registered resource for starting block distribution
+   */
+  lazy val minRegisteredResourceRatio: Double = {
+    val value: String = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+        CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT)
+    java.lang.Double.parseDouble(value)
+  }
+
+  /*
+   * node registration wait time
+   */
+  lazy val dynamicAllocationSchTimeOut: Integer = {
+    val value: String = CarbonProperties.getInstance()
+      .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT,
+        CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT)
+    // milli second
+    java.lang.Integer.parseInt(value) * 1000
+  }
 
   /*
    * This method will return the list of executers in the cluster.
@@ -202,18 +223,25 @@ object DistributionUtil {
     var nodes = DistributionUtil.getNodeList(sparkContext)
     // calculate the number of times loop has to run to check for starting
     // the requested number of executors
-    val threadSleepTime = CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME
-    val loopCounter = calculateCounterBasedOnExecutorStartupTime(threadSleepTime)
-    var maxTimes = loopCounter
-    while (nodes.length < requiredExecutors && maxTimes > 0) {
-      Thread.sleep(threadSleepTime)
-      nodes = DistributionUtil.getNodeList(sparkContext)
-      maxTimes = maxTimes - 1
+    val threadSleepTime =
+    CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME
+    val maxRetryCount = calculateMaxRetry
+    var maxTimes = maxRetryCount
+    breakable {
+      while (nodes.length < requiredExecutors && maxTimes > 0) {
+        Thread.sleep(threadSleepTime);
+        nodes = DistributionUtil.getNodeList(sparkContext)
+        maxTimes = maxTimes - 1;
+        val resourceRatio = (nodes.length.toDouble / requiredExecutors)
+        if (resourceRatio.compareTo(minRegisteredResourceRatio) >= 0) {
+          break
+        }
+      }
     }
     val timDiff = System.currentTimeMillis() - startTime
     LOGGER.info(s"Total Time taken to ensure the required executors : $timDiff")
     LOGGER.info(s"Time elapsed to allocate the required executors: " +
-      s"${(loopCounter - maxTimes) * threadSleepTime}")
+      s"${(maxRetryCount - maxTimes) * threadSleepTime}")
     nodes.distinct.toSeq
   }
 
@@ -245,21 +273,18 @@ object DistributionUtil {
   /**
    * This method will calculate how many times a loop will run with an interval of given sleep
    * time to wait for requested executors to come up
-   *
-   * @param threadSleepTime
-   * @return
+    *
+    * @return The max retry count
    */
-  private def calculateCounterBasedOnExecutorStartupTime(threadSleepTime: Int): Int = {
-    var executorStartUpTimeOut = CarbonProperties.getInstance
-      .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT,
-        CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT).toInt
-    // convert seconds into milliseconds for loop counter calculation
-    executorStartUpTimeOut = executorStartUpTimeOut * 1000
-    // make executor start up time exactly divisible by thread sleep time
-    val remainder = executorStartUpTimeOut % threadSleepTime
+  def calculateMaxRetry(): Int = {
+    val remainder = dynamicAllocationSchTimeOut % CarbonCommonConstants
+      .CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME
+    val retryCount: Int = dynamicAllocationSchTimeOut / CarbonCommonConstants
+      .CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME
     if (remainder > 0) {
-      executorStartUpTimeOut = executorStartUpTimeOut + threadSleepTime - remainder
+      retryCount + 1
+    } else {
+      retryCount
     }
-    executorStartUpTimeOut / threadSleepTime
   }
 }