You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2020/06/02 22:46:31 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ab6200  [GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[]
2ab6200 is described below

commit 2ab6200116062f49f9d3c7854270ea19490d8ba1
Author: sv2000 <su...@gmail.com>
AuthorDate: Tue Jun 2 15:46:24 2020 -0700

    [GOBBLIN-1177] Provide a config for overprovisioning Gobblin Yarn containers by a configurable amount[]
    
    Closes #3023 from sv2000/containerOverProvision
---
 .../gobblin/yarn/YarnAutoScalingManager.java       | 25 ++++---
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 85 +++++++++++++++++++---
 2 files changed, 92 insertions(+), 18 deletions(-)

diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index d08a4c4..458527a 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -29,8 +29,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
@@ -50,6 +48,9 @@ import com.typesafe.config.Config;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+
 import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
 
 
@@ -69,6 +70,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
+  private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor";
+  private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0;
+
   // A rough value of how much containers should be an intolerable number.
   private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
@@ -85,6 +89,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final int partitionsPerContainer;
   private final int minContainers;
   private final int maxContainers;
+  private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
   private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
 
@@ -107,6 +112,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     this.maxContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MAX_CONTAINERS,
         DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
 
+    this.overProvisionFactor = ConfigUtils.getDouble(this.config, AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
+        DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
+
     Preconditions.checkArgument(this.maxContainers > 0,
         DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
 
@@ -123,7 +131,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   }
 
   @Override
-  protected void startUp() throws Exception {
+  protected void startUp() {
     int scheduleInterval = ConfigUtils.getInt(this.config, AUTO_SCALING_POLLING_INTERVAL_SECS,
         DEFAULT_AUTO_SCALING_POLLING_INTERVAL_SECS);
     int initialDelay = ConfigUtils.getInt(this.config, AUTO_SCALING_INITIAL_DELAY,
@@ -132,13 +140,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);
 
     this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
-            this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers,
+            this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers, this.overProvisionFactor,
             this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
         TimeUnit.SECONDS);
   }
 
   @Override
-  protected void shutDown() throws Exception {
+  protected void shutDown() {
     log.info("Stopping the " + YarnAutoScalingManager.class.getSimpleName());
 
     ExecutorsUtils.shutdownExecutorService(this.autoScalingExecutor, Optional.of(log));
@@ -156,6 +164,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     private final int partitionsPerContainer;
     private final int minContainers;
     private final int maxContainers;
+    private final double overProvisionFactor;
     private final SlidingWindowReservoir slidingWindowReservoir;
     private final HelixDataAccessor helixDataAccessor;
     /**
@@ -245,11 +254,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
         }
       }
 
-
-
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
-      // per partition.
-      int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
+      // per partition. Scale the result by a constant overprovision factor.
+      int numTargetContainers = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
 
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index ba6c1db..6c40471 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -82,7 +82,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1, 10, noopQueue, helixDataAccessor);
+            1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -132,7 +132,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, noopQueue, helixDataAccessor);
+            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -201,7 +201,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, noopQueue, helixDataAccessor);
+            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -270,7 +270,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, noopQueue, helixDataAccessor);
+            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -314,7 +314,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            2, 1, 10, noopQueue, helixDataAccessor);
+            2, 1, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -359,7 +359,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 5, 10, noopQueue, helixDataAccessor);
+            1, 5, 10, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -368,7 +368,6 @@ public class YarnAutoScalingManagerTest {
         .requestTargetNumberOfContainers(5, ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
-
   /**
    * Test max containers
    */
@@ -404,7 +403,7 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 1, noopQueue, helixDataAccessor);
+            1, 1, 1, 1.0, noopQueue, helixDataAccessor);
 
     runnable.run();
 
@@ -413,6 +412,74 @@ public class YarnAutoScalingManagerTest {
         .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
+  @Test
+  public void testOverprovision() {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+
+    Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, 1.2, noopQueue, helixDataAccessor);
+
+    runnable1.run();
+
+    // 3 containers requested and one worker in use
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2, Min containers = 1, Max = 10
+    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+
+    // A factor below 1.0 shrinks the target, but it is still clamped to at least minContainers.
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, 0.1, noopQueue, helixDataAccessor);
+
+    runnable2.run();
+
+    // 1 container requested and one worker in use
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1, Min containers = 1, Max = 10
+    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, 6.0, noopQueue, helixDataAccessor);
+
+    runnable3.run();
+
+    // 10 containers requested (capped at maxContainers) and one worker in use
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0, Min containers = 1, Max = 10
+    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10.
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(10, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+  }
+
   /**
    * Test suppressed exception
    */
@@ -542,7 +609,7 @@ public class YarnAutoScalingManagerTest {
 
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
         int minContainers, int maxContainers, HelixDataAccessor helixDataAccessor) {
-      super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, noopQueue, helixDataAccessor);
+      super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, 1.0, noopQueue, helixDataAccessor);
     }
 
     @Override