You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2017/02/17 14:50:06 UTC

[26/50] [abbrv] hadoop git commit: YARN-6171. ConcurrentModificationException on FSAppAttempt.containersToPreempt. (Miklos Szegedi via kasha)

YARN-6171. ConcurrentModificationException on FSAppAttempt.containersToPreempt. (Miklos Szegedi via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a77f4324
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a77f4324
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a77f4324

Branch: refs/heads/HDFS-10285
Commit: a77f432449aad67da31bd8bf8644b71def741bde
Parents: 5d339c4
Author: Karthik Kambatla <ka...@cloudera.com>
Authored: Thu Feb 16 14:54:51 2017 -0800
Committer: Karthik Kambatla <ka...@cloudera.com>
Committed: Thu Feb 16 14:54:58 2017 -0800

----------------------------------------------------------------------
 .../scheduler/fair/FSAppAttempt.java            | 49 +++++++++++---------
 .../scheduler/fair/FairScheduler.java           | 15 +++---
 2 files changed, 34 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a77f4324/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 563b892..b1bb9a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -83,8 +83,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private Resource fairShare = Resources.createResource(0, 0);
 
   // Preemption related variables
+  private final Object preemptionVariablesLock = new Object();
   private final Resource preemptedResources = Resources.clone(Resources.none());
   private final Set<RMContainer> containersToPreempt = new HashSet<>();
+
   private Resource fairshareStarvation = Resources.none();
   private long lastTimeAtFairShare;
   private long nextStarvationCheck;
@@ -552,29 +554,29 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   void trackContainerForPreemption(RMContainer container) {
-    if (containersToPreempt.add(container)) {
-      synchronized (preemptedResources) {
+    synchronized (preemptionVariablesLock) {
+      if (containersToPreempt.add(container)) {
         Resources.addTo(preemptedResources, container.getAllocatedResource());
       }
     }
   }
 
   private void untrackContainerForPreemption(RMContainer container) {
-    if (containersToPreempt.remove(container)) {
-      synchronized (preemptedResources) {
+    synchronized (preemptionVariablesLock) {
+      if (containersToPreempt.remove(container)) {
         Resources.subtractFrom(preemptedResources,
             container.getAllocatedResource());
       }
     }
   }
 
-  Set<RMContainer> getPreemptionContainers() {
-    return containersToPreempt;
-  }
-
-  private Resource getPreemptedResources() {
-    synchronized (preemptedResources) {
-      return preemptedResources;
+  Set<ContainerId> getPreemptionContainerIds() {
+    synchronized (preemptionVariablesLock) {
+      Set<ContainerId> preemptionContainerIds = new HashSet<>();
+      for (RMContainer container : containersToPreempt) {
+        preemptionContainerIds.add(container.getContainerId());
+      }
+      return preemptionContainerIds;
     }
   }
 
@@ -591,9 +593,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
       return false;
     }
 
-    if (containersToPreempt.contains(container)) {
-      // The container is already under consideration for preemption
-      return false;
+    synchronized (preemptionVariablesLock) {
+      if (containersToPreempt.contains(container)) {
+        // The container is already under consideration for preemption
+        return false;
+      }
     }
 
     // Check if the app's allocation will be over its fairshare even
@@ -969,7 +973,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
             if (LOG.isTraceEnabled()) {
               LOG.trace("Assign container on " + node.getNodeName()
                   + " node, assignType: OFF_SWITCH" + ", allowedLocality: "
-                  + allowedLocality + ", priority: " + schedulerKey.getPriority()
+                  + allowedLocality + ", priority: "
+                  + schedulerKey.getPriority()
                   + ", app attempt id: " + this.attemptId);
             }
             return assignContainer(node, offswitchAsk, NodeType.OFF_SWITCH,
@@ -1226,13 +1231,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
   @Override
   public Resource getResourceUsage() {
-    /*
-     * getResourcesToPreempt() returns zero, except when there are containers
-     * to preempt. Avoid creating an object in the common case.
-     */
-    return getPreemptedResources().equals(Resources.none())
-        ? getCurrentConsumption()
-        : Resources.subtract(getCurrentConsumption(), getPreemptedResources());
+    // Subtract copies the object, so that we have a snapshot,
+    // in case usage changes, while the caller is using the value
+    synchronized (preemptionVariablesLock) {
+      return containersToPreempt.isEmpty()
+          ? getCurrentConsumption()
+          : Resources.subtract(getCurrentConsumption(), preemptedResources);
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a77f4324/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index c5bf02a..a15e6b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -103,6 +103,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * A scheduler that schedules resources between a set of queues. The scheduler
@@ -831,8 +832,9 @@ public class FairScheduler extends
     // Release containers
     releaseContainers(release, application);
 
+    ReentrantReadWriteLock.WriteLock lock = application.getWriteLock();
+    lock.lock();
     try {
-      application.getWriteLock().lock();
       if (!ask.isEmpty()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(
@@ -847,24 +849,21 @@ public class FairScheduler extends
         application.showRequests();
       }
     } finally {
-      application.getWriteLock().unlock();
+      lock.unlock();
     }
 
+    Set<ContainerId> preemptionContainerIds =
+        application.getPreemptionContainerIds();
     if (LOG.isDebugEnabled()) {
       LOG.debug(
           "allocate: post-update" + " applicationAttemptId=" + appAttemptId
               + " #ask=" + ask.size() + " reservation= " + application
               .getCurrentReservation());
 
-      LOG.debug("Preempting " + application.getPreemptionContainers().size()
+      LOG.debug("Preempting " + preemptionContainerIds.size()
           + " container(s)");
     }
 
-    Set<ContainerId> preemptionContainerIds = new HashSet<ContainerId>();
-    for (RMContainer container : application.getPreemptionContainers()) {
-      preemptionContainerIds.add(container.getContainerId());
-    }
-
     application.updateBlacklist(blacklistAdditions, blacklistRemovals);
 
     List<Container> newlyAllocatedContainers =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org