You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by le...@apache.org on 2020/03/09 16:42:23 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-1072] Being
more conservative on releasing containers
This is an automated email from the ASF dual-hosted git repository.
lesun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new f3d75e3 [GOBBLIN-1072] Being more conservative on releasing containers
f3d75e3 is described below
commit f3d75e392c554771e3c098cafb648ec55fa40ba7
Author: autumnust <le...@linkedin.com>
AuthorDate: Mon Mar 9 09:42:14 2020 -0700
[GOBBLIN-1072] Being more conservative on releasing containers
Add sliding window to protect AutoScaling from
fluctuation of number of active Helix Partitions
Less conservative on tagging an instance as unused
Add a unit test
Address comments
Closes #2912 from
autumnust/conservativeReleaseYarnContainers
---
.../cluster/GobblinHelixMessagingService.java | 2 -
.../apache/gobblin/yarn/GobblinYarnTaskRunner.java | 2 +
.../gobblin/yarn/YarnAutoScalingManager.java | 155 ++++++++++++++-
.../java/org/apache/gobblin/yarn/YarnService.java | 5 +-
.../gobblin/yarn/YarnAutoScalingManagerTest.java | 214 ++++++++++++++++++---
5 files changed, 338 insertions(+), 40 deletions(-)
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
index 2132335..21e3877 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMessagingService.java
@@ -77,8 +77,6 @@ public class GobblinHelixMessagingService extends DefaultMessagingService {
if (instanceType == InstanceType.CONTROLLER) {
List<Message> messages = generateMessagesForController(message);
messagesToSendMap.put(InstanceType.CONTROLLER, messages);
- // _dataAccessor.setControllerProperty(PropertyType.MESSAGES,
- // newMessage.getRecord(), CreateMode.PERSISTENT);
} else if (instanceType == InstanceType.PARTICIPANT) {
List<Message> messages = new ArrayList<Message>();
List<Map<String, String>> matchedList =
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
index 5b3fa7c..dc68162 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnTaskRunner.java
@@ -57,6 +57,8 @@ public class GobblinYarnTaskRunner extends GobblinTaskRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(GobblinTaskRunner.class);
+ public static final String HELIX_YARN_INSTANCE_NAME_PREFIX = GobblinYarnTaskRunner.class.getSimpleName();
+
public GobblinYarnTaskRunner(String applicationName, String helixInstanceName, ContainerId containerId, Config config,
Optional<Path> appWorkDirOptional) throws Exception {
super(applicationName, helixInstanceName, getApplicationId(containerId), getTaskRunnerId(containerId),
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 0742f54..d08a4c4 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,15 +17,23 @@
package org.apache.gobblin.yarn;
+import java.util.ArrayDeque;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
@@ -42,8 +50,7 @@ import com.typesafe.config.Config;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.ExecutorsUtils;
+import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
/**
@@ -62,10 +69,15 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
+ // A rough value of how many containers should be an intolerable number.
private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
+ private final String AUTO_SCALING_WINDOW_SIZE = AUTO_SCALING_PREFIX + "windowSize";
+
+ private final static int DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES = 10;
+
private final Config config;
private final HelixManager helixManager;
private final ScheduledExecutorService autoScalingExecutor;
@@ -73,6 +85,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final int partitionsPerContainer;
private final int minContainers;
private final int maxContainers;
+ private final SlidingWindowReservoir slidingFixedSizeWindow;
+ private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
public YarnAutoScalingManager(GobblinApplicationMaster appMaster) {
this.config = appMaster.getConfig();
@@ -100,6 +114,10 @@ public class YarnAutoScalingManager extends AbstractIdleService {
DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or equal to "
+ DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
+ this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
+ ? new SlidingWindowReservoir(maxContainers, config.getInt(AUTO_SCALING_WINDOW_SIZE))
+ : new SlidingWindowReservoir(maxContainers);
+
this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("AutoScalingExecutor")));
}
@@ -114,8 +132,9 @@ public class YarnAutoScalingManager extends AbstractIdleService {
log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);
this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
- this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers), initialDelay,
- scheduleInterval, TimeUnit.SECONDS);
+ this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers,
+ this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
+ TimeUnit.SECONDS);
}
@Override
@@ -137,6 +156,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
private final int partitionsPerContainer;
private final int minContainers;
private final int maxContainers;
+ private final SlidingWindowReservoir slidingWindowReservoir;
+ private final HelixDataAccessor helixDataAccessor;
+ /**
+ * A static map that keeps track of an idle instance and its latest beginning idle time.
+ * If an instance is no longer idle when inspected, it will be dropped from this map.
+ */
+ private static final Map<String, Long> instanceIdleSince = new HashMap<>();
@Override
@@ -150,6 +176,17 @@ public class YarnAutoScalingManager extends AbstractIdleService {
}
/**
+ * Getting all instances (Helix Participants) in cluster at this moment.
+ * Note that the raw result could contain AppMaster node and replanner node.
+ * @param filterString Helix instances whose names contain filterString will pass filtering.
+ */
+ private Set<String> getParticipants(String filterString) {
+ PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
+ return helixDataAccessor.getChildValuesMap(keyBuilder.liveInstances())
+ .keySet().stream().filter(x -> filterString.isEmpty() || x.contains(filterString)).collect(Collectors.toSet());
+ }
+
+ /**
* Iterate through the workflows configured in Helix to figure out the number of required partitions
* and request the {@link YarnService} to scale to the desired number of containers.
*/
@@ -189,6 +226,27 @@ public class YarnAutoScalingManager extends AbstractIdleService {
}
}
+ // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
+ // and potentially replanner-instance.
+ Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
+
+ // Find all joined participants not in-use for this round of inspection.
+ // If idle time is within tolerance, keep treating the instance as in use; otherwise leave it out as unused.
+ for (String participant : allParticipants) {
+ if (!inUseInstances.contains(participant)) {
+ instanceIdleSince.putIfAbsent(participant, System.currentTimeMillis());
+ if (!isInstanceUnused(participant)) {
+ inUseInstances.add(participant);
+ }
+ } else {
+ // A previously idle instance is now detected to be in use.
+ // Remove this instance if existed in the tracking map.
+ instanceIdleSince.remove(participant);
+ }
+ }
+
+
+
// compute the target containers as a ceiling of number of partitions divided by the number of containers
// per partition.
int numTargetContainers = (int) Math.ceil((double)numPartitions / this.partitionsPerContainer);
@@ -196,9 +254,96 @@ public class YarnAutoScalingManager extends AbstractIdleService {
// adjust the number of target containers based on the configured min and max container values.
numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
+ slidingWindowReservoir.add(numTargetContainers);
+
log.info("There are {} containers being requested", numTargetContainers);
- this.yarnService.requestTargetNumberOfContainers(numTargetContainers, inUseInstances);
+ this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
+ }
+
+ @VisibleForTesting
+ /**
+ * Return true if the condition for tagging an instance as "unused" holds.
+ * The condition, by default is that if an instance went back to
+ * active (having partition running on it) within {@link #maxIdleTimeInMinutesBeforeScalingDown} minutes, we will
+ * not tag that instance as "unused" and have that as the candidate for scaling down.
+ */
+ boolean isInstanceUnused(String participant){
+ return System.currentTimeMillis() - instanceIdleSince.get(participant) >
+ TimeUnit.MINUTES.toMillis(maxIdleTimeInMinutesBeforeScalingDown);
+ }
+ }
+
+ /**
+ * A FIFO queue with fixed size and returns maxValue among all elements within the queue in constant time.
+ * This data structure prevents temporary fluctuation in the number of active helix partitions as the size of queue
+ * grows and will be less sensitive when scaling down is actually required.
+ *
+ * The interface for this is implemented in a minimal-necessity manner to serve only as a sliding-sized-window
+ * which captures max value. It is NOT built for general purpose.
+ */
+ static class SlidingWindowReservoir {
+ private ArrayDeque<Integer> fifoQueue;
+ private PriorityQueue<Integer> priorityQueue;
+
+ // Queue Size
+ private int maxSize;
+ private static final int DEFAULT_MAX_SIZE = 10;
+
+ // Upper-bound of value within the queue.
+ private int upperBound;
+
+ public SlidingWindowReservoir(int maxSize, int upperBound) {
+ Preconditions.checkArgument(maxSize > 0, "maxSize has to be a value larger than 0");
+
+ this.maxSize = maxSize;
+ this.upperBound = upperBound;
+ this.fifoQueue = new ArrayDeque<>(maxSize);
+ this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return o2.compareTo(o1);
+ }
+ });
+ }
+
+ public SlidingWindowReservoir(int upperBound) {
+ this(DEFAULT_MAX_SIZE, upperBound);
+ }
+
+ /**
+ * Add element into data structure.
+ * When a new element is larger than upperbound, reject the value since we may request too many Yarn containers.
+ * When queue is full, evict head of FIFO-queue (In FIFO queue, elements are inserted from tail).
+ */
+ public void add(int e) {
+ if (e > upperBound) {
+ log.error(String.format("Request of getting %s containers seems to be excessive, rejected", e));
+ return;
+ }
+
+ if (fifoQueue.size() == maxSize) {
+ Integer removedElement = fifoQueue.remove();
+ priorityQueue.remove(removedElement);
+ }
+
+ if (fifoQueue.size() == priorityQueue.size()) {
+ fifoQueue.add(e);
+ priorityQueue.add(e);
+ } else {
+ throw new IllegalStateException("Queue has its internal data structure being inconsistent.");
+ }
+ }
+
+ /**
+ * If queue is empty, throw {@link IllegalStateException}.
+ */
+ public int getMax() {
+ if (priorityQueue.size() > 0) {
+ return this.priorityQueue.peek();
+ } else {
+ throw new IllegalStateException("Queried before elements added into the queue.");
+ }
}
}
}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 6476c99..0c649ba 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -108,6 +108,8 @@ import org.apache.gobblin.yarn.event.ContainerReleaseRequest;
import org.apache.gobblin.yarn.event.ContainerShutdownRequest;
import org.apache.gobblin.yarn.event.NewContainerRequest;
+import static org.apache.gobblin.yarn.GobblinYarnTaskRunner.HELIX_YARN_INSTANCE_NAME_PREFIX;
+
/**
* This class is responsible for all Yarn-related stuffs including ApplicationMaster registration,
@@ -136,6 +138,7 @@ public class YarnService extends AbstractIdleService {
private final Optional<GobblinMetrics> gobblinMetrics;
private final Optional<EventSubmitter> eventSubmitter;
+
@VisibleForTesting
@Getter(AccessLevel.PROTECTED)
private final AMRMClientAsync<AMRMClient.ContainerRequest> amrmClientAsync;
@@ -720,7 +723,7 @@ public class YarnService extends AbstractIdleService {
String instanceName = unusedHelixInstanceNames.poll();
if (Strings.isNullOrEmpty(instanceName)) {
// No unused instance name, so generating a new one.
- instanceName = HelixUtils.getHelixInstanceName(GobblinYarnTaskRunner.class.getSimpleName(),
+ instanceName = HelixUtils.getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX,
helixInstanceIdGenerator.incrementAndGet());
}
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 58d639e..ba6c1db 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,6 +19,9 @@ package org.apache.gobblin.yarn;
import java.io.IOException;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobDag;
import org.apache.helix.task.TaskDriver;
@@ -26,6 +29,7 @@ import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.mockito.Mockito;
+import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.collect.ImmutableMap;
@@ -40,6 +44,9 @@ import static org.mockito.Mockito.times;
*/
@Test(groups = { "gobblin.yarn" })
public class YarnAutoScalingManagerTest {
+ // A queue with size == 1 and upperBound == "infinite" should not impact the execution.
+ private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue =
+ new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
/**
* Test for one workflow with one job
*/
@@ -64,17 +71,24 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext = mock(JobContext.class);
Mockito.when(mockJobContext.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+ 1, 10, noopQueue, helixDataAccessor);
runnable.run();
// 2 containers requested and one worker in use
- Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(2, ImmutableSet.of("worker1"));
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
/**
@@ -101,22 +115,30 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext1 = mock(JobContext.class);
Mockito.when(mockJobContext1.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
JobContext mockJobContext2 = mock(JobContext.class);
Mockito.when(mockJobContext2.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(3)));
- Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+ Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+ "GobblinYarnTaskRunner-2", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, noopQueue, helixDataAccessor);
runnable.run();
// 3 containers requested and 2 workers in use
- Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(3, ImmutableSet.of("worker1", "worker2"));
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
}
/**
@@ -141,13 +163,13 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext1 = mock(JobContext.class);
Mockito.when(mockJobContext1.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
JobContext mockJobContext2 = mock(JobContext.class);
Mockito.when(mockJobContext2.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(3)));
- Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+ Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
@@ -164,20 +186,28 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext3 = mock(JobContext.class);
Mockito.when(mockJobContext3.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
- Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+ Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
Mockito.when(mockTaskDriver.getWorkflows())
.thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2));
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+ "GobblinYarnTaskRunner-2", new HelixProperty(""),
+ "GobblinYarnTaskRunner-3", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, noopQueue, helixDataAccessor);
runnable.run();
// 5 containers requested and 3 workers in use
Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(5,
- ImmutableSet.of("worker1", "worker2", "worker3"));
+ ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3"));
}
/**
@@ -203,13 +233,13 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext1 = mock(JobContext.class);
Mockito.when(mockJobContext1.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
JobContext mockJobContext2 = mock(JobContext.class);
Mockito.when(mockJobContext2.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(3)));
- Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("worker2");
+ Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
@@ -226,20 +256,27 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext3 = mock(JobContext.class);
Mockito.when(mockJobContext3.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
- Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("worker3");
+ Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
Mockito.when(mockTaskDriver.getWorkflows())
.thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2));
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+ "GobblinYarnTaskRunner-2", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 10);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, noopQueue, helixDataAccessor);
runnable.run();
// 3 containers requested and 2 workers in use
Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(3,
- ImmutableSet.of("worker1", "worker2"));
+ ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
}
/**
@@ -266,17 +303,24 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext = mock(JobContext.class);
Mockito.when(mockJobContext.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 2, 1, 10);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 2, 1, 10, noopQueue, helixDataAccessor);
runnable.run();
// 1 container requested since 2 partitions and limit is 2 partitions per container. One worker in use.
- Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
@@ -304,17 +348,24 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext = mock(JobContext.class);
Mockito.when(mockJobContext.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 5, 10);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 5, 10, noopQueue, helixDataAccessor);
runnable.run();
// 5 containers requested due to min and one worker in use
- Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(5, ImmutableSet.of("worker1"));
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(5, ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
@@ -342,17 +393,24 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext = mock(JobContext.class);
Mockito.when(mockJobContext.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
- new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1);
+ new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 1, noopQueue, helixDataAccessor);
runnable.run();
// 1 container requested to max and one worker in use
- Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ Mockito.verify(mockYarnService, times(1))
+ .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
}
/**
@@ -379,29 +437,112 @@ public class YarnAutoScalingManagerTest {
JobContext mockJobContext = mock(JobContext.class);
Mockito.when(mockJobContext.getPartitionSet())
.thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
- Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("worker1");
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
+
TestYarnAutoScalingRunnable runnable =
- new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1);
+ new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1, helixDataAccessor);
runnable.setRaiseException(true);
runnable.run();
- Mockito.verify(mockYarnService, times(0)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ Mockito.verify(mockYarnService, times(0)).requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ Mockito.reset(mockYarnService);
runnable.setRaiseException(false);
runnable.run();
// 1 container requested to max and one worker in use
- Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("worker1"));
+ Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+ }
+
+ public void testMaxValueEvictingQueue() throws Exception {
+ YarnAutoScalingManager.SlidingWindowReservoir window = new YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
+
+ // Normal insertion with eviction of originally largest value
+ window.add(3);
+ window.add(1);
+ window.add(2);
+ // Now it contains [3,1,2]
+ Assert.assertEquals(window.getMax(), 3);
+ window.add(1);
+ // Now it contains [1,2,1]
+ Assert.assertEquals(window.getMax(), 2);
+ window.add(5);
+ Assert.assertEquals(window.getMax(), 5);
+ // Now it contains [2,1,5]
+ window.add(11);
+ // Still [2,1,5] as 11 > 10 thereby being rejected.
+ Assert.assertEquals(window.getMax(), 5);
+ }
+
+ /**
+ * Test the scenarios when an instance in cluster has no partitions assigned for too long and got tagged as the
+ * candidate for scaling-down.
+ */
+ @Test
+ public void testInstanceIdleBeyondTolerance() throws IOException {
+ YarnService mockYarnService = mock(YarnService.class);
+ TaskDriver mockTaskDriver = mock(TaskDriver.class);
+ WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
+ JobDag mockJobDag = mock(JobDag.class);
+
+ Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
+ Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
+
+ Mockito.when(mockTaskDriver.getWorkflows())
+ .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
+
+ WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
+ Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+ Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
+
+ // Having both partition assigned to single instance initially, in this case, GobblinYarnTaskRunner-2
+ JobContext mockJobContext = mock(JobContext.class);
+ Mockito.when(mockJobContext.getPartitionSet())
+ .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+ Mockito.when(mockJobContext.getAssignedParticipant(1)).thenReturn("GobblinYarnTaskRunner-2");
+ Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-2");
+
+ Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
+
+ HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+ Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+ Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+ .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+ "GobblinYarnTaskRunner-2", new HelixProperty("")));
+
+ TestYarnAutoScalingRunnable runnable = new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
+ 1, 1, 10, helixDataAccessor);
+
+ runnable.run();
+
+ // 2 containers requested and one worker in use. Since isInstanceUnused evaluates to false unless overridden,
+ // the idle instance is still reported to YarnService as in use, so two instances are passed.
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+
+ // Set alwaysTagUnused which simulates the "beyond tolerance" case.
+ Mockito.reset(mockYarnService);
+ runnable.setAlwaysTagUnused(true);
+ runnable.run();
+
+ Mockito.verify(mockYarnService, times(1)).
+ requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-2"));
}
private static class TestYarnAutoScalingRunnable extends YarnAutoScalingManager.YarnAutoScalingRunnable {
boolean raiseException = false;
+ boolean alwaysUnused = false;
public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
- int minContainers, int maxContainers) {
- super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers);
+ int minContainers, int maxContainers, HelixDataAccessor helixDataAccessor) {
+ super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, noopQueue, helixDataAccessor);
}
@Override
@@ -416,5 +557,14 @@ public class YarnAutoScalingManagerTest {
void setRaiseException(boolean raiseException) {
this.raiseException = raiseException;
}
+
+ void setAlwaysTagUnused(boolean alwaysUnused) {
+ this.alwaysUnused = alwaysUnused;
+ }
+
+ @Override
+ boolean isInstanceUnused(String participant) {
+ return alwaysUnused || super.isInstanceUnused(participant);
+ }
}
}