You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/09 16:42:23 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-1072] Being more conservative on releasing containers

This is an automated email from the ASF dual-hosted git repository.

lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new f3d75e3  [GOBBLIN-1072] Being more conservative on releasing containers
f3d75e3 is described below

commit f3d75e392c554771e3c098cafb648ec55fa40ba7
Author: autumnust <le...@linkedin.com>
AuthorDate: Mon Mar 9 09:42:14 2020 -0700

    [GOBBLIN-1072] Being more conservative on releasing containers
    
    Add sliding window to protect AutoScaling from
    fluctuation of number of active Helix Partitions
    
    Less conservative on tagging an instance as unused
    
    Add a unit test
    
    Address comments
    
    Closes #2912 from
    autumnust/conservativeReleaseYarnContainers
---
 .../cluster/GobblinHelixMessagingService.java      |   2 -
 .../apache/gobblin/yarn/GobblinYarnTaskRunner.java |   2 +
 .../gobblin/yarn/YarnAutoScalingManager.java       | 155 ++++++++++++++-
 .../java/org/apache/gobblin/yarn/YarnService.java  |   5 +-
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 214 ++++++++++++++++++---
 5 files changed, 338 insertions(+), 40 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
index 2132335..21e3877 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
@@ -77,8 +77,6 @@ public class GobblinHelixMessagingService extends DefaultMessagingService {
     if (instanceType == InstanceType.CONTROLLER) {
       List<Message> messages = generateMessagesForController(message);
       messagesToSendMap.put(InstanceType.CONTROLLER, messages);
-      // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
-      // newMessage.getRecord(), CreateMode.PERSISTENT);
     } else if (instanceType == InstanceType.PARTICIPANT) {
       List<Message> messages = new ArrayList<Message>();
       List<Map<String, String>> matchedList =
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 5b3fa7c..dc68162 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -57,6 +57,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
 
+  public static final String HELIX_YARN_INSTANCE_NAME_PREFIX = GobblinYarnTaskRunner.class.getSimpleName();
+
   public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
       Optional<Path> appWorkDirOptional) throws Exception {
     super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 0742f54..d08a4c4 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,15 +17,23 @@
 
 package org.apache.gobblin.yarn;
 
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
@@ -42,8 +50,7 @@ import com.typesafe.config.Config;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
+import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
 
 
 /**
@@ -62,10 +69,15 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
   private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
   private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
+  // A rough value of how many containers should be considered an intolerable number.
   private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
+  private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize";
+
+  private final static int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+
   private final Config config;
   private final HelixManager helixManager;
   private final ScheduledExecutorService autoScalingExecutor;
@@ -73,6 +85,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final int partitionsPerContainer;
   private final int minContainers;
   private final int maxContainers;
+  private final SlidingWindowReservoir slidingFixedSizeWindow;
+  private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
 
   public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
     this.config = appMaster.getConfig();
@@ -100,6 +114,10 @@ public class YarnAutoScalingManager extends AbstractIdleService {
         DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or equal to "
             + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
 
+    this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
+        ? new SlidingWindowReservoir(maxContainers, config.getInt(AUTO_SCALING_WINDOW_SIZE))
+        : new SlidingWindowReservoir(maxContainers);
+
     this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("AutoScalingExecutor")));
   }
@@ -114,8 +132,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);
 
     this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
-            this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers), initialDelay,
-        scheduleInterval, TimeUnit.SECONDS);
+            this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers,
+            this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
+        TimeUnit.SECONDS);
   }
 
   @Override
@@ -137,6 +156,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     private final int partitionsPerContainer;
     private final int minContainers;
     private final int maxContainers;
+    private final SlidingWindowReservoir slidingWindowReservoir;
+    private final HelixDataAccessor helixDataAccessor;
+    /**
+     * A static map that keeps track of each idle instance and the time at which it most recently became idle.
+     * If an instance is no longer idle when inspected, it will be dropped from this map.
+     */
+    private static final Map<String, Long> instanceIdleSince = new HashMap<>();
 
 
     @Override
@@ -150,6 +176,17 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     }
 
     /**
+     * Getting all instances (Helix Participants) in cluster at this moment.
+     * Note that the raw result could contain the AppMaster node and the replanner node.
+     * @param filterString Helix instances whose names contain filterString pass the filter; an empty string passes all.
+     */
+    private Set<String> getParticipants(String filterString) {
+      PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+      return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances())
+          .keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet());
+    }
+
+    /**
      * Iterate through the workflows configured in Helix to figure out the number of required partitions
      * and request the {@link YarnService} to scale to the desired number of containers.
      */
@@ -189,6 +226,27 @@ public class YarnAutoScalingManager extends AbstractIdleService {
         }
       }
 
+      // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+      // and potentially replanner-instance.
+      Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
+
+      // Find all joined participants not in use for this round of inspection.
+      // An instance idle beyond the tolerance is treated as unused, i.e. it is left out of inUseInstances.
+      for (String participant : allParticipants) {
+        if (!inUseInstances.contains(participant)) {
+          instanceIdleSince.putIfAbsent(participant, System.currentTimeMillis());
+          if (!isInstanceUnused(participant)) {
+            inUseInstances.add(participant);
+          }
+        } else {
+          // A previously idle instance is now detected to be in use.
+          // Remove this instance if existed in the tracking map.
+          instanceIdleSince.remove(participant);
+        }
+      }
+
+
+
       // compute the target containers as a ceiling of number of partitions divided by the number of containers
       // per partition.
       int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
@@ -196,9 +254,96 @@ public class YarnAutoScalingManager extends AbstractIdleService {
       // adjust the number of target containers based on the configured min and max container values.
       numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
 
+      slidingWindowReservoir.add(numTargetContainers);
+
       log.info("There are {} containers being requested", numTargetContainers);
 
-      this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+      this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
+    }
+
+    @VisibleForTesting
+    /**
+     * Return true if the condition for tagging an instance as "unused" holds.
+     * The condition is that the instance has been tracked as idle for longer than
+     * {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes; an instance that has been idle for a shorter time is
+     * not tagged as "unused" and is therefore not a candidate for scaling down.
+     */
+    boolean isInstanceUnused(String participant){
+      return System.currentTimeMillis() - instanceIdleSince.get(participant) >
+          TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
+    }
+  }
+
+  /**
+   * A fixed-size FIFO queue that returns the max value among all elements within the queue in constant time.
+   * Because the max over the window is reported, a temporary dip in the number of active Helix partitions does not
+   * immediately lower the result; a larger window makes scaling down less sensitive to such fluctuation.
+   *
+   * The interface for this is implemented in a minimal-necessity manner to serve only as a fixed-size sliding window
+   * which captures the max value. It is NOT built for general purpose.
+   */
+  static class SlidingWindowReservoir {
+    private ArrayDeque<Integer> fifoQueue;
+    private PriorityQueue<Integer> priorityQueue;
+
+    // Queue Size
+    private int maxSize;
+    private static final int DEFAULT_MAX_SIZE = 10;
+
+    // Upper-bound of value within the queue.
+    private int upperBound;
+
+    public SlidingWindowReservoir(int maxSize, int upperBound) {
+      Preconditions.checkArgument(maxSize > 0, "maxSize has to be a value larger than 0");
+
+      this.maxSize = maxSize;
+      this.upperBound = upperBound;
+      this.fifoQueue = new ArrayDeque<>(maxSize);
+      this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return o2.compareTo(o1);
+        }
+      });
+    }
+
+    public SlidingWindowReservoir(int upperBound) {
+      this(DEFAULT_MAX_SIZE, upperBound);
+    }
+
+    /**
+     * Add element into data structure.
+     * When a new element is larger than upperbound, reject the value since we may request too many Yarn containers.
+     * When queue is full, evict head of FIFO-queue (In FIFO queue, elements are inserted from tail).
+     */
+    public void add(int e) {
+      if (e > upperBound) {
+        log.error(String.format("Request of getting %s containers seems to be excessive, rejected", e));
+        return;
+      }
+
+      if (fifoQueue.size() == maxSize) {
+        Integer removedElement = fifoQueue.remove();
+        priorityQueue.remove(removedElement);
+      }
+
+      if (fifoQueue.size() == priorityQueue.size()) {
+        fifoQueue.add(e);
+        priorityQueue.add(e);
+      } else {
+        throw new IllegalStateException("Queue has its internal data structure being inconsistent.");
+      }
+    }
+
+    /**
+     * If queue is empty, throw {@link IllegalStateException}.
+     */
+    public int getMax() {
+      if (priorityQueue.size() > 0) {
+        return this.priorityQueue.peek();
+      } else {
+        throw new IllegalStateException("Queried before elements added into the queue.");
+      }
     }
   }
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 6476c99..0c649ba 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -108,6 +108,8 @@ import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
 import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
 import org.apache.gobblin.yarn.event.NewContainerRequest;
 
+import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
+
 
 /**
  * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
@@ -136,6 +138,7 @@ public class YarnService extends AbstractIdleService {
   private final Optional<GobblinMetrics> gobblinMetrics;
   private final Optional<EventSubmitter> eventSubmitter;
 
+
   @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
   private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
@@ -720,7 +723,7 @@ public class YarnService extends AbstractIdleService {
         String instanceName = unusedHelixInstanceNames.poll();
         if (Strings.isNullOrEmpty(instanceName)) {
           // No unused instance name, so generating a new one.
-          instanceName = HelixUtils.getHelixInstanceName(GobblinYarnTaskRunner.class.getSimpleName(),
+          instanceName = HelixUtils.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX,
               helixInstanceIdGenerator.incrementAndGet());
         }
 
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 58d639e..ba6c1db 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.yarn;
 
 import java.io.IOException;
 
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
@@ -26,6 +29,7 @@ import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
 import org.mockito.Mockito;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.collect.ImmutableMap;
@@ -40,6 +44,9 @@ import static org.mockito.Mockito.times;
  */
 @Test(groups = { "gobblin.yarn" })
 public class YarnAutoScalingManagerTest {
+  // A queue with size == 1 and upperBound == "infinite" should not impact the execution.
+  private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue =
+      new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
   /**
    * Test for one workflow with one job
    */
@@ -64,17 +71,24 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext = mock(JobContext.class);
     Mockito.when(mockJobContext.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
 
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            1, 10, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 2 containers requested and one worker in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(2, ImmutableSet.of("worker1"));
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   /**
@@ -101,22 +115,30 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext1 = mock(JobContext.class);
     Mockito.when(mockJobContext1.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
 
     JobContext mockJobContext2 = mock(JobContext.class);
     Mockito.when(mockJobContext2.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
     Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+            "GobblinYarnTaskRunner-2", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(3, ImmutableSet.of("worker1", "worker2"));
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
   }
 
   /**
@@ -141,13 +163,13 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext1 = mock(JobContext.class);
     Mockito.when(mockJobContext1.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
 
     JobContext mockJobContext2 = mock(JobContext.class);
     Mockito.when(mockJobContext2.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
     Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
 
     WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
@@ -164,20 +186,28 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext3 = mock(JobContext.class);
     Mockito.when(mockJobContext3.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-    Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+    Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
     Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
 
     Mockito.when(mockTaskDriver.getWorkflows())
         .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2));
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+            "GobblinYarnTaskRunner-2", new HelixProperty(""),
+            "GobblinYarnTaskRunner-3", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 5 containers requested and 3 workers in use
     Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(5,
-        ImmutableSet.of("worker1", "worker2", "worker3"));
+        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3"));
   }
 
   /**
@@ -203,13 +233,13 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext1 = mock(JobContext.class);
     Mockito.when(mockJobContext1.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
 
     JobContext mockJobContext2 = mock(JobContext.class);
     Mockito.when(mockJobContext2.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
-    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
     Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
 
     WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
@@ -226,20 +256,27 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext3 = mock(JobContext.class);
     Mockito.when(mockJobContext3.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
-    Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+    Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
     Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
 
     Mockito.when(mockTaskDriver.getWorkflows())
         .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2));
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+            "GobblinYarnTaskRunner-2", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
     Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(3,
-        ImmutableSet.of("worker1", "worker2"));
+        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
   }
 
   /**
@@ -266,17 +303,24 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext = mock(JobContext.class);
     Mockito.when(mockJobContext.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
 
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 2, 1, 10);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            2, 1, 10, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 1 container requested since 2 partitions and limit is 2 partitions per container. One worker in use.
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
 
@@ -304,17 +348,24 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext = mock(JobContext.class);
     Mockito.when(mockJobContext.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
 
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 5, 10);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 5, 10, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 5 containers requested due to min and one worker in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(5, ImmutableSet.of("worker1"));
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(5, ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
 
@@ -342,17 +393,24 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext = mock(JobContext.class);
     Mockito.when(mockJobContext.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
 
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 1, noopQueue, helixDataAccessor);
 
     runnable.run();
 
     // 1 container requested to max and one worker in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+    Mockito.verify(mockYarnService, times(1))
+        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
   }
 
   /**
@@ -379,29 +437,112 @@ public class YarnAutoScalingManagerTest {
     JobContext mockJobContext = mock(JobContext.class);
     Mockito.when(mockJobContext.getPartitionSet())
         .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
 
     Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
 
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
     TestYarnAutoScalingRunnable runnable =
-        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1);
+        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1, helixDataAccessor);
 
     runnable.setRaiseException(true);
     runnable.run();
-    Mockito.verify(mockYarnService, times(0)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+    Mockito.verify(mockYarnService, times(0)).requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
 
+    Mockito.reset(mockYarnService);
     runnable.setRaiseException(false);
     runnable.run();
     // 1 container requested to max and one worker in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+  }
+
+  public void testMaxValueEvictingQueue() throws Exception {
+    YarnAutoScalingManager.SlidingWindowReservoir window = new YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
+
+    // Normal insertion with eviction of originally largest value
+    window.add(3);
+    window.add(1);
+    window.add(2);
+    // Now it contains [3,1,2]
+    Assert.assertEquals(window.getMax(), 3);
+    window.add(1);
+    // Now it contains [1,2,1]
+    Assert.assertEquals(window.getMax(), 2);
+    window.add(5);
+    Assert.assertEquals(window.getMax(), 5);
+    // Now it contains [2,1,5]
+    window.add(11);
+    // Still [2,1,5] because 11 > 10 and is therefore rejected.
+    Assert.assertEquals(window.getMax(), 5);
+  }
+
+  /**
+   * Tests the scenario where an instance in the cluster has had no partitions assigned for too long and is tagged
+   * as a candidate for scaling down.
+   */
+  @Test
+  public void testInstanceIdleBeyondTolerance() throws IOException {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+    JobDag mockJobDag = mock(JobDag.class);
+
+    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+    Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+    // Both partitions are initially assigned to a single instance, in this case GobblinYarnTaskRunner-2
+    JobContext mockJobContext = mock(JobContext.class);
+    Mockito.when(mockJobContext.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    Mockito.when(mockJobContext.getAssignedParticipant(1)).thenReturn("GobblinYarnTaskRunner-2");
+    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-2");
+
+    Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+            "GobblinYarnTaskRunner-2", new HelixProperty("")));
+
+    TestYarnAutoScalingRunnable runnable = new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+            1, 1, 10, helixDataAccessor);
+
+    runnable.run();
+
+    // 2 containers are requested while only one worker has partitions assigned. Since alwaysUnused is not set yet,
+    // the idle instance is not tagged as unused, so YarnService is still told that both instances are in use.
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+
+    // Force isInstanceUnused to return true, which simulates the "beyond tolerance" case.
+    Mockito.reset(mockYarnService);
+    runnable.setAlwaysTagUnused(true);
+    runnable.run();
+
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-2"));
   }
 
   private static class TestYarnAutoScalingRunnable extends YarnAutoScalingManager.YarnAutoScalingRunnable {
     boolean raiseException = false;
+    boolean alwaysUnused = false;
 
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
-        int minContainers, int maxContainers) {
-      super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers);
+        int minContainers, int maxContainers, HelixDataAccessor helixDataAccessor) {
+      super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, noopQueue, helixDataAccessor);
     }
 
     @Override
@@ -416,5 +557,14 @@ public class YarnAutoScalingManagerTest {
     void setRaiseException(boolean raiseException) {
       this.raiseException = raiseException;
     }
+
+    void setAlwaysTagUnused(boolean alwaysUnused) {
+      this.alwaysUnused = alwaysUnused;
+    }
+
+    @Override
+    boolean isInstanceUnused(String participant) {
+      return alwaysUnused || super.isInstanceUnused(participant);
+    }
   }
 }