You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2022/04/28 19:46:40 UTC

[gobblin] branch master updated: [GOBBLIN-1620]Make yarn container allocation group by helix tag (#3487)

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e877951c [GOBBLIN-1620]Make yarn container allocation group by helix tag (#3487)
3e877951c is described below

commit 3e877951c284ccd68be3634522f9fc2c3d39f81a
Author: Hanghang Nate Liu <na...@gmail.com>
AuthorDate: Thu Apr 28 12:46:35 2022 -0700

    [GOBBLIN-1620]Make yarn container allocation group by helix tag (#3487)
    
    * make yarn service aware of helix tag and resource requirement for each workflow so that containers will be assigned to the correct task
    
    update test cases
    
    update helix instance tag during task runner initiation
    
    update logs
    
    update test case
    
    * remove lib not used, add test case
    
    address comments
    
    * update test cases
    
    * remove container min and max config
---
 .../cluster/GobblinClusterConfigurationKeys.java   |   8 +
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  14 +
 .../apache/gobblin/cluster/GobblinTaskRunner.java  |  25 +-
 .../gobblin/yarn/YarnAutoScalingManager.java       | 120 ++++----
 .../gobblin/yarn/YarnContainerRequestBundle.java   |  76 +++++
 .../org/apache/gobblin/yarn/YarnHelixUtils.java    |  25 ++
 .../java/org/apache/gobblin/yarn/YarnService.java  | 176 +++++++----
 .../gobblin/yarn/event/NewContainerRequest.java    |  10 +
 .../apache/gobblin/yarn/GobblinYarnTestUtils.java  |   7 +
 .../gobblin/yarn/YarnAutoScalingManagerTest.java   | 338 ++++++++++++---------
 .../org/apache/gobblin/yarn/YarnServiceTest.java   |  39 ++-
 .../yarn/YarnServiceTestWithExpiration.java        |   6 +-
 .../src/test/resources/YarnServiceTest.conf        |   3 +-
 13 files changed, 560 insertions(+), 287 deletions(-)

diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 8c9513f50..3454b4d48 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -84,6 +84,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag";
   public static final String HELIX_PLANNING_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixPlanningJobTag";
   public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags";
+  public static final String HELIX_DEFAULT_TAG = "GobblinHelixDefaultTag";
 
   // Helix job quota
   public static final String HELIX_JOB_TYPE_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobType";
@@ -184,6 +185,13 @@ public class GobblinClusterConfigurationKeys {
   public static final String CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = GOBBLIN_CLUSTER_PREFIX + "container.exitOnHealthCheckFailure";
   public static final boolean DEFAULT_CONTAINER_EXIT_ON_HEALTH_CHECK_FAILURE_ENABLED = false;
 
+  // Config to specify the resource requirement for each Gobblin job run, so that helix tasks within this job will
+  // be assigned to containers with the desired resources. This config needs to be used together with the helix job
+  // tag, so that the helix cluster knows how to distribute tasks to the correct containers.
+  public static final String HELIX_JOB_CONTAINER_MEMORY_MBS = GOBBLIN_CLUSTER_PREFIX + "job.container.memory.mbs";
+  public static final String HELIX_JOB_CONTAINER_CORES = GOBBLIN_CLUSTER_PREFIX + "job.container.cores";
+
+
 
   //Config to enable/disable reuse of existing Helix Cluster
   public static final String HELIX_CLUSTER_OVERWRITE_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.overwrite";
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index df15ee385..e7e305ad5 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -413,6 +413,20 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher {
         gobblinJobState.getPropAsLong(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
             GobblinClusterConfigurationKeys.DEFAULT_HELIX_WORKFLOW_EXPIRY_TIME_SECONDS));
 
+    Map<String, String> jobConfigMap = new HashMap<>();
+    if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)) {
+      jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS,
+          jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
+      log.info("Job {} has specific memory requirement:{}, add this config to command config map",
+          this.jobContext.getJobId(), jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS));
+    }
+    if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)) {
+      jobConfigMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES,
+          jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
+      log.info("Job {} has specific Vcore requirement:{}, add this config to command config map",
+          this.jobContext.getJobId(), jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES));
+    }
+    jobConfigBuilder.setJobCommandConfigMap(jobConfigMap);
     return jobConfigBuilder;
   }
 
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index cad583908..4878d1031 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -24,9 +24,11 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -540,16 +542,25 @@ public class GobblinTaskRunner implements StandardMetricsBridge {
    * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance with EXAMPLE_INSTANCE_TAG was found.
    */
   private void addInstanceTags() {
-    List<String> tags = ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY);
     HelixManager receiverManager = getReceiverManager();
     if (receiverManager.isConnected()) {
-      if (!tags.isEmpty()) {
-        logger.info("Adding tags binding " + tags);
-        tags.forEach(tag -> receiverManager.getClusterManagmentTool()
-            .addInstanceTag(this.clusterName, this.helixInstanceName, tag));
-        logger.info("Actual tags binding " + receiverManager.getClusterManagmentTool()
-            .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags());
+      // The helix instance associated with this container should be consistent on helix tag
+      List<String> existedTags = receiverManager.getClusterManagmentTool()
+          .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags();
+      Set<String> desiredTags = new HashSet<>(
+          ConfigUtils.getStringList(this.clusterConfig, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY));
+      if (!desiredTags.isEmpty()) {
+        // Remove tag assignments for the current Helix instance from a previous run
+        for (String tag : existedTags) {
+          if (!desiredTags.contains(tag))
+            receiverManager.getClusterManagmentTool().removeInstanceTag(this.clusterName, this.helixInstanceName, tag);
+          logger.info("Removed unrelated helix tag {} for instance {}", tag, this.helixInstanceName);
+        }
+        desiredTags.forEach(desiredTag -> receiverManager.getClusterManagmentTool()
+            .addInstanceTag(this.clusterName, this.helixInstanceName, desiredTag));
       }
+      logger.info("Actual tags binding " + receiverManager.getClusterManagmentTool()
+          .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags());
     }
   }
 
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 458527abc..7c4da8fd8 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -17,11 +17,13 @@
 
 package org.apache.gobblin.yarn;
 
+import com.google.common.base.Strings;
 import java.util.ArrayDeque;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Objects;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -29,9 +31,12 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
@@ -67,14 +72,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   // Only one container will be requested for each N partitions of work
   private final String AUTO_SCALING_PARTITIONS_PER_CONTAINER = AUTO_SCALING_PREFIX + "partitionsPerContainer";
   private final int DEFAULT_AUTO_SCALING_PARTITIONS_PER_CONTAINER = 1;
-  private final String AUTO_SCALING_MIN_CONTAINERS = AUTO_SCALING_PREFIX + "minContainers";
-  private final int DEFAULT_AUTO_SCALING_MIN_CONTAINERS = 1;
-  private final String AUTO_SCALING_MAX_CONTAINERS = AUTO_SCALING_PREFIX + "maxContainers";
   private final String AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = AUTO_SCALING_PREFIX + "overProvisionFactor";
   private final double DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR = 1.0;
+  // The cluster level default tags for Helix instances
+  private final String defaultHelixInstanceTags;
+  private final int defaultContainerMemoryMbs;
+  private final int defaultContainerCores;
 
-  // A rough value of how much containers should be an intolerable number.
-  private final int DEFAULT_AUTO_SCALING_MAX_CONTAINERS = Integer.MAX_VALUE;
   private final String AUTO_SCALING_INITIAL_DELAY = AUTO_SCALING_PREFIX + "initialDelay";
   private final int DEFAULT_AUTO_SCALING_INITIAL_DELAY_SECS = 60;
 
@@ -87,8 +91,6 @@ public class YarnAutoScalingManager extends AbstractIdleService {
   private final ScheduledExecutorService autoScalingExecutor;
   private final YarnService yarnService;
   private final int partitionsPerContainer;
-  private final int minContainers;
-  private final int maxContainers;
   private final double overProvisionFactor;
   private final SlidingWindowReservoir slidingFixedSizeWindow;
   private static int maxIdleTimeInMinutesBeforeScalingDown = DEFAULT_MAX_IDLE_TIME_BEFORE_SCALING_DOWN_MINUTES;
@@ -103,31 +105,20 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     Preconditions.checkArgument(this.partitionsPerContainer > 0,
         AUTO_SCALING_PARTITIONS_PER_CONTAINER + " needs to be greater than 0");
 
-    this.minContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MIN_CONTAINERS,
-        DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
-
-    Preconditions.checkArgument(this.minContainers > 0,
-        DEFAULT_AUTO_SCALING_MIN_CONTAINERS + " needs to be greater than 0");
-
-    this.maxContainers = ConfigUtils.getInt(this.config, AUTO_SCALING_MAX_CONTAINERS,
-        DEFAULT_AUTO_SCALING_MAX_CONTAINERS);
-
     this.overProvisionFactor = ConfigUtils.getDouble(this.config, AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR,
         DEFAULT_AUTO_SCALING_CONTAINER_OVERPROVISION_FACTOR);
 
-    Preconditions.checkArgument(this.maxContainers > 0,
-        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than 0");
-
-    Preconditions.checkArgument(this.maxContainers >= this.minContainers,
-        DEFAULT_AUTO_SCALING_MAX_CONTAINERS + " needs to be greater than or equal to "
-            + DEFAULT_AUTO_SCALING_MIN_CONTAINERS);
-
     this.slidingFixedSizeWindow = config.hasPath(AUTO_SCALING_WINDOW_SIZE)
-        ? new SlidingWindowReservoir(maxContainers, config.getInt(AUTO_SCALING_WINDOW_SIZE))
-        : new SlidingWindowReservoir(maxContainers);
+        ? new SlidingWindowReservoir(config.getInt(AUTO_SCALING_WINDOW_SIZE), Integer.MAX_VALUE)
+        : new SlidingWindowReservoir(Integer.MAX_VALUE);
 
     this.autoScalingExecutor = Executors.newSingleThreadScheduledExecutor(
         ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("AutoScalingExecutor")));
+
+    this.defaultHelixInstanceTags = ConfigUtils.getString(config,
+        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
+    this.defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY);
+    this.defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY);
   }
 
   @Override
@@ -140,9 +131,10 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     log.info("Scheduling the auto scaling task with an interval of {} seconds", scheduleInterval);
 
     this.autoScalingExecutor.scheduleAtFixedRate(new YarnAutoScalingRunnable(new TaskDriver(this.helixManager),
-            this.yarnService, this.partitionsPerContainer, this.minContainers, this.maxContainers, this.overProvisionFactor,
-            this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor()), initialDelay, scheduleInterval,
-        TimeUnit.SECONDS);
+            this.yarnService, this.partitionsPerContainer, this.overProvisionFactor,
+            this.slidingFixedSizeWindow, this.helixManager.getHelixDataAccessor(), this.defaultHelixInstanceTags,
+            this.defaultContainerMemoryMbs, this.defaultContainerCores),
+        initialDelay, scheduleInterval, TimeUnit.SECONDS);
   }
 
   @Override
@@ -162,11 +154,13 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     private final TaskDriver taskDriver;
     private final YarnService yarnService;
     private final int partitionsPerContainer;
-    private final int minContainers;
-    private final int maxContainers;
     private final double overProvisionFactor;
     private final SlidingWindowReservoir slidingWindowReservoir;
     private final HelixDataAccessor helixDataAccessor;
+    private final String defaultHelixInstanceTags;
+    private final int defaultContainerMemoryMbs;
+    private final int defaultContainerCores;
+
     /**
      * A static map that keep track of an idle instance and its latest beginning idle time.
      * If an instance is no longer idle when inspected, it will be dropped from this map.
@@ -202,8 +196,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     @VisibleForTesting
     void runInternal() {
       Set<String> inUseInstances = new HashSet<>();
-
-      int numPartitions = 0;
+      YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();
       for (Map.Entry<String, WorkflowConfig> workFlowEntry : taskDriver.getWorkflows().entrySet()) {
         WorkflowContext workflowContext = taskDriver.getWorkflowContext(workFlowEntry.getKey());
 
@@ -217,24 +210,42 @@ public class YarnAutoScalingManager extends AbstractIdleService {
 
         WorkflowConfig workflowConfig = workFlowEntry.getValue();
         JobDag jobDag = workflowConfig.getJobDag();
-
         Set<String> jobs = jobDag.getAllNodes();
 
         // sum up the number of partitions
         for (String jobName : jobs) {
           JobContext jobContext = taskDriver.getJobContext(jobName);
-
+          JobConfig jobConfig = taskDriver.getJobConfig(jobName);
+          Resource resource = Resource.newInstance(this.defaultContainerMemoryMbs, this.defaultContainerCores);
+          int numPartitions = 0;
+          String jobTag = defaultHelixInstanceTags;
           if (jobContext != null) {
             log.debug("JobContext {} num partitions {}", jobContext, jobContext.getPartitionSet().size());
 
             inUseInstances.addAll(jobContext.getPartitionSet().stream().map(jobContext::getAssignedParticipant)
-                .filter(e -> e != null).collect(Collectors.toSet()));
-
-            numPartitions += jobContext.getPartitionSet().size();
+                .filter(Objects::nonNull).collect(Collectors.toSet()));
+
+            numPartitions = jobContext.getPartitionSet().size();
+            // Job level config for helix instance tags takes precedence over other tag configurations
+            if (jobConfig != null) {
+              if (!Strings.isNullOrEmpty(jobConfig.getInstanceGroupTag())) {
+                jobTag = jobConfig.getInstanceGroupTag();
+              }
+              Map<String, String> jobCommandConfigMap = jobConfig.getJobCommandConfigMap();
+              if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)){
+                resource.setMemory(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS)));
+              }
+              if(jobCommandConfigMap.containsKey(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)){
+                resource.setVirtualCores(Integer.parseInt(jobCommandConfigMap.get(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES)));
+              }
+            }
           }
+          // compute the container count as a ceiling of number of partitions divided by the number of partitions
+          // per container. Scale the result by a constant overprovision factor.
+          int containerCount = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
+          yarnContainerRequestBundle.add(jobTag, containerCount, resource);
         }
       }
-
       // Find all participants appearing in this cluster. Note that Helix instances can contain cluster-manager
       // and potentially replanner-instance.
       Set<String> allParticipants = getParticipants(HELIX_YARN_INSTANCE_NAME_PREFIX);
@@ -253,17 +264,11 @@ public class YarnAutoScalingManager extends AbstractIdleService {
           instanceIdleSince.remove(participant);
         }
       }
+      slidingWindowReservoir.add(yarnContainerRequestBundle);
 
-      // compute the target containers as a ceiling of number of partitions divided by the number of containers
-      // per partition. Scale the result by a constant overprovision factor.
-      int numTargetContainers = (int) Math.ceil(((double)numPartitions / this.partitionsPerContainer) * this.overProvisionFactor);
-
-      // adjust the number of target containers based on the configured min and max container values.
-      numTargetContainers = Math.max(this.minContainers, Math.min(this.maxContainers, numTargetContainers));
-
-      slidingWindowReservoir.add(numTargetContainers);
-
-      log.info("There are {} containers being requested", numTargetContainers);
+      log.debug("There are {} containers being requested in total, tag-count map {}, tag-resource map {}",
+          yarnContainerRequestBundle.getTotalContainers(), yarnContainerRequestBundle.getHelixTagContainerCountMap(),
+          yarnContainerRequestBundle.getHelixTagResourceMap());
 
       this.yarnService.requestTargetNumberOfContainers(slidingWindowReservoir.getMax(), inUseInstances);
     }
@@ -290,8 +295,8 @@ public class YarnAutoScalingManager extends AbstractIdleService {
    * which captures max value. It is NOT built for general purpose.
    */
   static class SlidingWindowReservoir {
-    private ArrayDeque<Integer> fifoQueue;
-    private PriorityQueue<Integer> priorityQueue;
+    private ArrayDeque<YarnContainerRequestBundle> fifoQueue;
+    private PriorityQueue<YarnContainerRequestBundle> priorityQueue;
 
     // Queue Size
     private int maxSize;
@@ -306,10 +311,11 @@ public class YarnAutoScalingManager extends AbstractIdleService {
       this.maxSize = maxSize;
       this.upperBound = upperBound;
       this.fifoQueue = new ArrayDeque<>(maxSize);
-      this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<Integer>() {
+      this.priorityQueue = new PriorityQueue<>(maxSize, new Comparator<YarnContainerRequestBundle>() {
         @Override
-        public int compare(Integer o1, Integer o2) {
-          return o2.compareTo(o1);
+        public int compare(YarnContainerRequestBundle o1, YarnContainerRequestBundle o2) {
+          Integer i2 = o2.getTotalContainers();
+          return i2.compareTo(o1.getTotalContainers());
         }
       });
     }
@@ -323,14 +329,14 @@ public class YarnAutoScalingManager extends AbstractIdleService {
      * When a new element is larger than upperbound, reject the value since we may request too many Yarn containers.
      * When queue is full, evict head of FIFO-queue (In FIFO queue, elements are inserted from tail).
      */
-    public void add(int e) {
-      if (e > upperBound) {
+    public void add(YarnContainerRequestBundle e) {
+      if (e.getTotalContainers() > upperBound) {
         log.error(String.format("Request of getting %s containers seems to be excessive, rejected", e));
         return;
       }
 
       if (fifoQueue.size() == maxSize) {
-        Integer removedElement = fifoQueue.remove();
+        YarnContainerRequestBundle removedElement = fifoQueue.remove();
         priorityQueue.remove(removedElement);
       }
 
@@ -345,7 +351,7 @@ public class YarnAutoScalingManager extends AbstractIdleService {
     /**
      * If queue is empty, throw {@link IllegalStateException}.
      */
-    public int getMax() {
+    public YarnContainerRequestBundle getMax() {
       if (priorityQueue.size() > 0) {
         return this.priorityQueue.peek();
       } else {
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
new file mode 100644
index 000000000..63e6fcb23
--- /dev/null
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnContainerRequestBundle.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.yarn;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+/**
+ * The class that represents current Yarn container request that will be used by {@link YarnService}.
+ * Yarn container allocation should associate with helix tag, as workflows can have specific helix tag setup
+ * and specific resource requirement.
+ */
+@Slf4j
+@Getter
+public class YarnContainerRequestBundle {
+  int totalContainers;
+  private final Map<String, Integer> helixTagContainerCountMap;
+  private final Map<String, Resource> helixTagResourceMap;
+  private final Map<String, Set<String>> resourceHelixTagMap;
+
+  public YarnContainerRequestBundle() {
+    this.totalContainers = 0;
+    this.helixTagContainerCountMap = new HashMap<>();
+    this.helixTagResourceMap = new HashMap<>();
+    this.resourceHelixTagMap = new HashMap<>();
+  }
+
+  public void add(String helixTag, int containerCount, Resource resource) {
+    helixTagContainerCountMap.put(helixTag, helixTagContainerCountMap.getOrDefault(helixTag, 0) + containerCount);
+    if(helixTagResourceMap.containsKey(helixTag)) {
+      Resource existedResource = helixTagResourceMap.get(helixTag);
+      Preconditions.checkArgument(resource.getMemory() == existedResource.getMemory() &&
+              resource.getVirtualCores() == existedResource.getVirtualCores(),
+          "Helix tag need to have consistent resource requirement. Tag " + helixTag
+              + " has existed resource require " + existedResource.toString() + " and different require " + resource.toString());
+    } else {
+      helixTagResourceMap.put(helixTag, resource);
+      Set<String> tagSet = resourceHelixTagMap.getOrDefault(resource.toString(), new HashSet<>());
+      tagSet.add(helixTag);
+      resourceHelixTagMap.put(resource.toString(), tagSet);
+    }
+    totalContainers += containerCount;
+  }
+
+  // This method assumes the resource requirement for the helix tag is already stored in the map
+  public void add(String helixTag, int containerCount) {
+    if (!helixTagContainerCountMap.containsKey(helixTag) && !helixTagResourceMap.containsKey(helixTag)) {
+      log.error("Helix tag {} is not present in the request bundle yet, can't process the request to add {} "
+          + "container for it without specifying the resource requirement", helixTag, containerCount);
+      return;
+    }
+    helixTagContainerCountMap.put(helixTag, helixTagContainerCountMap.get(helixTag) + containerCount);
+    this.totalContainers += containerCount;
+  }
+}
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
index 5309a13ac..18068895c 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnHelixUtils.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -229,4 +230,28 @@ public class YarnHelixUtils {
   public static String getContainerNum(String containerId) {
     return "container-" + containerId.substring(containerId.lastIndexOf("_") + 1);
   }
+
+  /**
+   * Find the helix tag for the newly allocated container. The tag should align with {@link YarnContainerRequestBundle},
+   * so that the correct resource can be allocated to helix workflow that has specific resource requirement.
+   * @param container newly allocated container
+   * @param helixTagAllocatedContainerCount current container count for each helix tag
+   * @param requestedYarnContainer yarn container request specify the desired state
+   * @return helix tag that this container should be assigned with; if null, the default needs to be used
+   */
+  public static String findHelixTagForContainer(Container container,
+      Map<String, Integer> helixTagAllocatedContainerCount, YarnContainerRequestBundle requestedYarnContainer) {
+    String foundTag = null;
+    if(requestedYarnContainer != null && requestedYarnContainer.getResourceHelixTagMap().containsKey(container.getResource().toString())) {
+      for (String tag : requestedYarnContainer.getResourceHelixTagMap().get(container.getResource().toString())) {
+        int desiredCount = requestedYarnContainer.getHelixTagContainerCountMap().get(tag);
+        int allocatedCount = helixTagAllocatedContainerCount.getOrDefault(tag, 0);
+        foundTag = tag;
+        if(allocatedCount < desiredCount) {
+          return foundTag;
+        }
+      }
+    }
+    return foundTag;
+  }
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 9fadc9485..ac61b0eeb 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -20,7 +20,6 @@ package org.apache.gobblin.yarn;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -35,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import lombok.AllArgsConstructor;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,7 +70,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
@@ -127,6 +126,7 @@ public class YarnService extends AbstractIdleService {
   private final String applicationName;
   private final String applicationId;
   private final String appViewAcl;
+  //Default helix instance tag derived from cluster level config
   private final String helixInstanceTags;
 
   private final Config config;
@@ -158,6 +158,7 @@ public class YarnService extends AbstractIdleService {
   private final String containerTimezone;
   private final HelixManager helixManager;
 
+  @Getter(AccessLevel.PROTECTED)
   private volatile Optional<Resource> maxResourceCapacity = Optional.absent();
 
   // Security tokens for accessing HDFS
@@ -167,10 +168,10 @@ public class YarnService extends AbstractIdleService {
 
   private final Object allContainersStopped = new Object();
 
-  // A map from container IDs to pairs of Container instances and Helix participant IDs of the containers
+  // A map from container IDs to Container instances, Helix participant IDs of the containers and Helix Tag
   @VisibleForTesting
   @Getter(AccessLevel.PROTECTED)
-  private final ConcurrentMap<ContainerId, Map.Entry<Container, String>> containerMap = Maps.newConcurrentMap();
+  private final ConcurrentMap<ContainerId, ContainerInfo> containerMap = Maps.newConcurrentMap();
 
   // A cache of the containers with an outstanding container release request.
   // This is a cache instead of a set to get the automatic cleanup in case a container completes before the requested
@@ -190,6 +191,15 @@ public class YarnService extends AbstractIdleService {
   // instance names get picked up when replacement containers get allocated.
   private final Set<String> unusedHelixInstanceNames = ConcurrentHashMap.newKeySet();
 
+  // The map from helix tag to requested container count
+  private final Map<String, Integer> requestedContainerCountMap = Maps.newConcurrentMap();
+  // The map from helix tag to allocated container count
+  private final Map<String, Integer> allocatedContainerCountMap = Maps.newConcurrentMap();
+
+  private volatile YarnContainerRequestBundle yarnContainerRequest;
+  private final AtomicInteger priorityNumGenerator = new AtomicInteger(0);
+  private final Map<String, Integer> resourcePriorityMap = new HashMap<>();
+
   private volatile boolean shutdownInProgress = false;
 
   // The number of containers requested based on the desired target number of containers. This is used to determine
@@ -236,7 +246,8 @@ public class YarnService extends AbstractIdleService {
     this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED);
 
     this.helixInstanceMaxRetries = config.getInt(GobblinYarnConfigurationKeys.HELIX_INSTANCE_MAX_RETRIES);
-    this.helixInstanceTags = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, null);
+    this.helixInstanceTags = ConfigUtils.getString(config,
+        GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG);
 
     this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ?
         Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) :
@@ -286,14 +297,8 @@ public class YarnService extends AbstractIdleService {
           this.requestedContainerCores));
       return;
     }
-
-    requestContainer(newContainerRequest.getReplacedContainer().transform(new Function<Container, String>() {
-
-      @Override
-      public String apply(Container container) {
-        return container.getNodeId().getHost();
-      }
-    }));
+    requestContainer(newContainerRequest.getReplacedContainer().transform(container -> container.getNodeId().getHost()),
+        newContainerRequest.getResource());
   }
 
   protected NMClientCallbackHandler getNMClientCallbackHandler() {
@@ -359,10 +364,10 @@ public class YarnService extends AbstractIdleService {
       ExecutorsUtils.shutdownExecutorService(this.containerLaunchExecutor, Optional.of(LOGGER));
 
       // Stop the running containers
-      for (Map.Entry<Container, String> entry : this.containerMap.values()) {
-        LOGGER.info(String.format("Stopping container %s running participant %s", entry.getKey().getId(),
-            entry.getValue()));
-        this.nmClientAsync.stopContainerAsync(entry.getKey().getId(), entry.getKey().getNodeId());
+      for (ContainerInfo containerInfo : this.containerMap.values()) {
+        LOGGER.info("Stopping container {} running participant {}", containerInfo.getContainer().getId(),
+            containerInfo.getHelixParticipantId());
+        this.nmClientAsync.stopContainerAsync(containerInfo.getContainer().getId(), containerInfo.getContainer().getNodeId());
       }
 
       if (!this.containerMap.isEmpty()) {
@@ -431,13 +436,13 @@ public class YarnService extends AbstractIdleService {
    * number of containers. The intended usage is for the caller of this method to make periodic calls to attempt to
    * adjust the cluster towards the desired number of containers.
    *
-   * @param numTargetContainers the desired number of containers
+   * @param yarnContainerRequestBundle the desired container information, including container counts, resources and helix tags
    * @param inUseInstances  a set of in use instances
    */
-  public synchronized void requestTargetNumberOfContainers(int numTargetContainers, Set<String> inUseInstances) {
-    LOGGER.debug("Requesting numTargetContainers {} current numRequestedContainers {} in use instances {} map size {}",
-        numTargetContainers, this.numRequestedContainers, inUseInstances, this.containerMap.size());
-
+  public synchronized void requestTargetNumberOfContainers(YarnContainerRequestBundle yarnContainerRequestBundle, Set<String> inUseInstances) {
+    LOGGER.debug("Requesting numTargetContainers {}, current numRequestedContainers {} in use, instances {} map size is {}",
+        yarnContainerRequestBundle.getTotalContainers(), this.numRequestedContainers, inUseInstances, this.containerMap.size());
+    int numTargetContainers = yarnContainerRequestBundle.getTotalContainers();
     // YARN can allocate more than the requested number of containers, compute additional allocations and deallocations
     // based on the max of the requested and actual allocated counts
     int numAllocatedContainers = this.containerMap.size();
@@ -450,8 +455,14 @@ public class YarnService extends AbstractIdleService {
     // Request additional containers if the desired count is higher than the max of the current allocation or previously
     // requested amount. Note that there may be in-flight or additional allocations after numContainers has been computed
     // so overshooting can occur, but periodic calls to this method will make adjustments towards the target.
-    for (int i = numContainers; i < numTargetContainers; i++) {
-      requestContainer(Optional.<String>absent());
+    for (Map.Entry<String, Integer> entry : yarnContainerRequestBundle.getHelixTagContainerCountMap().entrySet()) {
+      String currentHelixTag = entry.getKey();
+      int desiredContainerCount = entry.getValue();
+      int requestedContainerCount = requestedContainerCountMap.getOrDefault(currentHelixTag, 0);
+      for(; requestedContainerCount < desiredContainerCount; requestedContainerCount++) {
+        requestContainer(Optional.absent(), yarnContainerRequestBundle.getHelixTagResourceMap().get(currentHelixTag));
+      }
+      requestedContainerCountMap.put(currentHelixTag, requestedContainerCount);
     }
 
     // If the total desired is lower than the currently allocated amount then release free containers.
@@ -465,9 +476,14 @@ public class YarnService extends AbstractIdleService {
       int numToShutdown = numContainers - numTargetContainers;
 
       // Look for eligible containers to release. If a container is in use then it is not released.
-      for (Map.Entry<ContainerId, Map.Entry<Container, String>> entry : this.containerMap.entrySet()) {
-        if (!inUseInstances.contains(entry.getValue().getValue())) {
-          containersToRelease.add(entry.getValue().getKey());
+      for (Map.Entry<ContainerId, ContainerInfo> entry : this.containerMap.entrySet()) {
+        ContainerInfo containerInfo = entry.getValue();
+        if (!inUseInstances.contains(containerInfo.getHelixParticipantId())) {
+          containersToRelease.add(containerInfo.getContainer());
+          String helixTag = containerInfo.getHelixTag();
+          if (!Strings.isNullOrEmpty(helixTag)) {
+            requestedContainerCountMap.put(helixTag, requestedContainerCountMap.get(helixTag) - 1);
+          }
         }
 
         if (containersToRelease.size() == numToShutdown) {
@@ -479,32 +495,48 @@ public class YarnService extends AbstractIdleService {
 
       this.eventBus.post(new ContainerReleaseRequest(containersToRelease));
     }
-
+    this.yarnContainerRequest = yarnContainerRequestBundle;
     this.numRequestedContainers = numTargetContainers;
+    LOGGER.info("Current tag-container being requested:{}, tag-container allocated: {}",
+        this.requestedContainerCountMap, this.allocatedContainerCountMap);
   }
 
+  // Request initial containers with default resource and helix tag
   private void requestInitialContainers(int containersRequested) {
-    requestTargetNumberOfContainers(containersRequested, Collections.EMPTY_SET);
+    YarnContainerRequestBundle initialYarnContainerRequest = new YarnContainerRequestBundle();
+    Resource capability = Resource.newInstance(this.requestedContainerMemoryMbs, this.requestedContainerCores);
+    initialYarnContainerRequest.add(this.helixInstanceTags, containersRequested, capability);
+    requestTargetNumberOfContainers(initialYarnContainerRequest, Collections.EMPTY_SET);
   }
 
-  private void requestContainer(Optional<String> preferredNode) {
-    Priority priority = Records.newRecord(Priority.class);
-    priority.setPriority(0);
+  private void requestContainer(Optional<String> preferredNode, Optional<Resource> resourceOptional) {
+    Resource desiredResource = resourceOptional.or(Resource.newInstance(
+        this.requestedContainerMemoryMbs, this.requestedContainerCores));
+    requestContainer(preferredNode, desiredResource);
+  }
 
-    Resource capability = Records.newRecord(Resource.class);
-    int maxMemoryCapacity = this.maxResourceCapacity.get().getMemory();
-    capability.setMemory(this.requestedContainerMemoryMbs <= maxMemoryCapacity ?
-        this.requestedContainerMemoryMbs : maxMemoryCapacity);
-    int maxCoreCapacity = this.maxResourceCapacity.get().getVirtualCores();
-    capability.setVirtualCores(this.requestedContainerCores <= maxCoreCapacity ?
-        this.requestedContainerCores : maxCoreCapacity);
+  // Request containers with specific resource requirement
+  private void requestContainer(Optional<String> preferredNode, Resource resource) {
+    // Fail if Yarn cannot meet container resource requirements
+    Preconditions.checkArgument(resource.getMemory() <= this.maxResourceCapacity.get().getMemory() &&
+            resource.getVirtualCores() <= this.maxResourceCapacity.get().getVirtualCores(),
+        "Resource requirement must less than the max resource capacity. Requested resource" + resource.toString()
+            + " exceed the max resource limit " + this.maxResourceCapacity.get().toString());
+
+    // Due to YARN-314, requests with different resource capacities need different priorities; otherwise YARN will not allocate the containers
+    Priority priority = Records.newRecord(Priority.class);
+    if(!resourcePriorityMap.containsKey(resource.toString())) {
+      resourcePriorityMap.put(resource.toString(), priorityNumGenerator.getAndIncrement());
+    }
+    int priorityNum = resourcePriorityMap.get(resource.toString());
+    priority.setPriority(priorityNum);
 
     String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null;
     this.amrmClientAsync.addContainerRequest(
-        new AMRMClient.ContainerRequest(capability, preferredNodes, null, priority));
+        new AMRMClient.ContainerRequest(resource, preferredNodes, null, priority));
   }
 
-  protected ContainerLaunchContext newContainerLaunchContext(Container container, String helixInstanceName)
+  protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo)
       throws IOException {
     Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, this.applicationName, this.applicationId);
     Path containerWorkDir = new Path(appWorkDir, GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
@@ -527,7 +559,7 @@ public class YarnService extends AbstractIdleService {
     ContainerLaunchContext containerLaunchContext = Records.newRecord(ContainerLaunchContext.class);
     containerLaunchContext.setLocalResources(resourceMap);
     containerLaunchContext.setEnvironment(YarnHelixUtils.getEnvironmentVariables(this.yarnConfiguration));
-    containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(container, helixInstanceName)));
+    containerLaunchContext.setCommands(Lists.newArrayList(buildContainerCommand(containerInfo)));
 
     Map<ApplicationAccessType, String> acls = new HashMap<>(1);
     acls.put(ApplicationAccessType.VIEW_APP, this.appViewAcl);
@@ -580,11 +612,11 @@ public class YarnService extends AbstractIdleService {
   }
 
   @VisibleForTesting
-  protected String buildContainerCommand(Container container, String helixInstanceName) {
+  protected String buildContainerCommand(ContainerInfo containerInfo) {
     String containerProcessName = GobblinYarnTaskRunner.class.getSimpleName();
     StringBuilder containerCommand = new StringBuilder()
         .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java")
-        .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) -
+        .append(" -Xmx").append((int) (containerInfo.getContainer().getResource().getMemory() * this.jvmMemoryXmxRatio) -
             this.jvmMemoryOverheadMbs).append("M")
         .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone)
         .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR)
@@ -596,11 +628,11 @@ public class YarnService extends AbstractIdleService {
         .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_ID_OPTION_NAME)
         .append(" ").append(this.applicationId)
         .append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_OPTION_NAME)
-        .append(" ").append(helixInstanceName);
+        .append(" ").append(containerInfo.getHelixParticipantId());
 
-    if (!Strings.isNullOrEmpty(this.helixInstanceTags)) {
+    if (!Strings.isNullOrEmpty(containerInfo.getHelixTag())) {
       containerCommand.append(" --").append(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_OPTION_NAME)
-          .append(" ").append(helixInstanceTags);
+          .append(" ").append(containerInfo.getHelixTag());
     }
     return containerCommand.append(" 1>").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR).append(File.separator).append(
           containerProcessName).append(".").append(ApplicationConstants.STDOUT)
@@ -640,11 +672,13 @@ public class YarnService extends AbstractIdleService {
    * A replacement container is needed in all but the last case.
    */
   protected void handleContainerCompletion(ContainerStatus containerStatus) {
-    Map.Entry<Container, String> completedContainerEntry = this.containerMap.remove(containerStatus.getContainerId());
+    ContainerInfo completedContainerInfo = this.containerMap.remove(containerStatus.getContainerId());
     //Get the Helix instance name for the completed container. Because callbacks are processed asynchronously, we might
     //encounter situations where handleContainerCompletion() is called before onContainersAllocated(), resulting in the
     //containerId missing from the containersMap.
-    String completedInstanceName = completedContainerEntry == null?  UNKNOWN_HELIX_INSTANCE : completedContainerEntry.getValue();
+    String completedInstanceName = completedContainerInfo == null?  UNKNOWN_HELIX_INSTANCE : completedContainerInfo.getHelixParticipantId();
+    String helixTag = completedContainerInfo == null ? helixInstanceTags : completedContainerInfo.getHelixTag();
+    allocatedContainerCountMap.put(helixTag, allocatedContainerCountMap.get(helixTag) - 1);
 
     LOGGER.info(String.format("Container %s running Helix instance %s has completed with exit status %d",
         containerStatus.getContainerId(), completedInstanceName, containerStatus.getExitStatus()));
@@ -657,7 +691,7 @@ public class YarnService extends AbstractIdleService {
     if (containerStatus.getExitStatus() == ContainerExitStatus.ABORTED) {
       if (this.releasedContainerCache.getIfPresent(containerStatus.getContainerId()) != null) {
         LOGGER.info("Container release requested, so not spawning a replacement for containerId {}", containerStatus.getContainerId());
-        if (completedContainerEntry != null) {
+        if (completedContainerInfo != null) {
           LOGGER.info("Adding instance {} to the pool of unused instances", completedInstanceName);
           this.unusedHelixInstanceNames.add(completedInstanceName);
         }
@@ -683,7 +717,7 @@ public class YarnService extends AbstractIdleService {
     if (this.shutdownInProgress) {
       return;
     }
-    if(completedContainerEntry != null) {
+    if(completedContainerInfo != null) {
       this.helixInstanceRetryCount.putIfAbsent(completedInstanceName, new AtomicInteger(0));
       int retryCount = this.helixInstanceRetryCount.get(completedInstanceName).incrementAndGet();
 
@@ -715,10 +749,13 @@ public class YarnService extends AbstractIdleService {
             .submit(GobblinYarnEventConstants.EventNames.HELIX_INSTANCE_COMPLETION, eventMetadataBuilder.get().build());
       }
     }
-    LOGGER.info(String.format("Requesting a new container to replace %s to run Helix instance %s", containerStatus.getContainerId(), completedInstanceName));
+    Optional<Resource> newContainerResource = completedContainerInfo != null ?
+        Optional.of(completedContainerInfo.getContainer().getResource()) : Optional.absent();
+    LOGGER.info("Requesting a new container to replace {} to run Helix instance {} with helix tag {} and resource {}",
+        containerStatus.getContainerId(), completedInstanceName, helixTag, newContainerResource.orNull());
     this.eventBus.post(new NewContainerRequest(
-        shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerEntry != null ?
-            Optional.of(completedContainerEntry.getKey()) : Optional.<Container>absent()));
+        shouldStickToTheSameNode(containerStatus.getExitStatus()) && completedContainerInfo != null ?
+            Optional.of(completedContainerInfo.getContainer()) : Optional.absent(), newContainerResource));
   }
 
   private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(ContainerStatus containerStatus) {
@@ -755,12 +792,18 @@ public class YarnService extends AbstractIdleService {
     @Override
     public void onContainersAllocated(List<Container> containers) {
       for (final Container container : containers) {
+        String containerId = container.getId().toString();
+        String containerHelixTag = YarnHelixUtils.findHelixTagForContainer(container, allocatedContainerCountMap, yarnContainerRequest);
+        if (Strings.isNullOrEmpty(containerHelixTag)) {
+          containerHelixTag = helixInstanceTags;
+        }
         if (eventSubmitter.isPresent()) {
           eventSubmitter.get().submit(GobblinYarnEventConstants.EventNames.CONTAINER_ALLOCATION,
-              GobblinYarnMetricTagNames.CONTAINER_ID, container.getId().toString());
+              GobblinYarnMetricTagNames.CONTAINER_ID, containerId);
         }
 
-        LOGGER.info(String.format("Container %s has been allocated", container.getId()));
+        LOGGER.info("Container {} has been allocated with resource {} for helix tag {}",
+            container.getId(), container.getResource(), containerHelixTag);
 
         //Iterate over the (thread-safe) set of unused instances to find the first instance that is not currently live.
         //Once we find a candidate instance, it is removed from the set.
@@ -781,6 +824,8 @@ public class YarnService extends AbstractIdleService {
               instanceName = null;
             }
           }
+          allocatedContainerCountMap.put(containerHelixTag,
+              allocatedContainerCountMap.getOrDefault(containerHelixTag, 0) + 1);
         }
 
         if (Strings.isNullOrEmpty(instanceName)) {
@@ -789,8 +834,8 @@ public class YarnService extends AbstractIdleService {
               .getHelixInstanceName(HELIX_YARN_INSTANCE_NAME_PREFIX, helixInstanceIdGenerator.incrementAndGet());
         }
 
-        final String finalInstanceName = instanceName;
-        containerMap.put(container.getId(), new AbstractMap.SimpleImmutableEntry<>(container, finalInstanceName));
+        ContainerInfo containerInfo = new ContainerInfo(container, instanceName, containerHelixTag);
+        containerMap.put(container.getId(), containerInfo);
 
         // Find matching requests and remove the request to reduce the chance that a subsequent request
         // will request extra containers. YARN does not have a delta request API and the requests are not
@@ -819,11 +864,11 @@ public class YarnService extends AbstractIdleService {
           @Override
           public void run() {
             try {
-              LOGGER.info("Starting container " + container.getId());
+              LOGGER.info("Starting container " + containerId);
 
-              nmClientAsync.startContainerAsync(container, newContainerLaunchContext(container, finalInstanceName));
+              nmClientAsync.startContainerAsync(container, newContainerLaunchContext(containerInfo));
             } catch (IOException ioe) {
-              LOGGER.error("Failed to start container " + container.getId(), ioe);
+              LOGGER.error("Failed to start container " + containerId, ioe);
             }
           }
         });
@@ -939,4 +984,13 @@ public class YarnService extends AbstractIdleService {
       LOGGER.error(String.format("Failed to stop container %s due to error %s", containerId, t));
     }
   }
+
+  // A class that encapsulates a Container instance, the Helix participant ID of the container and its Helix tag
+  @AllArgsConstructor
+  @Getter
+  static class ContainerInfo {
+     private final Container container;
+     private final String helixParticipantId;
+     private final String helixTag;
+  }
 }
diff --git a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
index 7ee4b3223..5cb529ed0 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/event/NewContainerRequest.java
@@ -17,9 +17,11 @@
 
 package org.apache.gobblin.yarn.event;
 
+import lombok.Getter;
 import org.apache.hadoop.yarn.api.records.Container;
 
 import com.google.common.base.Optional;
+import org.apache.hadoop.yarn.api.records.Resource;
 
 
 /**
@@ -30,9 +32,17 @@ import com.google.common.base.Optional;
 public class NewContainerRequest {
 
   private final Optional<Container> replacedContainer;
+  @Getter
+  private final Optional<Resource> resource;
 
   public NewContainerRequest(Optional<Container> replacedContainer) {
     this.replacedContainer = replacedContainer;
+    this.resource = Optional.absent();
+  }
+
+  public NewContainerRequest(Optional<Container> replacedContainer, Optional<Resource> resource) {
+    this.replacedContainer = replacedContainer;
+    this.resource = resource;
   }
 
   /**
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
index 9bb5b747e..21f3df0d5 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnTestUtils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -75,4 +76,10 @@ public class GobblinYarnTestUtils {
     credentials.addToken(token.getService(), token);
     credentials.writeTokenStorageFile(path, new Configuration());
   }
+
+  public static YarnContainerRequestBundle createYarnContainerRequest(int n, Resource resource) {
+    YarnContainerRequestBundle yarnContainerRequestBundle = new YarnContainerRequestBundle();
+    yarnContainerRequestBundle.add("GobblinKafkaStreaming", n, resource);
+    return yarnContainerRequestBundle;
+  }
 }
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
index 6c4047147..003bd9ccb 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnAutoScalingManagerTest.java
@@ -19,15 +19,22 @@ package org.apache.gobblin.yarn;
 
 import java.io.IOException;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixProperty;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.task.JobConfig;
 import org.apache.helix.task.JobContext;
 import org.apache.helix.task.JobDag;
 import org.apache.helix.task.TaskDriver;
 import org.apache.helix.task.TaskState;
 import org.apache.helix.task.WorkflowConfig;
 import org.apache.helix.task.WorkflowContext;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -35,6 +42,7 @@ import org.testng.annotations.Test;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 
+import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 
@@ -47,6 +55,9 @@ public class YarnAutoScalingManagerTest {
   // A queue within size == 1 and upperBound == "infinite" should not impact on the execution.
   private final static YarnAutoScalingManager.SlidingWindowReservoir noopQueue =
       new YarnAutoScalingManager.SlidingWindowReservoir(1, Integer.MAX_VALUE);
+  private final static int defaultContainerMemory = 1024;
+  private final static int defaultContainerCores = 2;
+  private final static String defaultHelixTag = "DefaultHelixTag";
   /**
    * Test for one workflow with one job
    */
@@ -82,13 +93,15 @@ public class YarnAutoScalingManagerTest {
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
         new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
-            1, 10, 1.0, noopQueue, helixDataAccessor);
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
-
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
     // 2 containers requested and one worker in use
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
   }
 
   /**
@@ -131,14 +144,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
   }
 
   /**
@@ -200,14 +216,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-3", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 5 containers requested and 3 workers in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(5,
-        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
   }
 
   /**
@@ -269,14 +288,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 3 containers requested and 2 workers in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(3,
-        ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
   }
 
   /**
@@ -313,103 +335,17 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            2, 1, 10, 1.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 2,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable.run();
 
     // 1 container requested since 2 partitions and limit is 2 partitions per container. One worker in use.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
-  }
-
-
-  /**
-   * Test min containers
-   */
-  @Test
-  public void testMinContainers() throws IOException {
-    YarnService mockYarnService = mock(YarnService.class);
-    TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
-
-    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 5, 10, 1.0, noopQueue, helixDataAccessor);
-
-    runnable.run();
-
-    // 5 containers requested due to min and one worker in use
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(5, ImmutableSet.of("GobblinYarnTaskRunner-1"));
-  }
-
-  /**
-   * Test max containers
-   */
-  @Test
-  public void testMaxContainers() throws IOException {
-    YarnService mockYarnService = mock(YarnService.class);
-    TaskDriver mockTaskDriver = mock(TaskDriver.class);
-    WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
-    JobDag mockJobDag = mock(JobDag.class);
-
-    Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
-    Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
-
-    Mockito.when(mockTaskDriver.getWorkflows())
-        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig));
-
-    WorkflowContext mockWorkflowContext = mock(WorkflowContext.class);
-    Mockito.when(mockWorkflowContext.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
-
-    Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext);
-
-    JobContext mockJobContext = mock(JobContext.class);
-    Mockito.when(mockJobContext.getPartitionSet())
-        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
-    Mockito.when(mockJobContext.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
-
-    Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext);
-
-    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
-    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
-    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
-        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
-
-    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 1, 1.0, noopQueue, helixDataAccessor);
-
-    runnable.run();
-
-    // 1 container requested to max and one worker in use
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
   }
 
   @Test
@@ -443,41 +379,49 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
 
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable1 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, 1.2, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            1.2, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable1.run();
 
     // 3 containers requested to max and one worker in use
-    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2, Min containers = 1, Max = 10
-    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 1.2))) = 3.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(3, ImmutableSet.of("GobblinYarnTaskRunner-1"));
-
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2
+    // so targetNumContainers = Ceil((2/1) * 1.2) = 3.
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 3);
 
+    Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable2 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, 0.1, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            0.1, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable2.run();
 
     // 3 containers requested to max and one worker in use
-    // NumPartitions = 2, Partitions per container = 1 and overprovision = 1.2, Min containers = 1, Max = 10
-    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 0.1))) = 1.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    // NumPartitions = 2, Partitions per container = 1 and overprovision = 0.1
+    // so targetNumContainers = Ceil((2/1) * 0.1) = 1.
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 1);
 
+    Mockito.reset(mockYarnService);
     YarnAutoScalingManager.YarnAutoScalingRunnable runnable3 =
-        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, 6.0, noopQueue, helixDataAccessor);
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            6.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
 
     runnable3.run();
 
     // 3 containers requested to max and one worker in use
     // NumPartitions = 2, Partitions per container = 1 and overprovision = 6.0,
-    // so targetNumContainers = Max (1, Min(10, Ceil((2/1) * 6.0))) = 10.
-    Mockito.verify(mockYarnService, times(1))
-        .requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    // so targetNumContainers = Ceil((2/1) * 6.0) = 12.
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 12);
   }
 
   /**
@@ -514,37 +458,43 @@ public class YarnAutoScalingManagerTest {
         .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty("")));
 
     TestYarnAutoScalingRunnable runnable =
-        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, 1, 1, helixDataAccessor);
+        new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1, helixDataAccessor);
 
     runnable.setRaiseException(true);
     runnable.run();
-    Mockito.verify(mockYarnService, times(0)).requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(0)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
 
     Mockito.reset(mockYarnService);
     runnable.setRaiseException(false);
     runnable.run();
-    // 1 container requested to max and one worker in use
-    Mockito.verify(mockYarnService, times(1)).requestTargetNumberOfContainers(1, ImmutableSet.of("GobblinYarnTaskRunner-1"));
+    // 2 containers requested
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
   }
 
   public void testMaxValueEvictingQueue() throws Exception {
+    Resource resource = Resource.newInstance(16, 1);
     YarnAutoScalingManager.SlidingWindowReservoir window = new YarnAutoScalingManager.SlidingWindowReservoir(3, 10);
-
     // Normal insertion with eviction of originally largest value
-    window.add(3);
-    window.add(1);
-    window.add(2);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(3, resource));
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(2, resource));
     // Now it contains [3,1,2]
-    Assert.assertEquals(window.getMax(), 3);
-    window.add(1);
+    Assert.assertEquals(window.getMax().getTotalContainers(), 3);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(1, resource));
     // Now it contains [1,2,1]
-    Assert.assertEquals(window.getMax(), 2);
-    window.add(5);
-    Assert.assertEquals(window.getMax(), 5);
+    Assert.assertEquals(window.getMax().getTotalContainers(), 2);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(5, resource));
+    Assert.assertEquals(window.getMax().getTotalContainers(), 5);
     // Now it contains [2,1,5]
-    window.add(11);
+    window.add(GobblinYarnTestUtils.createYarnContainerRequest(11, resource));
     // Still [2,1,5] as 11 > 10 thereby being rejected.
-    Assert.assertEquals(window.getMax(), 5);
+    Assert.assertEquals(window.getMax().getTotalContainers(), 5);
   }
 
   /**
@@ -557,7 +507,6 @@ public class YarnAutoScalingManagerTest {
     TaskDriver mockTaskDriver = mock(TaskDriver.class);
     WorkflowConfig mockWorkflowConfig = mock(WorkflowConfig.class);
     JobDag mockJobDag = mock(JobDag.class);
-
     Mockito.when(mockJobDag.getAllNodes()).thenReturn(ImmutableSet.of("job1"));
     Mockito.when(mockWorkflowConfig.getJobDag()).thenReturn(mockJobDag);
 
@@ -585,14 +534,17 @@ public class YarnAutoScalingManagerTest {
             "GobblinYarnTaskRunner-2", new HelixProperty("")));
 
     TestYarnAutoScalingRunnable runnable = new TestYarnAutoScalingRunnable(mockTaskDriver, mockYarnService,
-            1, 1, 10, helixDataAccessor);
+            1, helixDataAccessor);
 
     runnable.run();
 
     // 2 containers requested and one worker in use, while the evaluation will hold for true if not set externally,
     // still tell YarnService there are two instances being used.
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2"));
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
 
     // Set failEvaluation which simulates the "beyond tolerance" case.
     Mockito.reset(mockYarnService);
@@ -600,7 +552,98 @@ public class YarnAutoScalingManagerTest {
     runnable.run();
 
     Mockito.verify(mockYarnService, times(1)).
-        requestTargetNumberOfContainers(2, ImmutableSet.of("GobblinYarnTaskRunner-2"));
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-2")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 2);
+  }
+
+  @Test
+  public void testFlowsWithHelixTags() {
+    YarnService mockYarnService = mock(YarnService.class);
+    TaskDriver mockTaskDriver = mock(TaskDriver.class);
+
+    WorkflowConfig mockWorkflowConfig1 = mock(WorkflowConfig.class);
+    JobDag mockJobDag1 = mock(JobDag.class);
+
+    Mockito.when(mockJobDag1.getAllNodes()).thenReturn(ImmutableSet.of("job1", "job2"));
+    Mockito.when(mockWorkflowConfig1.getJobDag()).thenReturn(mockJobDag1);
+
+    WorkflowContext mockWorkflowContext1 = mock(WorkflowContext.class);
+    Mockito.when(mockWorkflowContext1.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    Mockito.when(mockTaskDriver.getWorkflowContext("workflow1")).thenReturn(mockWorkflowContext1);
+
+    JobContext mockJobContext1 = mock(JobContext.class);
+    Mockito.when(mockJobContext1.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(1), Integer.valueOf(2)));
+    Mockito.when(mockJobContext1.getAssignedParticipant(2)).thenReturn("GobblinYarnTaskRunner-1");
+    Mockito.when(mockTaskDriver.getJobContext("job1")).thenReturn(mockJobContext1);
+
+    JobContext mockJobContext2 = mock(JobContext.class);
+    Mockito.when(mockJobContext2.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(3)));
+    Mockito.when(mockJobContext2.getAssignedParticipant(3)).thenReturn("GobblinYarnTaskRunner-2");
+    Mockito.when(mockTaskDriver.getJobContext("job2")).thenReturn(mockJobContext2);
+
+    WorkflowConfig mockWorkflowConfig2 = mock(WorkflowConfig.class);
+    JobDag mockJobDag2 = mock(JobDag.class);
+
+    Mockito.when(mockJobDag2.getAllNodes()).thenReturn(ImmutableSet.of("job3"));
+    Mockito.when(mockWorkflowConfig2.getJobDag()).thenReturn(mockJobDag2);
+
+    WorkflowContext mockWorkflowContext2 = mock(WorkflowContext.class);
+    Mockito.when(mockWorkflowContext2.getWorkflowState()).thenReturn(TaskState.IN_PROGRESS);
+
+    Mockito.when(mockTaskDriver.getWorkflowContext("workflow2")).thenReturn(mockWorkflowContext2);
+
+    JobContext mockJobContext3 = mock(JobContext.class);
+    Mockito.when(mockJobContext3.getPartitionSet())
+        .thenReturn(ImmutableSet.of(Integer.valueOf(4), Integer.valueOf(5)));
+    Mockito.when(mockJobContext3.getAssignedParticipant(4)).thenReturn("GobblinYarnTaskRunner-3");
+    Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+    JobConfig mockJobConfig3 = mock(JobConfig.class);
+    String helixTag = "test-Tag1";
+    Map<String, String> resourceMap = new HashMap<>();
+    resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_MEMORY_MBS, "512");
+    resourceMap.put(GobblinClusterConfigurationKeys.HELIX_JOB_CONTAINER_CORES, "8");
+    Mockito.when(mockJobConfig3.getInstanceGroupTag()).thenReturn(helixTag);
+    Mockito.when(mockJobConfig3.getJobCommandConfigMap()).thenReturn(resourceMap);
+    Mockito.when(mockTaskDriver.getJobContext("job3")).thenReturn(mockJobContext3);
+    Mockito.when(mockTaskDriver.getJobConfig("job3")).thenReturn(mockJobConfig3);
+    Mockito.when(mockTaskDriver.getWorkflows())
+        .thenReturn(ImmutableMap.of("workflow1", mockWorkflowConfig1, "workflow2", mockWorkflowConfig2));
+
+    HelixDataAccessor helixDataAccessor = mock(HelixDataAccessor.class);
+    Mockito.when(helixDataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder("cluster"));
+    Mockito.when(helixDataAccessor.getChildValuesMap(Mockito.any()))
+        .thenReturn(ImmutableMap.of("GobblinYarnTaskRunner-1", new HelixProperty(""),
+            "GobblinYarnTaskRunner-2", new HelixProperty(""),
+            "GobblinYarnTaskRunner-3", new HelixProperty("")));
+
+    YarnAutoScalingManager.YarnAutoScalingRunnable runnable =
+        new YarnAutoScalingManager.YarnAutoScalingRunnable(mockTaskDriver, mockYarnService, 1,
+            1.0, noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
+
+    runnable.run();
+
+    // 5 containers requested and 3 workers in use
+    ArgumentCaptor<YarnContainerRequestBundle> argument = ArgumentCaptor.forClass(YarnContainerRequestBundle.class);
+    Mockito.verify(mockYarnService, times(1)).
+        requestTargetNumberOfContainers(argument.capture(),
+            eq(ImmutableSet.of("GobblinYarnTaskRunner-1", "GobblinYarnTaskRunner-2", "GobblinYarnTaskRunner-3")));
+    Assert.assertEquals(argument.getValue().getTotalContainers(), 5);
+    Map<String, Set<String>> resourceHelixTagMap = argument.getValue().getResourceHelixTagMap();
+    Map<String, Resource> helixTagResourceMap = argument.getValue().getHelixTagResourceMap();
+    Map<String, Integer> helixTagContainerCountMap = argument.getValue().getHelixTagContainerCountMap();
+
+    // Verify that 3 containers are requested with the default tag and resource setting,
+    // while 2 are requested with the specific helix tag and resource requirement
+    Assert.assertEquals(resourceHelixTagMap.size(), 2);
+    Assert.assertEquals(helixTagResourceMap.get(helixTag), Resource.newInstance(512, 8));
+    Assert.assertEquals(helixTagResourceMap.get(defaultHelixTag), Resource.newInstance(defaultContainerMemory, defaultContainerCores));
+    Assert.assertEquals((int) helixTagContainerCountMap.get(helixTag), 2);
+    Assert.assertEquals((int) helixTagContainerCountMap.get(defaultHelixTag), 3);
+
   }
 
   private static class TestYarnAutoScalingRunnable extends YarnAutoScalingManager.YarnAutoScalingRunnable {
@@ -608,8 +651,9 @@ public class YarnAutoScalingManagerTest {
     boolean alwaysUnused = false;
 
     public TestYarnAutoScalingRunnable(TaskDriver taskDriver, YarnService yarnService, int partitionsPerContainer,
-        int minContainers, int maxContainers, HelixDataAccessor helixDataAccessor) {
-      super(taskDriver, yarnService, partitionsPerContainer, minContainers, maxContainers, 1.0, noopQueue, helixDataAccessor);
+        HelixDataAccessor helixDataAccessor) {
+      super(taskDriver, yarnService, partitionsPerContainer, 1.0,
+          noopQueue, helixDataAccessor, defaultHelixTag, defaultContainerMemory, defaultContainerCores);
     }
 
     @Override
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
index 215e1bd03..bf595ec8e 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -207,14 +208,16 @@ public class YarnServiceTest {
    */
   @Test(groups = {"gobblin.yarn", "disabledOnCI"})
   public void testScaleUp() {
-    this.yarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
+    Resource resource = Resource.newInstance(16, 1);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), Collections.EMPTY_SET);
 
-    Assert.assertFalse(this.yarnService.getMatchingRequestsList(64, 1).isEmpty());
+    Assert.assertFalse(this.yarnService.getMatchingRequestsList(resource).isEmpty());
     Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 10);
     Assert.assertTrue(this.yarnService.waitForContainerCount(10, 60000));
 
     // container request list that had entries earlier should now be empty
-    Assert.assertEquals(this.yarnService.getMatchingRequestsList(64, 1).size(), 0);
+    Assert.assertEquals(this.yarnService.getMatchingRequestsList(resource).size(), 0);
   }
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = "testScaleUp")
@@ -224,8 +227,9 @@ public class YarnServiceTest {
     for (int i = 1; i <= 8; i++) {
       inUseInstances.add("GobblinYarnTaskRunner_" + i);
     }
-
-    this.yarnService.requestTargetNumberOfContainers(6, inUseInstances);
+    Resource resource = Resource.newInstance(16, 1);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(6, resource), inUseInstances);
 
     Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 6);
 
@@ -239,7 +243,9 @@ public class YarnServiceTest {
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"}, dependsOnMethods = "testScaleDownWithInUseInstances")
   public void testScaleDown() throws Exception {
-    this.yarnService.requestTargetNumberOfContainers(4, Collections.EMPTY_SET);
+    Resource resource = Resource.newInstance(16, 1);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(4, resource), Collections.EMPTY_SET);
 
     Assert.assertEquals(this.yarnService.getNumRequestedContainers(), 4);
     Assert.assertTrue(this.yarnService.waitForContainerCount(4, 60000));
@@ -279,13 +285,25 @@ public class YarnServiceTest {
         0), 0);
     Resource resource = Resource.newInstance(2048, 1);
     Container container = Container.newInstance(containerId, null, null, resource, null, null);
+    YarnService.ContainerInfo
+        containerInfo = new YarnService.ContainerInfo(container, "helixInstance1", "helixTag");
 
-    String command = yarnService.buildContainerCommand(container, "helixInstance1");
+    String command = yarnService.buildContainerCommand(containerInfo);
 
     // 1628 is from 2048 * 0.8 - 10
     Assert.assertTrue(command.contains("-Xmx1628"));
   }
 
+  /**
+   * Test that yarnService fails when the requested resource exceeds the resource limit.
+   */
+  @Test(groups = {"gobblin.yarn", "disabledOnCI"}, expectedExceptions = IllegalArgumentException.class)
+  public void testExceedResourceLimit() {
+    Resource resource = Resource.newInstance(204800, 10240);
+    this.yarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), Collections.EMPTY_SET);
+  }
+
    static class TestYarnService extends YarnService {
     public TestYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration,
         FileSystem fs, EventBus eventBus) throws Exception {
@@ -319,10 +337,8 @@ public class YarnServiceTest {
     /**
      * Get the list of matching container requests for the specified resource memory and cores.
      */
-    public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(int memory, int cores) {
-      Resource resource = Resource.newInstance(memory, cores);
+    public List<? extends Collection<AMRMClient.ContainerRequest>> getMatchingRequestsList(Resource resource) {
       Priority priority = Priority.newInstance(0);
-
       return getAmrmClientAsync().getMatchingRequests(priority, ResourceRequest.ANY, resource);
     }
 
@@ -346,13 +362,12 @@ public class YarnServiceTest {
           Thread.currentThread().interrupt();
           break;
         }
-
+        ConcurrentMap<ContainerId, ContainerInfo> containerMap = getContainerMap();
         if (expectedCount == getContainerMap().size()) {
           success = true;
           break;
         }
       }
-
       return success;
     }
   }
diff --git a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
index 1934ecebb..ad2e91f49 100644
--- a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
+++ b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/YarnServiceTestWithExpiration.java
@@ -198,9 +198,11 @@ public class YarnServiceTestWithExpiration {
 
   @Test(groups = {"gobblin.yarn", "disabledOnCI"})
   public void testStartError() throws Exception{
-    this.expiredYarnService.requestTargetNumberOfContainers(10, Collections.EMPTY_SET);
+    Resource resource = Resource.newInstance(16, 1);
+    this.expiredYarnService.requestTargetNumberOfContainers(
+        GobblinYarnTestUtils.createYarnContainerRequest(10, resource), Collections.EMPTY_SET);
 
-    Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(64, 1).isEmpty());
+    Assert.assertFalse(this.expiredYarnService.getMatchingRequestsList(resource).isEmpty());
     Assert.assertEquals(this.expiredYarnService.getNumRequestedContainers(), 10);
 
     AssertWithBackoff.create().logger(LOG).timeoutMs(60000).maxSleepMs(2000).backoffFactor(1.5)
diff --git a/gobblin-yarn/src/test/resources/YarnServiceTest.conf b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
index d2dc49c46..73ecf85cd 100644
--- a/gobblin-yarn/src/test/resources/YarnServiceTest.conf
+++ b/gobblin-yarn/src/test/resources/YarnServiceTest.conf
@@ -17,6 +17,7 @@
 
 # Yarn/Helix configuration properties
 gobblin.cluster.helix.cluster.name=YarnServiceTest
+gobblin.cluster.helixInstanceTags=GobblinKafkaStreaming
 gobblin.yarn.app.name=YarnServiceTest
 gobblin.yarn.work.dir=YarnServiceTest
 
@@ -30,7 +31,7 @@ gobblin.yarn.app.master.cores=1
 gobblin.yarn.app.report.interval.minutes=1
 gobblin.yarn.max.get.app.report.failures=4
 gobblin.yarn.email.notification.on.shutdown=false
-gobblin.yarn.initial.containers=1
+gobblin.yarn.initial.containers=0
 gobblin.yarn.container.memory.mbs=64
 gobblin.yarn.container.cores=1
 gobblin.yarn.container.affinity.enabled=true