You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/02 22:46:31 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1177] Provide a
config for overprovisioning Gobblin Yarn containers by a configurable
amount[]
This is an automated email from the ASF dual-hosted git repository.
suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 2ab6200 [GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[]
2ab6200 is described below
commit 2ab6200116062f49f9d3c7854270ea19490d8ba1
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Jun 2 15:46:24 2020 -0700
[GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[]
Closes #3023 from sv2000/containerOverProvision
---
.../gobblin/yarn/YarnAutoScalingManager.java | 25 ++++---
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 85 +++++++++++++++++++---
2 files changed, 92 insertions(+), 18 deletions(-)
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index d08a4c4..458527a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -29,8 +29,6 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
@@ -50,6 +48,9 @@ import com.typesafe.config.Config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
@@ -69,6 +70,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
+ private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor";
+ private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0;
+
// A rough value of how much containers should be an intolerable number.
private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
@@ -85,6 +89,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final int partitionsPerContainer;
private final int minContainers;
private final int maxContainers;
+ private final double overProvisionFactor;
private final SlidingWindowReservoir slidingFixedSizeWindow;
private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
@@ -107,6 +112,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
this.maxContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MAX_CONTAINERS,
DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
+ this.overProvisionFactor = ConfigUtils.getDouble(this.config, AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
+ DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
+
Preconditions.checkArgument(this.maxContainers > 0,
DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
@@ -123,7 +131,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
}
@Override
- protected void startUp() throws Exception {
+ protected void startUp() {
int scheduleInterval = ConfigUtils.getInt(this.config, AUTO_SCALING_POLLING_INTERVAL_SECS,
DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
int initialDelay = ConfigUtils.getInt(this.config, AUTO_SCALING_INITIAL_DELAY,
@@ -132,13 +140,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);
this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
- this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers,
+ this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers, this.overProvisionFactor,
this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
TimeUnit.SECONDS);
}
@Override
- protected void shutDown() throws Exception {
+ protected void shutDown() {
log.info("Stopping the " + YarnAutoScalingManager.class.getSimpleName());
ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor, Optional.of(log));
@@ -156,6 +164,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final int partitionsPerContainer;
private final int minContainers;
private final int maxContainers;
+ private final double overProvisionFactor;
private final SlidingWindowReservoir slidingWindowReservoir;
private final HelixDataAccessor helixDataAccessor;
/**
@@ -245,11 +254,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
}
}
-
-
// compute the target containers as a ceiling of number of partitions divided by the number of containers
- // per partition.
- int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
+ // per partition. Scale the result by a constant overprovision factor.
+ int numTargetContainers = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
// adjust the number of target containers based on the configured min and max container values.
numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index ba6c1db..6c40471 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -82,7 +82,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
- 1, 10, noopQueue, helixDataAccessor);
+ 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -132,7 +132,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, 1, 10, noopQueue, helixDataAccessor);
+ 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -201,7 +201,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, 1, 10, noopQueue, helixDataAccessor);
+ 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -270,7 +270,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, 1, 10, noopQueue, helixDataAccessor);
+ 1, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -314,7 +314,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 2, 1, 10, noopQueue, helixDataAccessor);
+ 2, 1, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -359,7 +359,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, 5, 10, noopQueue, helixDataAccessor);
+ 1, 5, 10, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -368,7 +368,6 @@ public class YarnAutoScalingManagerTest {
.requestTargetNumberOfContainers(5, ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
-
/**
* Test max containers
*/
@@ -404,7 +403,7 @@ public class YarnAutoScalingManagerTest {
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
- 1, 1, 1, noopQueue, helixDataAccessor);
+ 1, 1, 1, 1.0, noopQueue, helixDataAccessor);
runnable.run();
@@ -413,6 +412,74 @@ public class YarnAutoScalingManagerTest {
.requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
+ /** Test that the overprovision factor scales the container target before the min/max clamping is applied. */
+ @Test
+ public void testOverprovision() {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+ Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+ Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
+ Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, 1.2, noopQueue, helixDataAccessor);
+
+ runnable1.run();
+
+ // 3 containers requested (rounded up by the overprovision factor) and one worker in use
+ // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2, Min containers = 1, Max = 10
+ // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, 0.1, noopQueue, helixDataAccessor);
+
+ runnable2.run();
+
+ // 1 container requested (a factor below 1.0 still yields at least the min) and one worker in use
+ // NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1, Min containers = 1, Max = 10
+ // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+
+ YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, 6.0, noopQueue, helixDataAccessor);
+
+ runnable3.run();
+
+ // 10 containers requested (the scaled value of 12 is capped at the max) and one worker in use
+ // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0, Min containers = 1, Max = 10
+ // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10.
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(10, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ }
+
/**
* Test suppressed exception
*/
@@ -542,7 +609,7 @@ public class YarnAutoScalingManagerTest {
public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
int minContainers, int maxContainers, HelixDataAccessor helixDataAccessor) {
- super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, noopQueue, helixDataAccessor);
+ super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, 1.0, noopQueue, helixDataAccessor);
}
@Override