You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2018/01/05 23:34:12 UTC

hadoop git commit: YARN-7666. Introduce scheduler specific environment variable support in ApplicationSubmissionContext for better scheduling placement configurations. (Sunil G via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 2aa4f0a55 -> a81144daa


YARN-7666. Introduce scheduler specific environment variable support in ApplicationSubmissionContext for better scheduling placement configurations. (Sunil G via wangda)

Change-Id: I0fd826490f5160d47d42af2a9ac0bd8ec4e959dc


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a81144da
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a81144da
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a81144da

Branch: refs/heads/trunk
Commit: a81144daa012e13590725f67f53e35ef84a6f1ec
Parents: 2aa4f0a
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Jan 5 15:12:04 2018 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Jan 5 15:12:04 2018 -0800

----------------------------------------------------------------------
 .../records/ApplicationSubmissionContext.java   | 21 ++++++
 .../src/main/proto/yarn_protos.proto            |  1 +
 .../pb/ApplicationSubmissionContextPBImpl.java  | 74 +++++++++++++++++++-
 .../server/resourcemanager/rmapp/RMApp.java     |  6 ++
 .../server/resourcemanager/rmapp/RMAppImpl.java | 17 ++++-
 .../scheduler/AppSchedulingInfo.java            | 34 +++++----
 .../scheduler/ApplicationPlacementFactory.java  | 63 +++++++++++++++++
 .../scheduler/SchedulerApplicationAttempt.java  | 13 +++-
 .../common/ApplicationSchedulingConfig.java     | 35 +++++++++
 .../placement/AppPlacementAllocator.java        |  9 +++
 .../LocalityAppPlacementAllocator.java          | 11 +++
 .../applicationsmanager/MockAsm.java            |  5 ++
 .../server/resourcemanager/rmapp/MockRMApp.java |  5 ++
 .../scheduler/TestAppSchedulingInfo.java        | 12 ++--
 14 files changed, 279 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
index 38db60c..d2adfdc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ApplicationSubmissionContext.java
@@ -598,4 +598,25 @@ public abstract class ApplicationSubmissionContext {
   @Unstable
   public abstract void setApplicationTimeouts(
       Map<ApplicationTimeoutType, Long> applicationTimeouts);
+
+  /**
+   * Get the application scheduling environment variables, stored as a map of
+   * key-value pairs for the application.
+   *
+   * @return scheduling envs for the application.
+   */
+  @Public
+  @Unstable
+  public abstract Map<String, String> getApplicationSchedulingPropertiesMap();
+
+  /**
+   * Set the scheduling envs for the application.
+   *
+   * @param schedulingEnvMap
+   *          A map of envs for the application scheduling preferences.
+   */
+  @Public
+  @Unstable
+  public abstract void setApplicationSchedulingPropertiesMap(
+      Map<String, String> schedulingEnvMap);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 3a9662b..b6ea5f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -459,6 +459,7 @@ message ApplicationSubmissionContextProto {
   optional string node_label_expression = 16;
   repeated ResourceRequestProto am_container_resource_request = 17;
   repeated ApplicationTimeoutMapProto application_timeouts = 18;
+  repeated StringStringMapProto application_scheduling_properties = 19;
 }
 
 enum ApplicationTimeoutTypeProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
index e3dbf40..0c91e18 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.PriorityProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceRequestProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.StringStringMapProto;
 
 import com.google.protobuf.TextFormat;
 
@@ -71,6 +72,7 @@ extends ApplicationSubmissionContext {
   private LogAggregationContext logAggregationContext = null;
   private ReservationId reservationId = null;
   private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
+  private Map<String, String> schedulingProperties = null;
 
   public ApplicationSubmissionContextPBImpl() {
     builder = ApplicationSubmissionContextProto.newBuilder();
@@ -141,6 +143,9 @@ extends ApplicationSubmissionContext {
     if (this.applicationTimeouts != null) {
       addApplicationTimeouts();
     }
+    if (this.schedulingProperties != null) {
+      addApplicationSchedulingProperties();
+    }
   }
 
   private void mergeLocalToProto() {
@@ -662,4 +667,71 @@ extends ApplicationSubmissionContext {
         };
     this.builder.addAllApplicationTimeouts(values);
   }
-}  
+
+  private void addApplicationSchedulingProperties() {
+    maybeInitBuilder();
+    builder.clearApplicationSchedulingProperties();
+    if (this.schedulingProperties == null) {
+      return;
+    }
+    Iterable<? extends StringStringMapProto> values =
+        new Iterable<StringStringMapProto>() {
+
+      @Override
+      public Iterator<StringStringMapProto> iterator() {
+        return new Iterator<StringStringMapProto>() {
+          private Iterator<String> iterator = schedulingProperties.keySet()
+              .iterator();
+
+          @Override
+          public boolean hasNext() {
+            return iterator.hasNext();
+          }
+
+          @Override
+          public StringStringMapProto next() {
+            String key = iterator.next();
+            return StringStringMapProto.newBuilder()
+                .setValue(schedulingProperties.get(key)).setKey(key).build();
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    this.builder.addAllApplicationSchedulingProperties(values);
+  }
+
+  private void initApplicationSchedulingProperties() {
+    if (this.schedulingProperties != null) {
+      return;
+    }
+    ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
+    List<StringStringMapProto> properties = p
+        .getApplicationSchedulingPropertiesList();
+    this.schedulingProperties = new HashMap<String, String>(properties.size());
+    for (StringStringMapProto envProto : properties) {
+      this.schedulingProperties.put(envProto.getKey(), envProto.getValue());
+    }
+  }
+
+  @Override
+  public Map<String, String> getApplicationSchedulingPropertiesMap() {
+    initApplicationSchedulingProperties();
+    return this.schedulingProperties;
+  }
+
+  @Override
+  public void setApplicationSchedulingPropertiesMap(
+      Map<String, String> schedulingPropertyMap) {
+    if (schedulingPropertyMap == null) {
+      return;
+    }
+    initApplicationSchedulingProperties();
+    this.schedulingProperties.clear();
+    this.schedulingProperties.putAll(schedulingPropertyMap);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 39321cc..e286834 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -311,4 +311,10 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * @return ApplicationPlacementContext
    */
   ApplicationPlacementContext getApplicationPlacementContext();
+
+  /**
+   * Get the application scheduling environment variables.
+   * @return Map of envs related to application scheduling preferences.
+   */
+  Map<String, String> getApplicationSchedulingEnvs();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 38f666b..714f0e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -82,7 +82,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
 import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
-
 import org.apache.hadoop.yarn.server.resourcemanager.placement
     .ApplicationPlacementContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -151,6 +150,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private final Map<RMNode, NodeUpdateType> updatedNodes = new HashMap<>();
   private final String applicationType;
   private final Set<String> applicationTags;
+  private Map<String, String> applicationSchedulingEnvs = new HashMap<>();
 
   private final long attemptFailuresValidityInterval;
   private boolean amBlacklistingEnabled = false;
@@ -489,6 +489,15 @@ public class RMAppImpl implements RMApp, Recoverable {
 
     this.placementContext = placementContext;
 
+    // Seed the scheduling envs from the AM container environment (if any);
+    // explicitly specified scheduling properties are added after and override.
+    if(submissionContext.getAMContainerSpec() != null) {
+      applicationSchedulingEnvs
+          .putAll(submissionContext.getAMContainerSpec().getEnvironment());
+    }
+    applicationSchedulingEnvs
+        .putAll(submissionContext.getApplicationSchedulingPropertiesMap());
+
     long localLogAggregationStatusTimeout =
         conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
           YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
@@ -2029,10 +2038,14 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   /**
      * Clear Unused fields to free memory.
-     * @param app
      */
   private void clearUnusedFields() {
     this.submissionContext.setAMContainerSpec(null);
     this.submissionContext.setLogAggregationContext(null);
   }
+
+  @Override
+  public Map<String, String> getApplicationSchedulingEnvs() {
+    return this.applicationSchedulingEnvs;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index e47f0c1..8858d3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -45,10 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PendingAskUpdateResult;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -90,10 +90,12 @@ public class AppSchedulingInfo {
   private final ReentrantReadWriteLock.WriteLock writeLock;
 
   public final ContainerUpdateContext updateContext;
+  public final Map<String, String> applicationSchedulingEnvs = new HashMap<>();
 
-  public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String user, Queue queue, AbstractUsersManager abstractUsersManager,
-      long epoch, ResourceUsage appResourceUsage) {
+  public AppSchedulingInfo(ApplicationAttemptId appAttemptId, String user,
+      Queue queue, AbstractUsersManager abstractUsersManager, long epoch,
+      ResourceUsage appResourceUsage,
+      Map<String, String> applicationSchedulingEnvs) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
@@ -102,6 +104,7 @@ public class AppSchedulingInfo {
     this.containerIdCounter = new AtomicLong(
         epoch << ResourceManager.EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
+    this.applicationSchedulingEnvs.putAll(applicationSchedulingEnvs);
 
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     updateContext = new ContainerUpdateContext(this);
@@ -211,24 +214,27 @@ public class AppSchedulingInfo {
       Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
     boolean offswitchResourcesUpdated = false;
     for (Map.Entry<SchedulerRequestKey, Map<String, ResourceRequest>> entry :
-        dedupRequests.entrySet()) {
+    dedupRequests.entrySet()) {
       SchedulerRequestKey schedulerRequestKey = entry.getKey();
 
-      if (!schedulerKeyToAppPlacementAllocator.containsKey(
-          schedulerRequestKey)) {
+      if (!schedulerKeyToAppPlacementAllocator
+          .containsKey(schedulerRequestKey)) {
+        AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = ApplicationPlacementFactory
+            .getAppPlacementAllocator(applicationSchedulingEnvs
+                .get(ApplicationSchedulingConfig.ENV_APPLICATION_PLACEMENT_TYPE_CLASS));
+        placementAllocatorInstance.setAppSchedulingInfo(this);
+
         schedulerKeyToAppPlacementAllocator.put(schedulerRequestKey,
-            new LocalityAppPlacementAllocator<>(this));
+            placementAllocatorInstance);
       }
 
       // Update AppPlacementAllocator
-      PendingAskUpdateResult pendingAmountChanges =
-          schedulerKeyToAppPlacementAllocator.get(schedulerRequestKey)
-              .updatePendingAsk(entry.getValue().values(),
-                  recoverPreemptedRequestForAContainer);
+      PendingAskUpdateResult pendingAmountChanges = schedulerKeyToAppPlacementAllocator
+          .get(schedulerRequestKey).updatePendingAsk(entry.getValue().values(),
+              recoverPreemptedRequestForAContainer);
 
       if (null != pendingAmountChanges) {
-        updatePendingResources(
-            pendingAmountChanges, schedulerRequestKey,
+        updatePendingResources(pendingAmountChanges, schedulerRequestKey,
             queue.getMetrics());
         offswitchResourcesUpdated = true;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
new file mode 100644
index 0000000..40c8d05
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ApplicationPlacementFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ApplicationSchedulingConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+
+/**
+ * Factory class to build various application placement policies.
+ */
+@Public
+@Unstable
+public class ApplicationPlacementFactory {
+
+  /**
+   * Get AppPlacementAllocator related to the placement type requested.
+   *
+   * @param appPlacementAllocatorName
+   *          allocator class name (null or unusable selects the default).
+   * @return AppPlacementAllocator instance of the selected class
+   */
+  public static AppPlacementAllocator<SchedulerNode> getAppPlacementAllocator(
+      String appPlacementAllocatorName) {
+    Class<?> policyClass;
+    try {
+      if (appPlacementAllocatorName == null) {
+        policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+      } else {
+        policyClass = Class.forName(appPlacementAllocatorName);
+      }
+    } catch (ClassNotFoundException e) {
+      policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+    }
+
+    if (!AppPlacementAllocator.class.isAssignableFrom(policyClass)) {
+      policyClass = ApplicationSchedulingConfig.DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS;
+    }
+
+    @SuppressWarnings("unchecked")
+    AppPlacementAllocator<SchedulerNode> placementAllocatorInstance = (AppPlacementAllocator<SchedulerNode>) ReflectionUtils
+        .newInstance(policyClass, null);
+    return placementAllocatorInstance;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index dfb0e67..f02f113 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -197,6 +197,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   protected ReentrantReadWriteLock.ReadLock readLock;
   protected ReentrantReadWriteLock.WriteLock writeLock;
 
+  private Map<String, String> applicationSchedulingEnvs = new HashMap<>();
+
   // Not confirmed allocation resource, will be used to avoid too many proposal
   // rejected because of duplicated allocation
   private AtomicLong unconfirmedAllocatedMem = new AtomicLong();
@@ -207,9 +209,6 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       RMContext rmContext) {
     Preconditions.checkNotNull(rmContext, "RMContext should not be null");
     this.rmContext = rmContext;
-    this.appSchedulingInfo = 
-        new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
     this.queue = queue;
     this.pendingRelease = Collections.newSetFromMap(
         new ConcurrentHashMap<ContainerId, Boolean>());
@@ -227,8 +226,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
         this.logAggregationContext =
             appSubmissionContext.getLogAggregationContext();
       }
+      applicationSchedulingEnvs = rmApp.getApplicationSchedulingEnvs();
     }
 
+    this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
+        queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
+        applicationSchedulingEnvs);
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
@@ -1434,4 +1437,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
       return diagnosticMessage;
     }
   }
+
+  public Map<String, String> getApplicationSchedulingEnvs() {
+    return this.applicationSchedulingEnvs;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
new file mode 100644
index 0000000..1bd3743
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ApplicationSchedulingConfig.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.AppPlacementAllocator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalityAppPlacementAllocator;
+/**
+ * This class keeps the names of all scheduling envs that help in
+ * placement calculations.
+ */
+public class ApplicationSchedulingConfig {
+  @InterfaceAudience.Private
+  public static final String ENV_APPLICATION_PLACEMENT_TYPE_CLASS =
+      "APPLICATION_PLACEMENT_TYPE_CLASS";
+
+  @InterfaceAudience.Private
+  public static final Class<? extends AppPlacementAllocator>
+      DEFAULT_APPLICATION_PLACEMENT_TYPE_CLASS = LocalityAppPlacementAllocator.class;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
index dcb38aa..5c49450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/AppPlacementAllocator.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
 
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
@@ -157,4 +158,12 @@ public interface AppPlacementAllocator<N extends SchedulerNode> {
    * Print human-readable requests to LOG debug.
    */
   void showRequests();
+
+  /**
+   * Set app scheduling info.
+   *
+   * @param appSchedulingInfo
+   *          app info object.
+   */
+  void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
index 766827c..be1c1cc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java
@@ -66,6 +66,12 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
     this.appSchedulingInfo = info;
   }
 
+  public LocalityAppPlacementAllocator() {
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   public Iterator<N> getPreferredNodeIterator(
@@ -419,4 +425,9 @@ public class LocalityAppPlacementAllocator <N extends SchedulerNode>
       writeLock.unlock();
     }
   }
+
+  @Override
+  public void setAppSchedulingInfo(AppSchedulingInfo appSchedulingInfo) {
+    this.appSchedulingInfo = appSchedulingInfo;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 2aca375..72de27c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -254,6 +254,11 @@ public abstract class MockAsm extends MockApps {
     public CollectorInfo getCollectorInfo() {
       throw new UnsupportedOperationException("Not supported yet.");
     }
+
+    @Override
+    public Map<String, String> getApplicationSchedulingEnvs() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
   }
 
   public static RMApp newApplication(int i) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 7567599..c399368 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -338,4 +338,9 @@ public class MockRMApp implements RMApp {
   public CollectorInfo getCollectorInfo() {
     throw new UnsupportedOperationException("Not supported yet.");
   }
+
+  @Override
+  public Map<String, String> getApplicationSchedulingEnvs() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a81144da/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
index bb29889..3692b29 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAppSchedulingInfo.java
@@ -21,10 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -47,8 +44,9 @@ public class TestAppSchedulingInfo {
 
     FSLeafQueue queue = mock(FSLeafQueue.class);
     doReturn("test").when(queue).getQueueName();
-    AppSchedulingInfo  appSchedulingInfo = new AppSchedulingInfo(
-        appAttemptId, "test", queue, null, 0, new ResourceUsage());
+    AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(appAttemptId,
+        "test", queue, null, 0, new ResourceUsage(),
+        new HashMap<String, String>());
 
     appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
         new ArrayList<String>());
@@ -120,7 +118,7 @@ public class TestAppSchedulingInfo {
     doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
     AppSchedulingInfo  info = new AppSchedulingInfo(
         appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
-        new ResourceUsage());
+        new ResourceUsage(), new HashMap<String, String>());
     Assert.assertEquals(0, info.getSchedulerKeys().size());
 
     Priority pri1 = Priority.newInstance(1);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org