You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/02 05:40:58 UTC
carbondata git commit: [CARBONDATA-2043] Configurable wait time for
requesting executors and minimum registered executors ratio to continue the
block distribution - carbon.dynamicAllocation.schedulerTimeout : to configure
wait time. defalt 5sec, Min 5 se
Repository: carbondata
Updated Branches:
refs/heads/master 54a381c27 -> 473bd3197
[CARBONDATA-2043] Configurable wait time for requesting executors and minimum registered executors ratio to continue the block distribution
- carbon.dynamicAllocation.schedulerTimeout : to configure wait time. defalt 5sec, Min 5 sec and max 15 sec.
- carbon.scheduler.minRegisteredResourcesRatio : min 0.1, max 1.0 and default to 0.8 to configure minimum registered executors ratio.
This closes #1822
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/473bd319
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/473bd319
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/473bd319
Branch: refs/heads/master
Commit: 473bd3197a69e3c0574f8c07f04c29e43f7a023d
Parents: 54a381c
Author: mohammadshahidkhan <mo...@gmail.com>
Authored: Fri Dec 22 17:30:31 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Feb 2 11:10:23 2018 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 71 ++++++++++-----
.../carbondata/core/util/CarbonProperties.java | 90 +++++++++++++++-----
.../core/CarbonPropertiesValidationTest.java | 42 +++++++++
.../spark/sql/hive/DistributionUtil.scala | 67 ++++++++++-----
4 files changed, 205 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7ae3034..87eec8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1149,29 +1149,6 @@ public final class CarbonCommonConstants {
*/
public static final int DEFAULT_MAX_NUMBER_OF_COLUMNS = 20000;
- /**
- * Maximum waiting time (in seconds) for a query for requested executors to be started
- */
- @CarbonProperty
- public static final String CARBON_EXECUTOR_STARTUP_TIMEOUT =
- "carbon.max.executor.startup.timeout";
-
- /**
- * default value for executor start up waiting time out
- */
- public static final String CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT = "5";
-
- /**
- * Max value. If value configured by user is more than this than this value will value will be
- * considered
- */
- public static final int CARBON_EXECUTOR_WAITING_TIMEOUT_MAX = 60;
-
- /**
- * time for which thread will sleep and check again if the requested number of executors
- * have been started
- */
- public static final int CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME = 250;
/**
* to enable unsafe column page in write step
@@ -1537,6 +1514,54 @@ public final class CarbonCommonConstants {
public static final long HANDOFF_SIZE_DEFAULT = 1024L * 1024 * 1024;
/**
+ * minimum required registered resource for starting block distribution
+ */
+ @CarbonProperty
+ public static final String CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO =
+ "carbon.scheduler.minregisteredresourcesratio";
+ /**
+ * default minimum required registered resource for starting block distribution
+ */
+ public static final String CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT = "0.8d";
+ /**
+ * minimum required registered resource for starting block distribution
+ */
+ public static final double CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN = 0.1d;
+ /**
+ * max minimum required registered resource for starting block distribution
+ */
+ public static final double CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX = 1.0d;
+
+ /**
+ * To define how much time scheduler should wait for the
+ * resource in dynamic allocation.
+ */
+ public static final String CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT =
+ "carbon.dynamicallocation.schedulertimeout";
+
+ /**
+ * default scheduler wait time
+ */
+ public static final String CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT = "5";
+
+ /**
+ * default value for executor start up waiting time out
+ */
+ public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MIN = 5;
+
+ /**
+ * Max value. If value configured by user is more than this than this value will value will be
+ * considered
+ */
+ public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MAX = 15;
+
+ /**
+ * time for which thread will sleep and check again if the requested number of executors
+ * have been started
+ */
+ public static final int CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME = 250;
+
+ /**
* It allows queries on hive metastore directly along with filter information, otherwise first
* fetches all partitions from hive and apply filters on it.
*/
http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index fd78efc..39a0b80 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -35,7 +35,33 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import static org.apache.carbondata.core.constants.CarbonCommonConstants.*;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.BLOCKLET_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_CUSTOM_BLOCK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATA_FILE_VERSION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DATE_FORMAT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_SORT_FILE_WRITE_BUFFER_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCK;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_BLOCKLET;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_CUSTOM;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TASK_DISTRIBUTION_MERGE_FILES;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.CSV_READ_BUFFER_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_AUTO_HANDOFF;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_UNSAFE_SORT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.ENABLE_VECTOR_READER;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.HANDOFF_SIZE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.LOCK_TYPE;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.NUM_CORES_BLOCK_SORT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT;
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.SORT_SIZE;
import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.BLOCKLET_SIZE_IN_MB;
import static org.apache.carbondata.core.constants.CarbonV3DataFormatConstants.NUMBER_OF_COLUMN_TO_READ_IN_IO;
@@ -106,8 +132,8 @@ public final class CarbonProperties {
case CARBON_DATA_FILE_VERSION:
validateCarbonDataFileVersion();
break;
- case CARBON_EXECUTOR_STARTUP_TIMEOUT:
- validateExecutorStartUpTime();
+ case CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT:
+ validateDynamicSchedulerTimeOut();
break;
case CARBON_PREFETCH_BUFFERSIZE:
validatePrefetchBufferSize();
@@ -156,6 +182,9 @@ public final class CarbonProperties {
case ENABLE_AUTO_HANDOFF:
validateHandoffSize();
break;
+ case CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO:
+ validateSchedulerMinRegisteredRatio();
+ break;
// TODO : Validation for carbon.lock.type should be handled for addProperty flow
default:
// none
@@ -171,7 +200,7 @@ public final class CarbonProperties {
validateNumCoresBlockSort();
validateSortSize();
validateCarbonDataFileVersion();
- validateExecutorStartUpTime();
+ validateDynamicSchedulerTimeOut();
validatePrefetchBufferSize();
validateBlockletGroupSizeInMB();
validateNumberOfColumnPerIORead();
@@ -193,6 +222,7 @@ public final class CarbonProperties {
validateSortFileWriteBufferSize();
validateSortIntermediateFilesLimit();
validateEnableAutoHandoff();
+ validateSchedulerMinRegisteredRatio();
}
/**
@@ -253,6 +283,36 @@ public final class CarbonProperties {
}
/**
+ * minimum required registered resource for starting block distribution
+ */
+ private void validateSchedulerMinRegisteredRatio() {
+ String value = carbonProperties
+ .getProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+ CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+ try {
+ double minRegisteredResourceRatio = java.lang.Double.parseDouble(value);
+ if (minRegisteredResourceRatio < CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN
+ || minRegisteredResourceRatio > CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX) {
+ LOGGER.warn("The value \"" + value
+ + "\" configured for key " + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
+ + "\" is not in range. Valid range is (byte) \""
+ + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MIN + " to \""
+ + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_MAX + ". Using the default value \""
+ + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+ carbonProperties.setProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+ CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+ }
+ } catch (NumberFormatException e) {
+ LOGGER.warn("The value \"" + value
+ + "\" configured for key " + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO
+ + "\" is invalid. Using the default value \""
+ + CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+ carbonProperties.setProperty(CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+ CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT);
+ }
+ }
+
+ /**
* The method validate the validity of configured carbon.date.format value
* and reset to default value if validation fail
*/
@@ -984,23 +1044,11 @@ public final class CarbonProperties {
/**
* This method will validate and set the value for executor start up waiting time out
*/
- private void validateExecutorStartUpTime() {
- int executorStartUpTimeOut = 0;
- try {
- executorStartUpTimeOut = Integer.parseInt(carbonProperties
- .getProperty(CARBON_EXECUTOR_STARTUP_TIMEOUT,
- CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT));
- // If value configured by user is more than max value of time out then consider the max value
- if (executorStartUpTimeOut > CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX) {
- executorStartUpTimeOut = CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_MAX;
- }
- } catch (NumberFormatException ne) {
- executorStartUpTimeOut =
- Integer.parseInt(CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT);
- }
- carbonProperties.setProperty(CARBON_EXECUTOR_STARTUP_TIMEOUT,
- String.valueOf(executorStartUpTimeOut));
- LOGGER.info("Executor start up wait time: " + executorStartUpTimeOut);
+ private void validateDynamicSchedulerTimeOut() {
+ validateRange(CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT,
+ CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT,
+ CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MIN,
+ CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_MAX);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
index daf6db0..bbfe26c 100644
--- a/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/CarbonPropertiesValidationTest.java
@@ -205,4 +205,46 @@ public class CarbonPropertiesValidationTest extends TestCase {
assertTrue(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE
.equalsIgnoreCase(valueAfterValidation));
}
+
+ @Test public void testValidateDynamicSchedulerTimeOut() {
+ carbonProperties
+ .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "2");
+ String valueAfterValidation = carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT);
+ assertTrue(valueAfterValidation
+ .equals(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT));
+ carbonProperties
+ .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "16");
+ valueAfterValidation = carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT);
+ assertTrue(valueAfterValidation
+ .equals(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT));
+ carbonProperties
+ .addProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT, "15");
+ valueAfterValidation = carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT);
+ assertTrue(valueAfterValidation
+ .equals("15"));
+
+ }
+ @Test public void testValidateSchedulerMinRegisteredRatio() {
+ carbonProperties
+ .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "0.0");
+ String valueAfterValidation = carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO);
+ assertTrue(valueAfterValidation
+ .equals(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT));
+ carbonProperties
+ .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "-0.1");
+ valueAfterValidation = carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO);
+ assertTrue(valueAfterValidation
+ .equals(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT));
+ carbonProperties
+ .addProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO, "0.1");
+ valueAfterValidation = carbonProperties
+ .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO);
+ assertTrue(valueAfterValidation.equals("0.1"));
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/473bd319/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
index 37b722f..1958d61 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/hive/DistributionUtil.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
import java.net.{InetAddress, InterfaceAddress, NetworkInterface}
import scala.collection.JavaConverters._
+import scala.util.control.Breaks._
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
@@ -33,6 +34,26 @@ import org.apache.carbondata.processing.util.CarbonLoaderUtil
object DistributionUtil {
@transient
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ /*
+ * minimum required registered resource for starting block distribution
+ */
+ lazy val minRegisteredResourceRatio: Double = {
+ val value: String = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO,
+ CarbonCommonConstants.CARBON_SCHEDULER_MIN_REGISTERED_RESOURCES_RATIO_DEFAULT)
+ java.lang.Double.parseDouble(value)
+ }
+
+ /*
+ * node registration wait time
+ */
+ lazy val dynamicAllocationSchTimeOut: Integer = {
+ val value: String = CarbonProperties.getInstance()
+ .getProperty(CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT,
+ CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_TIMEOUT_DEFAULT)
+ // milli second
+ java.lang.Integer.parseInt(value) * 1000
+ }
/*
* This method will return the list of executers in the cluster.
@@ -202,18 +223,25 @@ object DistributionUtil {
var nodes = DistributionUtil.getNodeList(sparkContext)
// calculate the number of times loop has to run to check for starting
// the requested number of executors
- val threadSleepTime = CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_THREAD_SLEEP_TIME
- val loopCounter = calculateCounterBasedOnExecutorStartupTime(threadSleepTime)
- var maxTimes = loopCounter
- while (nodes.length < requiredExecutors && maxTimes > 0) {
- Thread.sleep(threadSleepTime)
- nodes = DistributionUtil.getNodeList(sparkContext)
- maxTimes = maxTimes - 1
+ val threadSleepTime =
+ CarbonCommonConstants.CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME
+ val maxRetryCount = calculateMaxRetry
+ var maxTimes = maxRetryCount
+ breakable {
+ while (nodes.length < requiredExecutors && maxTimes > 0) {
+ Thread.sleep(threadSleepTime);
+ nodes = DistributionUtil.getNodeList(sparkContext)
+ maxTimes = maxTimes - 1;
+ val resourceRatio = (nodes.length.toDouble / requiredExecutors)
+ if (resourceRatio.compareTo(minRegisteredResourceRatio) >= 0) {
+ break
+ }
+ }
}
val timDiff = System.currentTimeMillis() - startTime
LOGGER.info(s"Total Time taken to ensure the required executors : $timDiff")
LOGGER.info(s"Time elapsed to allocate the required executors: " +
- s"${(loopCounter - maxTimes) * threadSleepTime}")
+ s"${(maxRetryCount - maxTimes) * threadSleepTime}")
nodes.distinct.toSeq
}
@@ -245,21 +273,18 @@ object DistributionUtil {
/**
* This method will calculate how many times a loop will run with an interval of given sleep
* time to wait for requested executors to come up
- *
- * @param threadSleepTime
- * @return
+ *
+ * @return The max retry count
*/
- private def calculateCounterBasedOnExecutorStartupTime(threadSleepTime: Int): Int = {
- var executorStartUpTimeOut = CarbonProperties.getInstance
- .getProperty(CarbonCommonConstants.CARBON_EXECUTOR_STARTUP_TIMEOUT,
- CarbonCommonConstants.CARBON_EXECUTOR_WAITING_TIMEOUT_DEFAULT).toInt
- // convert seconds into milliseconds for loop counter calculation
- executorStartUpTimeOut = executorStartUpTimeOut * 1000
- // make executor start up time exactly divisible by thread sleep time
- val remainder = executorStartUpTimeOut % threadSleepTime
+ def calculateMaxRetry(): Int = {
+ val remainder = dynamicAllocationSchTimeOut % CarbonCommonConstants
+ .CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME
+ val retryCount: Int = dynamicAllocationSchTimeOut / CarbonCommonConstants
+ .CARBON_DYNAMIC_ALLOCATION_SCHEDULER_THREAD_SLEEP_TIME
if (remainder > 0) {
- executorStartUpTimeOut = executorStartUpTimeOut + threadSleepTime - remainder
+ retryCount + 1
+ } else {
+ retryCount
}
- executorStartUpTimeOut / threadSleepTime
}
}