You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:51:01 UTC
svn commit: r1619012 [17/26] - in
/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project: ./
hadoop-yarn/bin/ hadoop-yarn/conf/ hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-api/
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java Tue Aug 19 23:49:39 2014
@@ -18,17 +18,24 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import com.google.common.annotations.VisibleForTesting;
+
+@Private
+@Unstable
public abstract class QueuePlacementRule {
protected boolean create;
@@ -58,16 +65,20 @@ public abstract class QueuePlacementRule
* continue to the next rule, and null indicates that the app should be rejected.
*/
public String assignAppToQueue(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) throws IOException {
- String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues);
- if (create || configuredQueues.contains(queue)) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ String queue = getQueueForApp(requestedQueue, user, groups,
+ configuredQueues);
+ if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
+ || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
return queue;
} else {
return "";
}
}
- public void initializeFromXml(Element el) {
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
boolean create = true;
NamedNodeMap attributes = el.getAttributes();
Map<String, String> args = new HashMap<String, String>();
@@ -104,15 +115,16 @@ public abstract class QueuePlacementRule
* continue to the next rule.
*/
protected abstract String getQueueForApp(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) throws IOException;
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException;
/**
* Places apps in queues by username of the submitter
*/
public static class User extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups, Collection<String> configuredQueues) {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return "root." + user;
}
@@ -127,9 +139,9 @@ public abstract class QueuePlacementRule
*/
public static class PrimaryGroup extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups,
- Collection<String> configuredQueues) throws IOException {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
return "root." + groups.getGroups(user).get(0);
}
@@ -147,12 +159,15 @@ public abstract class QueuePlacementRule
*/
public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups,
- Collection<String> configuredQueues) throws IOException {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
List<String> groupNames = groups.getGroups(user);
for (int i = 1; i < groupNames.size(); i++) {
- if (configuredQueues.contains("root." + groupNames.get(i))) {
+ String group = groupNames.get(i);
+ if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
+ || configuredQueues.get(FSQueueType.PARENT).contains(
+ "root." + group)) {
return "root." + groupNames.get(i);
}
}
@@ -167,12 +182,83 @@ public abstract class QueuePlacementRule
}
/**
+ * Places apps in queues with name of the submitter under the queue
+ * returned by the nested rule.
+ */
+ public static class NestedUserQueue extends QueuePlacementRule {
+ @VisibleForTesting
+ QueuePlacementRule nestedRule;
+
+ /**
+ * Parse xml and instantiate the nested rule
+ */
+ @Override
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
+ NodeList elements = el.getChildNodes();
+
+ for (int i = 0; i < elements.getLength(); i++) {
+ Node node = elements.item(i);
+ if (node instanceof Element) {
+ Element element = (Element) node;
+ if ("rule".equals(element.getTagName())) {
+ QueuePlacementRule rule = QueuePlacementPolicy
+ .createAndInitializeRule(node);
+ if (rule == null) {
+ throw new AllocationConfigurationException(
+ "Unable to create nested rule in nestedUserQueue rule");
+ }
+ this.nestedRule = rule;
+ break;
+ } else {
+ continue;
+ }
+ }
+ }
+
+ if (this.nestedRule == null) {
+ throw new AllocationConfigurationException(
+ "No nested rule specified in <nestedUserQueue> rule");
+ }
+ super.initializeFromXml(el);
+ }
+
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ // Apply the nested rule
+ String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
+ groups, configuredQueues);
+
+ if (queueName != null && queueName.length() != 0) {
+ if (!queueName.startsWith("root.")) {
+ queueName = "root." + queueName;
+ }
+
+ // If the queue returned by the nested rule is a configured leaf queue, skip
+ // to the next rule in the queue placement policy.
+ if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
+ return "";
+ }
+ return queueName + "." + user;
+ }
+ return queueName;
+ }
+
+ @Override
+ public boolean isTerminal() {
+ return false;
+ }
+ }
+
+ /**
* Places apps in queues by requested queue of the submitter
*/
public static class Specified extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups, Collection<String> configuredQueues) {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
return "";
} else {
@@ -190,34 +276,61 @@ public abstract class QueuePlacementRule
}
/**
- * Places all apps in the default queue
+ * Places apps in the specified default queue. If no default queue is
+ * specified the app is placed in root.default queue.
*/
public static class Default extends QueuePlacementRule {
+ @VisibleForTesting
+ String defaultQueueName;
+
@Override
- protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) {
- return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+ public QueuePlacementRule initialize(boolean create,
+ Map<String, String> args) {
+ if (defaultQueueName == null) {
+ defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+ }
+ return super.initialize(create, args);
}
@Override
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
+ defaultQueueName = el.getAttribute("queue");
+ if (defaultQueueName != null && !defaultQueueName.isEmpty()) {
+ if (!defaultQueueName.startsWith("root.")) {
+ defaultQueueName = "root." + defaultQueueName;
+ }
+ } else {
+ defaultQueueName = "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
+ }
+ super.initializeFromXml(el);
+ }
+
+ @Override
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
+ return defaultQueueName;
+ }
+
+ @Override
public boolean isTerminal() {
return true;
}
}
-
+
/**
* Rejects all apps
*/
public static class Reject extends QueuePlacementRule {
@Override
public String assignAppToQueue(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return null;
}
@Override
protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
throw new UnsupportedOperationException();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Tue Aug 19 23:49:39 2014
@@ -23,23 +23,18 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
- * A Schedulable represents an entity that can launch tasks, such as a job
- * or a queue. It provides a common interface so that algorithms such as fair
- * sharing can be applied both within a queue and across queues. There are
- * currently two types of Schedulables: JobSchedulables, which represent a
- * single job, and QueueSchedulables, which allocate among jobs in their queue.
- *
- * Separate sets of Schedulables are used for maps and reduces. Each queue has
- * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ * A Schedulable represents an entity that can be scheduled such as an
+ * application or a queue. It provides a common interface so that algorithms
+ * such as fair sharing can be applied both within a queue and across queues.
*
* A Schedulable is responsible for three roles:
- * 1) It can launch tasks through assignTask().
- * 2) It provides information about the job/queue to the scheduler, including:
+ * 1) Assign resources through {@link #assignContainer}.
+ * 2) It provides information about the app/queue to the scheduler, including:
* - Demand (maximum number of tasks required)
- * - Number of currently running tasks
* - Minimum share (for queues)
* - Job/queue weight (for fair sharing)
* - Start time and priority (for FIFO)
@@ -56,64 +51,61 @@ import org.apache.hadoop.yarn.util.resou
*/
@Private
@Unstable
-public abstract class Schedulable {
- /** Fair share assigned to this Schedulable */
- private Resource fairShare = Resources.createResource(0);
-
+public interface Schedulable {
/**
* Name of job/queue, used for debugging as well as for breaking ties in
* scheduling order deterministically.
*/
- public abstract String getName();
+ public String getName();
/**
* Maximum number of resources required by this Schedulable. This is defined as
* number of currently utilized resources + number of unlaunched resources (that
* are either not yet launched or need to be speculated).
*/
- public abstract Resource getDemand();
+ public Resource getDemand();
/** Get the aggregate amount of resources consumed by the schedulable. */
- public abstract Resource getResourceUsage();
+ public Resource getResourceUsage();
/** Minimum Resource share assigned to the schedulable. */
- public abstract Resource getMinShare();
+ public Resource getMinShare();
/** Maximum Resource share assigned to the schedulable. */
- public abstract Resource getMaxShare();
+ public Resource getMaxShare();
/** Job/queue weight in fair sharing. */
- public abstract ResourceWeights getWeights();
+ public ResourceWeights getWeights();
/** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
- public abstract long getStartTime();
+ public long getStartTime();
/** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
- public abstract Priority getPriority();
+ public Priority getPriority();
/** Refresh the Schedulable's demand and those of its children if any. */
- public abstract void updateDemand();
+ public void updateDemand();
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned.
*/
- public abstract Resource assignContainer(FSSchedulerNode node);
+ public Resource assignContainer(FSSchedulerNode node);
- /** Assign a fair share to this Schedulable. */
- public void setFairShare(Resource fairShare) {
- this.fairShare = fairShare;
- }
+ /**
+ * Preempt a container from this Schedulable if possible.
+ */
+ public RMContainer preemptContainer();
/** Get the fair share assigned to this Schedulable. */
- public Resource getFairShare() {
- return fairShare;
- }
-
- /** Convenient toString implementation for debugging. */
- @Override
- public String toString() {
- return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
- getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
- }
+ public Resource getFairShare();
+
+ /** Assign a fair share to this Schedulable. */
+ public void setFairShare(Resource fairShare);
+
+ /**
+ * Returns true if the queue has at least one app running. Always returns
+ * true for applications.
+ */
+ public boolean isActive();
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java Tue Aug 19 23:49:39 2014
@@ -139,4 +139,25 @@ public abstract class SchedulingPolicy {
*/
public abstract void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources);
+
+ /**
+ * Check if the resource usage is over the fair share under this policy
+ *
+ * @param usage {@link Resource} the resource usage
+ * @param fairShare {@link Resource} the fair share
+ * @return true if the usage is over the fair share, false otherwise
+ */
+ public abstract boolean checkIfUsageOverFairShare(
+ Resource usage, Resource fairShare);
+
+ /**
+ * Check if a leaf queue's AM resource usage is over its limit under this policy
+ *
+ * @param usage {@link Resource} the resource used by application masters
+ * @param maxAMResource {@link Resource} the maximum allowed resource for
+ * application masters
+ * @return true if AM resource usage is over the limit
+ */
+ public abstract boolean checkIfAMResourceUsageOverLimit(
+ Resource usage, Resource maxAMResource);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Tue Aug 19 23:49:39 2014
@@ -32,5 +32,5 @@ import org.apache.hadoop.conf.Configurab
@Private
@Unstable
public interface WeightAdjuster {
- public double adjustWeight(AppSchedulable app, double curWeight);
+ public double adjustWeight(FSAppAttempt app, double curWeight);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java Tue Aug 19 23:49:39 2014
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -33,7 +34,31 @@ import org.apache.hadoop.yarn.server.res
public class ComputeFairShares {
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
-
+
+ /**
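+ * Compute the fair share of the given schedulables. Fair share is an allocation
+ * of shares among active schedulables only, i.e. schedulables with running apps;
+ * inactive schedulables get a fair share of 0 for the given resource type.
+ *
+ * @param schedulables the schedulables whose fair shares are computed
+ * @param totalResources the total resources to divide among active schedulables
+ * @param type the resource type for which shares are computed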
+ */
+ public static void computeShares(
+ Collection<? extends Schedulable> schedulables, Resource totalResources,
+ ResourceType type) {
+ Collection<Schedulable> activeSchedulables = new ArrayList<Schedulable>();
+ for (Schedulable sched : schedulables) {
+ if (sched.isActive()) {
+ activeSchedulables.add(sched);
+ } else {
+ setResourceValue(0, sched.getFairShare(), type);
+ }
+ }
+
+ computeSharesInternal(activeSchedulables, totalResources, type);
+ }
+
/**
* Given a set of Schedulables and a number of slots, compute their weighted
* fair shares. The min and max shares and of the Schedulables are assumed to
@@ -75,7 +100,7 @@ public class ComputeFairShares {
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
* iterations of binary search is a constant (dependent on desired precision).
*/
- public static void computeShares(
+ private static void computeSharesInternal(
Collection<? extends Schedulable> schedulables, Resource totalResources,
ResourceType type) {
if (schedulables.isEmpty()) {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java Tue Aug 19 23:49:39 2014
@@ -70,6 +70,16 @@ public class DominantResourceFairnessPol
}
@Override
+ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+ return !Resources.fitsIn(usage, fairShare);
+ }
+
+ @Override
+ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+ return !Resources.fitsIn(usage, maxAMResource);
+ }
+
+ @Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java Tue Aug 19 23:49:39 2014
@@ -65,6 +65,7 @@ public class FairSharePolicy extends Sch
private static class FairShareComparator implements Comparator<Schedulable>,
Serializable {
private static final long serialVersionUID = 5564969375856699313L;
+ private static final Resource ONE = Resources.createResource(1);
@Override
public int compare(Schedulable s1, Schedulable s2) {
@@ -78,11 +79,10 @@ public class FairSharePolicy extends Sch
s1.getResourceUsage(), minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
s2.getResourceUsage(), minShare2);
- Resource one = Resources.createResource(1);
minShareRatio1 = (double) s1.getResourceUsage().getMemory()
- / Resources.max(RESOURCE_CALCULATOR, null, minShare1, one).getMemory();
+ / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemory();
minShareRatio2 = (double) s2.getResourceUsage().getMemory()
- / Resources.max(RESOURCE_CALCULATOR, null, minShare2, one).getMemory();
+ / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemory();
useToWeightRatio1 = s1.getResourceUsage().getMemory() /
s1.getWeights().getWeight(ResourceType.MEMORY);
useToWeightRatio2 = s2.getResourceUsage().getMemory() /
@@ -120,6 +120,16 @@ public class FairSharePolicy extends Sch
}
@Override
+ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+ return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
+ }
+
+ @Override
+ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+ return usage.getMemory() > maxAMResource.getMemory();
+ }
+
+ @Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java Tue Aug 19 23:49:39 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.re
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
-import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -89,6 +88,18 @@ public class FifoPolicy extends Scheduli
}
@Override
+ public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
+ throw new UnsupportedOperationException(
+ "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " +
+ "as FifoPolicy only works for FSLeafQueue.");
+ }
+
+ @Override
+ public boolean checkIfAMResourceUsageOverLimit(Resource usage, Resource maxAMResource) {
+ return usage.getMemory() > maxAMResource.getMemory();
+ }
+
+ @Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Tue Aug 19 23:49:39 2014
@@ -25,7 +25,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
@@ -38,7 +37,6 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -54,8 +52,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -68,7 +64,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@@ -76,11 +71,10 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.ContainersAndNMTokensAllocation;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -104,7 +98,8 @@ import com.google.common.annotations.Vis
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
-public class FifoScheduler extends AbstractYarnScheduler implements
+public class FifoScheduler extends
+ AbstractYarnScheduler<FiCaSchedulerApp, FiCaSchedulerNode> implements
Configurable {
private static final Log LOG = LogFactory.getLog(FifoScheduler.class);
@@ -114,11 +109,6 @@ public class FifoScheduler extends Abstr
Configuration conf;
- protected Map<NodeId, FiCaSchedulerNode> nodes = new ConcurrentHashMap<NodeId, FiCaSchedulerNode>();
-
- private boolean initialized;
- private Resource minimumAllocation;
- private Resource maximumAllocation;
private boolean usePortForNodeName;
private ActiveUsersManager activeUsersManager;
@@ -185,8 +175,60 @@ public class FifoScheduler extends Abstr
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
}
+
+ @Override
+ public void recoverContainer(Resource clusterResource,
+ SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
+ if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
+ return;
+ }
+ increaseUsedResources(rmContainer);
+ updateAppHeadRoom(schedulerAttempt);
+ updateAvailableResourcesMetrics();
+ }
};
+ public FifoScheduler() {
+ super(FifoScheduler.class.getName());
+ }
+
+ private synchronized void initScheduler(Configuration conf) {
+ validateConf(conf);
+ //Use ConcurrentSkipListMap because applications need to be ordered
+ this.applications =
+ new ConcurrentSkipListMap<ApplicationId, SchedulerApplication<FiCaSchedulerApp>>();
+ this.minimumAllocation =
+ Resources.createResource(conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+ this.maximumAllocation =
+ Resources.createResource(conf.getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+ this.usePortForNodeName = conf.getBoolean(
+ YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
+ this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
+ conf);
+ this.activeUsersManager = new ActiveUsersManager(metrics);
+ }
+
+ @Override
+ public void serviceInit(Configuration conf) throws Exception {
+ initScheduler(conf);
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void serviceStart() throws Exception {
+ super.serviceStart();
+ }
+
+ @Override
+ public void serviceStop() throws Exception {
+ super.serviceStop();
+ }
+
@Override
public synchronized void setConf(Configuration conf) {
this.conf = conf;
@@ -218,18 +260,13 @@ public class FifoScheduler extends Abstr
}
@Override
- public Resource getMinimumResourceCapability() {
- return minimumAllocation;
- }
-
- @Override
public int getNumClusterNodes() {
return nodes.size();
}
-
+
@Override
- public Resource getMaximumResourceCapability() {
- return maximumAllocation;
+ public synchronized void setRMContext(RMContext rmContext) {
+ this.rmContext = rmContext;
}
@Override
@@ -237,31 +274,8 @@ public class FifoScheduler extends Abstr
reinitialize(Configuration conf, RMContext rmContext) throws IOException
{
setConf(conf);
- if (!this.initialized) {
- validateConf(conf);
- this.rmContext = rmContext;
- //Use ConcurrentSkipListMap because applications need to be ordered
- this.applications =
- new ConcurrentSkipListMap<ApplicationId, SchedulerApplication>();
- this.minimumAllocation =
- Resources.createResource(conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
- this.maximumAllocation =
- Resources.createResource(conf.getInt(
- YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
- this.usePortForNodeName = conf.getBoolean(
- YarnConfiguration.RM_SCHEDULER_INCLUDE_PORT_IN_NODE_NAME,
- YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
- this.metrics = QueueMetrics.forQueue(DEFAULT_QUEUE_NAME, null, false,
- conf);
- this.activeUsersManager = new ActiveUsersManager(metrics);
- this.initialized = true;
- }
}
-
@Override
public Allocation allocate(
ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
@@ -278,21 +292,7 @@ public class FifoScheduler extends Abstr
clusterResource, minimumAllocation, maximumAllocation);
// Release containers
- for (ContainerId releasedContainer : release) {
- RMContainer rmContainer = getRMContainer(releasedContainer);
- if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FifoScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainer);
- }
- containerCompleted(rmContainer,
- SchedulerUtils.createAbnormalContainerStatus(
- releasedContainer,
- SchedulerUtils.RELEASED_CONTAINER),
- RMContainerEventType.RELEASED);
- }
+ releaseContainers(release, application);
synchronized (application) {
@@ -332,50 +332,35 @@ public class FifoScheduler extends Abstr
}
}
- @VisibleForTesting
- FiCaSchedulerApp getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
- SchedulerApplication app =
- applications.get(applicationAttemptId.getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
- @Override
- public SchedulerAppReport getSchedulerAppInfo(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : new SchedulerAppReport(app);
- }
-
- @Override
- public ApplicationResourceUsageReport getAppResourceUsageReport(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp app = getApplicationAttempt(applicationAttemptId);
- return app == null ? null : app.getResourceUsageReport();
- }
-
private FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
- private synchronized void addApplication(ApplicationId applicationId,
- String queue, String user) {
- SchedulerApplication application =
- new SchedulerApplication(DEFAULT_QUEUE, user);
+ @VisibleForTesting
+ public synchronized void addApplication(ApplicationId applicationId,
+ String queue, String user, boolean isAppRecovering) {
+ SchedulerApplication<FiCaSchedulerApp> application =
+ new SchedulerApplication<FiCaSchedulerApp>(DEFAULT_QUEUE, user);
applications.put(applicationId, application);
metrics.submitApp(user);
LOG.info("Accepted application " + applicationId + " from user: " + user
+ ", currently num of applications: " + applications.size());
- rmContext.getDispatcher().getEventHandler()
+ if (isAppRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler()
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
+ }
}
- private synchronized void
+ @VisibleForTesting
+ public synchronized void
addApplicationAttempt(ApplicationAttemptId appAttemptId,
- boolean transferStateFromPreviousAttempt) {
- SchedulerApplication application =
+ boolean transferStateFromPreviousAttempt,
+ boolean isAttemptRecovering) {
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(appAttemptId.getApplicationId());
String user = application.getUser();
// TODO: Fix store
@@ -392,14 +377,22 @@ public class FifoScheduler extends Abstr
metrics.submitAppAttempt(user);
LOG.info("Added Application Attempt " + appAttemptId
+ " to scheduler from user " + application.getUser());
- rmContext.getDispatcher().getEventHandler().handle(
+ if (isAttemptRecovering) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(appAttemptId
+ + " is recovering. Skipping notifying ATTEMPT_ADDED");
+ }
+ } else {
+ rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptEvent(appAttemptId,
RMAppAttemptEventType.ATTEMPT_ADDED));
+ }
}
private synchronized void doneApplication(ApplicationId applicationId,
RMAppState finalState) {
- SchedulerApplication application = applications.get(applicationId);
+ SchedulerApplication<FiCaSchedulerApp> application =
+ applications.get(applicationId);
if (application == null){
LOG.warn("Couldn't find application " + applicationId);
return;
@@ -417,7 +410,7 @@ public class FifoScheduler extends Abstr
RMAppAttemptState rmAppAttemptFinalState, boolean keepContainers)
throws IOException {
FiCaSchedulerApp attempt = getApplicationAttempt(applicationAttemptId);
- SchedulerApplication application =
+ SchedulerApplication<FiCaSchedulerApp> application =
applications.get(applicationAttemptId.getApplicationId());
if (application == null || attempt == null) {
throw new IOException("Unknown application " + applicationAttemptId +
@@ -433,7 +426,7 @@ public class FifoScheduler extends Abstr
LOG.info("Skip killing " + container.getContainerId());
continue;
}
- containerCompleted(container,
+ completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
@@ -454,10 +447,13 @@ public class FifoScheduler extends Abstr
" #applications=" + applications.size());
// Try to assign containers to applications in fifo order
- for (Map.Entry<ApplicationId, SchedulerApplication> e : applications
+ for (Map.Entry<ApplicationId, SchedulerApplication<FiCaSchedulerApp>> e : applications
.entrySet()) {
- FiCaSchedulerApp application =
- (FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
+ FiCaSchedulerApp application = e.getValue().getCurrentAppAttempt();
+ if (application == null) {
+ continue;
+ }
+
LOG.debug("pre-assignContainers");
application.showRequests();
synchronized (application) {
@@ -494,10 +490,13 @@ public class FifoScheduler extends Abstr
// Update the applications' headroom to correctly take into
// account the containers assigned in this update.
- for (SchedulerApplication application : applications.values()) {
+ for (SchedulerApplication<FiCaSchedulerApp> application : applications.values()) {
FiCaSchedulerApp attempt =
(FiCaSchedulerApp) application.getCurrentAppAttempt();
- attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
+ if (attempt == null) {
+ continue;
+ }
+ updateAppHeadRoom(attempt);
}
}
@@ -668,11 +667,10 @@ public class FifoScheduler extends Abstr
application.allocate(type, node, priority, request, container);
// Inform the node
- node.allocateContainer(application.getApplicationId(),
- rmContainer);
+ node.allocateContainer(rmContainer);
// Update usage for this container
- Resources.addTo(usedResource, capability);
+ increaseUsedResources(rmContainer);
}
}
@@ -702,7 +700,7 @@ public class FifoScheduler extends Abstr
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId);
- containerCompleted(getRMContainer(containerId),
+ completedContainer(getRMContainer(containerId),
completedContainer, RMContainerEventType.FINISHED);
}
@@ -716,9 +714,22 @@ public class FifoScheduler extends Abstr
LOG.debug("Node after allocation " + rmNode.getNodeID() + " resource = "
+ node.getAvailableResource());
}
-
- metrics.setAvailableResourcesToQueue(
- Resources.subtract(clusterResource, usedResource));
+
+ updateAvailableResourcesMetrics();
+ }
+
+ private void increaseUsedResources(RMContainer rmContainer) {
+ Resources.addTo(usedResource, rmContainer.getAllocatedResource());
+ }
+
+ private void updateAppHeadRoom(SchedulerApplicationAttempt schedulerAttempt) {
+ schedulerAttempt.setHeadroom(Resources.subtract(clusterResource,
+ usedResource));
+ }
+
+ private void updateAvailableResourcesMetrics() {
+ metrics.setAvailableResourcesToQueue(Resources.subtract(clusterResource,
+ usedResource));
}
@Override
@@ -728,6 +739,9 @@ public class FifoScheduler extends Abstr
{
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
+ recoverContainersOnNode(nodeAddedEvent.getContainerReports(),
+ nodeAddedEvent.getAddedRMNode());
+
}
break;
case NODE_REMOVED:
@@ -747,7 +761,8 @@ public class FifoScheduler extends Abstr
{
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
addApplication(appAddedEvent.getApplicationId(),
- appAddedEvent.getQueue(), appAddedEvent.getUser());
+ appAddedEvent.getQueue(), appAddedEvent.getUser(),
+ appAddedEvent.getIsAppRecovering());
}
break;
case APP_REMOVED:
@@ -762,7 +777,8 @@ public class FifoScheduler extends Abstr
AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
(AppAttemptAddedSchedulerEvent) event;
addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
- appAttemptAddedEvent.getTransferStateFromPreviousAttempt());
+ appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
+ appAttemptAddedEvent.getIsAttemptRecovering());
}
break;
case APP_ATTEMPT_REMOVED:
@@ -785,7 +801,7 @@ public class FifoScheduler extends Abstr
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
ContainerId containerid = containerExpiredEvent.getContainerId();
- containerCompleted(getRMContainer(containerid),
+ completedContainer(getRMContainer(containerid),
SchedulerUtils.createAbnormalContainerStatus(
containerid,
SchedulerUtils.EXPIRED_CONTAINER),
@@ -797,25 +813,9 @@ public class FifoScheduler extends Abstr
}
}
- private void containerLaunchedOnNode(ContainerId containerId, FiCaSchedulerNode node) {
- // Get the application for the finished container
- FiCaSchedulerApp application = getCurrentAttemptForContainer(containerId);
- if (application == null) {
- LOG.info("Unknown application "
- + containerId.getApplicationAttemptId().getApplicationId()
- + " launched container " + containerId + " on node: " + node);
- // Some unknown container sneaked into the system. Kill it.
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
-
- return;
- }
-
- application.containerLaunchedOnNode(containerId, node.getNodeID());
- }
-
@Lock(FifoScheduler.class)
- private synchronized void containerCompleted(RMContainer rmContainer,
+ @Override
+ protected synchronized void completedContainer(RMContainer rmContainer,
ContainerStatus containerStatus, RMContainerEventType event) {
if (rmContainer == null) {
LOG.info("Null container completed...");
@@ -856,7 +856,6 @@ public class FifoScheduler extends Abstr
}
- private Resource clusterResource = recordFactory.newRecordInstance(Resource.class);
private Resource usedResource = recordFactory.newRecordInstance(Resource.class);
private synchronized void removeNode(RMNode nodeInfo) {
@@ -866,7 +865,7 @@ public class FifoScheduler extends Abstr
}
// Kill running containers
for(RMContainer container : node.getRunningContainers()) {
- containerCompleted(container,
+ completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
SchedulerUtils.LOST_CONTAINER),
@@ -903,28 +902,11 @@ public class FifoScheduler extends Abstr
}
@Override
- public synchronized SchedulerNodeReport getNodeReport(NodeId nodeId) {
- FiCaSchedulerNode node = getNode(nodeId);
- return node == null ? null : new SchedulerNodeReport(node);
- }
-
- @Override
public RMContainer getRMContainer(ContainerId containerId) {
FiCaSchedulerApp attempt = getCurrentAttemptForContainer(containerId);
return (attempt == null) ? null : attempt.getRMContainer(containerId);
}
- private FiCaSchedulerApp getCurrentAttemptForContainer(
- ContainerId containerId) {
- SchedulerApplication app =
- applications.get(containerId.getApplicationAttemptId()
- .getApplicationId());
- if (app != null) {
- return (FiCaSchedulerApp) app.getCurrentAppAttempt();
- }
- return null;
- }
-
@Override
public QueueMetrics getRootQueueMetrics() {
return DEFAULT_QUEUE.getMetrics();
@@ -935,13 +917,14 @@ public class FifoScheduler extends Abstr
QueueACL acl, String queueName) {
return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
}
-
+
@Override
- public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
+ public synchronized List<ApplicationAttemptId>
+ getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
- List<ApplicationAttemptId> attempts = new ArrayList<ApplicationAttemptId>(
- applications.size());
- for (SchedulerApplication app : applications.values()) {
+ List<ApplicationAttemptId> attempts =
+ new ArrayList<ApplicationAttemptId>(applications.size());
+ for (SchedulerApplication<FiCaSchedulerApp> app : applications.values()) {
attempts.add(app.getCurrentAppAttempt().getApplicationAttemptId());
}
return attempts;
@@ -950,4 +933,7 @@ public class FifoScheduler extends Abstr
}
}
+ public Resource getUsedResource() {
+ return usedResource;
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/AMRMTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -19,22 +19,32 @@
package org.apache.hadoop.yarn.server.resourcemanager.security;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.security.SecureRandom;
+import java.util.HashSet;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-
-import javax.crypto.SecretKey;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.MasterKey;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
+import org.apache.hadoop.yarn.server.security.MasterKeyData;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* AMRM-tokens are per ApplicationAttempt. If users redistribute their
@@ -49,40 +59,73 @@ public class AMRMTokenSecretManager exte
private static final Log LOG = LogFactory
.getLog(AMRMTokenSecretManager.class);
- private SecretKey masterKey;
+ private int serialNo = new SecureRandom().nextInt();
+ private MasterKeyData nextMasterKey;
+ private MasterKeyData currentMasterKey;
+
+ private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final Lock readLock = readWriteLock.readLock();
+ private final Lock writeLock = readWriteLock.writeLock();
+
private final Timer timer;
private final long rollingInterval;
+ private final long activationDelay;
+ private RMContext rmContext;
- private final Map<ApplicationAttemptId, byte[]> passwords =
- new HashMap<ApplicationAttemptId, byte[]>();
+ private final Set<ApplicationAttemptId> appAttemptSet =
+ new HashSet<ApplicationAttemptId>();
/**
* Create an {@link AMRMTokenSecretManager}
*/
- public AMRMTokenSecretManager(Configuration conf) {
- rollMasterKey();
+ public AMRMTokenSecretManager(Configuration conf, RMContext rmContext) {
+ this.rmContext = rmContext;
this.timer = new Timer();
this.rollingInterval =
conf
.getLong(
YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS,
YarnConfiguration.DEFAULT_RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS) * 1000;
+ // Adding delay = 1.5 * expiry interval makes sure that all active AMs get
+ // the updated shared-key.
+ this.activationDelay =
+ (long) (conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS) * 1.5);
+ LOG.info("AMRMTokenKeyRollingInterval: " + this.rollingInterval
+ + "ms and AMRMTokenKeyActivationDelay: " + this.activationDelay + " ms");
+ if (rollingInterval <= activationDelay * 2) {
+ throw new IllegalArgumentException(
+ YarnConfiguration.RM_AMRM_TOKEN_MASTER_KEY_ROLLING_INTERVAL_SECS
+ + " should be more than 2 X "
+ + YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS);
+ }
}
public void start() {
- this.timer.scheduleAtFixedRate(new MasterKeyRoller(), 0, rollingInterval);
+ if (this.currentMasterKey == null) {
+ this.currentMasterKey = createNewMasterKey();
+ AMRMTokenSecretManagerState state =
+ AMRMTokenSecretManagerState.newInstance(
+ this.currentMasterKey.getMasterKey(), null);
+ rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
+ false);
+ }
+ this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
+ rollingInterval);
}
public void stop() {
this.timer.cancel();
}
- public synchronized void applicationMasterFinished(
- ApplicationAttemptId appAttemptId) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Application finished, removing password for " + appAttemptId);
+ public void applicationMasterFinished(ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Application finished, removing password for " + appAttemptId);
+ this.appAttemptSet.remove(appAttemptId);
+ } finally {
+ this.writeLock.unlock();
}
- this.passwords.remove(appAttemptId);
}
private class MasterKeyRoller extends TimerTask {
@@ -93,49 +136,100 @@ public class AMRMTokenSecretManager exte
}
@Private
- public synchronized void setMasterKey(SecretKey masterKey) {
- this.masterKey = masterKey;
+ void rollMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Rolling master-key for amrm-tokens");
+ this.nextMasterKey = createNewMasterKey();
+ AMRMTokenSecretManagerState state =
+ AMRMTokenSecretManagerState.newInstance(
+ this.currentMasterKey.getMasterKey(),
+ this.nextMasterKey.getMasterKey());
+ rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
+ true);
+ this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+ } finally {
+ this.writeLock.unlock();
+ }
}
- @Private
- public synchronized SecretKey getMasterKey() {
- return this.masterKey;
+ private class NextKeyActivator extends TimerTask {
+ @Override
+ public void run() {
+ activateNextMasterKey();
+ }
+ }
+
+ public void activateNextMasterKey() {
+ this.writeLock.lock();
+ try {
+ LOG.info("Activating next master key with id: "
+ + this.nextMasterKey.getMasterKey().getKeyId());
+ this.currentMasterKey = this.nextMasterKey;
+ this.nextMasterKey = null;
+ AMRMTokenSecretManagerState state =
+ AMRMTokenSecretManagerState.newInstance(
+ this.currentMasterKey.getMasterKey(), null);
+ rmContext.getStateStore().storeOrUpdateAMRMTokenSecretManagerState(state,
+ true);
+ } finally {
+ this.writeLock.unlock();
+ }
}
@Private
- synchronized void rollMasterKey() {
- LOG.info("Rolling master-key for amrm-tokens");
- this.masterKey = generateSecret();
+ @VisibleForTesting
+ public MasterKeyData createNewMasterKey() {
+ this.writeLock.lock();
+ try {
+ return new MasterKeyData(serialNo++, generateSecret());
+ } finally {
+ this.writeLock.unlock();
+ }
}
- /**
- * Create a password for a given {@link AMRMTokenIdentifier}. Used to
- * send to the AppicationAttempt which can give it back during authentication.
- */
- @Override
- public synchronized byte[] createPassword(
- AMRMTokenIdentifier identifier) {
- ApplicationAttemptId applicationAttemptId =
- identifier.getApplicationAttemptId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Creating password for " + applicationAttemptId);
- }
- byte[] password = createPassword(identifier.getBytes(), masterKey);
- this.passwords.put(applicationAttemptId, password);
- return password;
+ public Token<AMRMTokenIdentifier> createAndGetAMRMToken(
+ ApplicationAttemptId appAttemptId) {
+ this.writeLock.lock();
+ try {
+ LOG.info("Create AMRMToken for ApplicationAttempt: " + appAttemptId);
+ AMRMTokenIdentifier identifier =
+ new AMRMTokenIdentifier(appAttemptId, getMasterKey().getMasterKey()
+ .getKeyId());
+ byte[] password = this.createPassword(identifier);
+ appAttemptSet.add(appAttemptId);
+ return new Token<AMRMTokenIdentifier>(identifier.getBytes(), password,
+ identifier.getKind(), new Text());
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ // If nextMasterKey is not Null, then return nextMasterKey
+ // otherwise return currentMasterKey
+ @VisibleForTesting
+ public MasterKeyData getMasterKey() {
+ this.readLock.lock();
+ try {
+ return nextMasterKey == null ? currentMasterKey : nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
}
/**
* Populate persisted password of AMRMToken back to AMRMTokenSecretManager.
*/
- public synchronized void
- addPersistedPassword(Token<AMRMTokenIdentifier> token) throws IOException {
- AMRMTokenIdentifier identifier = token.decodeIdentifier();
- if (LOG.isDebugEnabled()) {
+ public void addPersistedPassword(Token<AMRMTokenIdentifier> token)
+ throws IOException {
+ this.writeLock.lock();
+ try {
+ AMRMTokenIdentifier identifier = token.decodeIdentifier();
LOG.debug("Adding password for " + identifier.getApplicationAttemptId());
+ appAttemptSet.add(identifier.getApplicationAttemptId());
+ } finally {
+ this.writeLock.unlock();
}
- this.passwords.put(identifier.getApplicationAttemptId(),
- token.getPassword());
}
/**
@@ -143,19 +237,33 @@ public class AMRMTokenSecretManager exte
* Used by RPC layer to validate a remote {@link AMRMTokenIdentifier}.
*/
@Override
- public synchronized byte[] retrievePassword(
- AMRMTokenIdentifier identifier) throws InvalidToken {
- ApplicationAttemptId applicationAttemptId =
- identifier.getApplicationAttemptId();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Trying to retrieve password for " + applicationAttemptId);
- }
- byte[] password = this.passwords.get(applicationAttemptId);
- if (password == null) {
- throw new InvalidToken("Password not found for ApplicationAttempt "
- + applicationAttemptId);
+ public byte[] retrievePassword(AMRMTokenIdentifier identifier)
+ throws InvalidToken {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to retrieve password for " + applicationAttemptId);
+ }
+ if (!appAttemptSet.contains(applicationAttemptId)) {
+ throw new InvalidToken(applicationAttemptId
+ + " not found in AMRMTokenSecretManager.");
+ }
+ if (identifier.getKeyId() == this.currentMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.currentMasterKey.getSecretKey());
+ } else if (nextMasterKey != null
+ && identifier.getKeyId() == this.nextMasterKey.getMasterKey()
+ .getKeyId()) {
+ return createPassword(identifier.getBytes(),
+ this.nextMasterKey.getSecretKey());
+ }
+ throw new InvalidToken("Invalid AMRMToken from " + applicationAttemptId);
+ } finally {
+ this.readLock.unlock();
}
- return password;
}
/**
@@ -167,4 +275,61 @@ public class AMRMTokenSecretManager exte
return new AMRMTokenIdentifier();
}
+ @Private
+ @VisibleForTesting
+ public MasterKeyData getCurrnetMasterKeyData() {
+ this.readLock.lock();
+ try {
+ return this.currentMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Private
+ @VisibleForTesting
+ public MasterKeyData getNextMasterKeyData() {
+ this.readLock.lock();
+ try {
+ return this.nextMasterKey;
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
+ @Private
+ protected byte[] createPassword(AMRMTokenIdentifier identifier) {
+ this.readLock.lock();
+ try {
+ ApplicationAttemptId applicationAttemptId =
+ identifier.getApplicationAttemptId();
+ LOG.info("Creating password for " + applicationAttemptId);
+ return createPassword(identifier.getBytes(), getMasterKey()
+ .getSecretKey());
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ public void recover(RMState state) {
+ if (state.getAMRMTokenSecretManagerState() != null) {
+ // recover the current master key
+ MasterKey currentKey =
+ state.getAMRMTokenSecretManagerState().getCurrentMasterKey();
+ this.currentMasterKey =
+ new MasterKeyData(currentKey, createSecretKey(currentKey.getBytes()
+ .array()));
+
+ // recover the next master key if not null
+ MasterKey nextKey =
+ state.getAMRMTokenSecretManagerState().getNextMasterKey();
+ if (nextKey != null) {
+ this.nextMasterKey =
+ new MasterKeyData(nextKey, createSecretKey(nextKey.getBytes()
+ .array()));
+ this.timer.schedule(new NextKeyActivator(), this.activationDelay);
+ }
+ }
+ }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Aug 19 23:49:39 2014
@@ -388,7 +388,11 @@ public class DelegationTokenRenewer exte
// If user provides incorrect token then it should not be added for
// renewal.
for (DelegationTokenToRenew dtr : tokenList) {
- renewToken(dtr);
+ try {
+ renewToken(dtr);
+ } catch (IOException ioe) {
+ throw new IOException("Failed to renew token: " + dtr.token, ioe);
+ }
}
for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -34,8 +35,8 @@ import org.apache.hadoop.yarn.security.C
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.security.MasterKeyData;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
/**
* SecretManager for ContainerTokens. This is RM-specific and rolls the
@@ -169,11 +170,13 @@ public class RMContainerTokenSecretManag
* @param nodeId
* @param appSubmitter
* @param capability
+ * @param priority
+ * @param createTime
* @return the container-token
*/
- public Token
- createContainerToken(ContainerId containerId, NodeId nodeId,
- String appSubmitter, Resource capability) {
+ public Token createContainerToken(ContainerId containerId, NodeId nodeId,
+ String appSubmitter, Resource capability, Priority priority,
+ long createTime) {
byte[] password;
ContainerTokenIdentifier tokenIdentifier;
long expiryTimeStamp =
@@ -185,7 +188,8 @@ public class RMContainerTokenSecretManag
tokenIdentifier =
new ContainerTokenIdentifier(containerId, nodeId.toString(),
appSubmitter, capability, expiryTimeStamp, this.currentMasterKey
- .getMasterKey().getKeyId(), ResourceManager.getClusterTimeStamp());
+ .getMasterKey().getKeyId(),
+ ResourceManager.getClusterTimeStamp(), priority, createTime);
password = this.createPassword(tokenIdentifier);
} finally {
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMDelegationTokenSecretManager.java Tue Aug 19 23:49:39 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager.DelegationTokenInformation;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -193,4 +195,14 @@ public class RMDelegationTokenSecretMana
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
+
+  public long getRenewDate(RMDelegationTokenIdentifier ident)
+      throws InvalidToken {
+    DelegationTokenInformation tokenInfo = currentTokens.get(ident);
+    if (tokenInfo != null) { // token is present in the currentTokens cache
+      return tokenInfo.getRenewDate();
+    }
+    throw new InvalidToken("token (" + ident.toString()
+        + ") can't be found in cache");
+  }
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppBlock.java Tue Aug 19 23:49:39 2014
@@ -36,7 +36,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -110,19 +112,42 @@ public class AppBlock extends HtmlBlock
setTitle(join("Application ", aid));
- info("Application Overview").
- _("User:", app.getUser()).
- _("Name:", app.getName()).
- _("Application Type:", app.getApplicationType()).
- _("Application Tags:", app.getApplicationTags()).
- _("State:", app.getState()).
- _("FinalStatus:", app.getFinalStatus()).
- _("Started:", Times.format(app.getStartTime())).
- _("Elapsed:", StringUtils.formatTime(
- Times.elapsed(app.getStartTime(), app.getFinishTime()))).
- _("Tracking URL:", !app.isTrackingUrlReady() ?
- "#" : app.getTrackingUrlPretty(), app.getTrackingUI()).
- _("Diagnostics:", app.getNote());
+ RMAppMetrics appMerics = rmApp.getRMAppMetrics();
+ RMAppAttemptMetrics attemptMetrics =
+ rmApp.getCurrentAppAttempt().getRMAppAttemptMetrics();
+ info("Application Overview")
+ ._("User:", app.getUser())
+ ._("Name:", app.getName())
+ ._("Application Type:", app.getApplicationType())
+ ._("Application Tags:", app.getApplicationTags())
+ ._("State:", app.getState())
+ ._("FinalStatus:", app.getFinalStatus())
+ ._("Started:", Times.format(app.getStartTime()))
+ ._("Elapsed:",
+ StringUtils.formatTime(Times.elapsed(app.getStartTime(),
+ app.getFinishTime())))
+ ._("Tracking URL:",
+ !app.isTrackingUrlReady() ? "#" : app.getTrackingUrlPretty(),
+ app.getTrackingUI())
+ ._("Diagnostics:", app.getNote());
+
+ DIV<Hamlet> pdiv = html.
+ _(InfoBlock.class).
+ div(_INFO_WRAP);
+ info("Application Overview").clear();
+ info("Application Metrics")
+ ._("Total Resource Preempted:",
+ appMerics.getResourcePreempted())
+ ._("Total Number of Non-AM Containers Preempted:",
+ String.valueOf(appMerics.getNumNonAMContainersPreempted()))
+ ._("Total Number of AM Containers Preempted:",
+ String.valueOf(appMerics.getNumAMContainersPreempted()))
+ ._("Resource Preempted from Current Attempt:",
+ attemptMetrics.getResourcePreempted())
+ ._("Number of Non-AM Containers Preempted from Current Attempt:",
+ String.valueOf(attemptMetrics
+ .getNumNonAMContainersPreempted()));
+ pdiv._();
Collection<RMAppAttempt> attempts = rmApp.getAppAttempts().values();
String amString =
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Tue Aug 19 23:49:39 2014
@@ -23,7 +23,6 @@ import static org.apache.hadoop.yarn.uti
import java.util.ArrayList;
import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
@@ -109,7 +108,7 @@ class CapacitySchedulerPage extends RmVi
_("Absolute Used Capacity:", percent(lqinfo.getAbsoluteUsedCapacity() / 100)).
_("Absolute Capacity:", percent(lqinfo.getAbsoluteCapacity() / 100)).
_("Absolute Max Capacity:", percent(lqinfo.getAbsoluteMaxCapacity() / 100)).
- _("Used Resources:", StringEscapeUtils.escapeHtml(lqinfo.getResourcesUsed().toString())).
+ _("Used Resources:", lqinfo.getResourcesUsed().toString()).
_("Num Schedulable Applications:", Integer.toString(lqinfo.getNumActiveApplications())).
_("Num Non-Schedulable Applications:", Integer.toString(lqinfo.getNumPendingApplications())).
_("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
@@ -263,7 +262,7 @@ class CapacitySchedulerPage extends RmVi
" var q = $('.q', data.rslt.obj).first().text();",
" if (q == 'root') q = '';",
" else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';",
- " $('#apps').dataTable().fnFilter(q, 3, true);",
+ " $('#apps').dataTable().fnFilter(q, 4, true);",
" });",
" $('#cs').show();",
"});")._().
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/DefaultSchedulerPage.java Tue Aug 19 23:49:39 2014
@@ -137,7 +137,7 @@ class DefaultSchedulerPage extends RmVie
" $('#cs').bind('select_node.jstree', function(e, data) {",
" var q = $('.q', data.rslt.obj).first().text();",
" if (q == 'root') q = '';",
- " $('#apps').dataTable().fnFilter(q, 3);",
+ " $('#apps').dataTable().fnFilter(q, 4);",
" });",
" $('#cs').show();",
"});")._();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerAppsBlock.java Tue Aug 19 23:49:39 2014
@@ -25,6 +25,8 @@ import static org.apache.hadoop.yarn.web
import java.util.Collection;
import java.util.HashSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang.StringEscapeUtils;
@@ -35,6 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -60,7 +63,14 @@ public class FairSchedulerAppsBlock exte
super(ctx);
FairScheduler scheduler = (FairScheduler) rm.getResourceScheduler();
fsinfo = new FairSchedulerInfo(scheduler);
- apps = rmContext.getRMApps();
+ apps = new ConcurrentHashMap<ApplicationId, RMApp>();
+ for (Map.Entry<ApplicationId, RMApp> entry : rmContext.getRMApps().entrySet()) {
+ if (!(RMAppState.NEW.equals(entry.getValue().getState())
+ || RMAppState.NEW_SAVING.equals(entry.getValue().getState())
+ || RMAppState.SUBMITTED.equals(entry.getValue().getState()))) {
+ apps.put(entry.getKey(), entry.getValue());
+ }
+ }
this.conf = conf;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java Tue Aug 19 23:49:39 2014
@@ -22,7 +22,6 @@ import static org.apache.hadoop.yarn.uti
import java.util.Collection;
-import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.FairSchedulerInfo;
@@ -65,20 +64,16 @@ public class FairSchedulerPage extends R
@Override
protected void render(Block html) {
ResponseInfo ri = info("\'" + qinfo.getQueueName() + "\' Queue Status").
- _("Used Resources:", StringEscapeUtils.escapeHtml(
- qinfo.getUsedResources().toString())).
+ _("Used Resources:", qinfo.getUsedResources().toString()).
_("Num Active Applications:", qinfo.getNumActiveApplications()).
_("Num Pending Applications:", qinfo.getNumPendingApplications()).
- _("Min Resources:", StringEscapeUtils.escapeHtml(
- qinfo.getMinResources().toString())).
- _("Max Resources:", StringEscapeUtils.escapeHtml(
- qinfo.getMaxResources().toString()));
+ _("Min Resources:", qinfo.getMinResources().toString()).
+ _("Max Resources:", qinfo.getMaxResources().toString());
int maxApps = qinfo.getMaxApplications();
if (maxApps < Integer.MAX_VALUE) {
ri._("Max Running Applications:", qinfo.getMaxApplications());
}
- ri._("Fair Share:", StringEscapeUtils.escapeHtml(
- qinfo.getFairShare().toString()));
+ ri._("Fair Share:", qinfo.getFairShare().toString());
html._(InfoBlock.class);
@@ -215,7 +210,7 @@ public class FairSchedulerPage extends R
" var q = $('.q', data.rslt.obj).first().text();",
" if (q == 'root') q = '';",
" else q = '^' + q.substr(q.lastIndexOf('.') + 1) + '$';",
- " $('#apps').dataTable().fnFilter(q, 3, true);",
+ " $('#apps').dataTable().fnFilter(q, 4, true);",
" });",
" $('#cs').show();",
"});")._().