You are viewing a plain text version of this content. The link to the canonical version ("here") is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/09 23:00:40 UTC

hadoop git commit: YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha)

Repository: hadoop
Updated Branches:
  refs/heads/trunk 134061753 -> a2e07a545


YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a2e07a54
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a2e07a54
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a2e07a54

Branch: refs/heads/trunk
Commit: a2e07a54561a57a83b943628ebbc53ed5ba52718
Parents: 1340617
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Dec 9 14:00:31 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Dec 9 14:00:31 2014 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/fair/FSAppAttempt.java            |   2 +-
 .../scheduler/fair/FSLeafQueue.java             | 151 +++++++++++++------
 .../scheduler/fair/TestFSLeafQueue.java         |  93 +++++++++++-
 4 files changed, 199 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d06c831..d87322f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -203,6 +203,9 @@ Release 2.7.0 - UNRELEASED
     YARN-2931. PublicLocalizer may fail until directory is initialized by
     LocalizeRunner. (Anubhav Dhoot via kasha)
 
+    YARN-2910. FSLeafQueue can throw ConcurrentModificationException. 
+    (Wilfred Spiegelenburg via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index b9966e7..b23ec3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -172,7 +172,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   }
 
   @Override
-  public synchronized Resource getHeadroom() {
+  public Resource getHeadroom() {
     final FSQueue queue = (FSQueue) this.queue;
     SchedulingPolicy policy = queue.getPolicy();
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 345ea8b..bbf1be7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -23,6 +23,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -50,6 +53,10 @@ public class FSLeafQueue extends FSQueue {
       new ArrayList<FSAppAttempt>();
   private final List<FSAppAttempt> nonRunnableApps =
       new ArrayList<FSAppAttempt>();
+  // get a lock with fair distribution for app list updates
+  private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+  private final Lock readLock = rwl.readLock();
+  private final Lock writeLock = rwl.writeLock();
   
   private Resource demand = Resources.createResource(0);
   
@@ -72,16 +79,26 @@ public class FSLeafQueue extends FSQueue {
   }
   
   public void addApp(FSAppAttempt app, boolean runnable) {
-    if (runnable) {
-      runnableApps.add(app);
-    } else {
-      nonRunnableApps.add(app);
+    writeLock.lock();
+    try {
+      if (runnable) {
+        runnableApps.add(app);
+      } else {
+        nonRunnableApps.add(app);
+      }
+    } finally {
+      writeLock.unlock();
     }
   }
   
   // for testing
   void addAppSchedulable(FSAppAttempt appSched) {
-    runnableApps.add(appSched);
+    writeLock.lock();
+    try {
+      runnableApps.add(appSched);
+    } finally {
+      writeLock.unlock();
+    }
   }
   
   /**
@@ -89,18 +106,25 @@ public class FSLeafQueue extends FSQueue {
    * @return whether or not the app was runnable
    */
   public boolean removeApp(FSAppAttempt app) {
-    if (runnableApps.remove(app)) {
-      // Update AM resource usage
-      if (app.isAmRunning() && app.getAMResource() != null) {
-        Resources.subtractFrom(amResourceUsage, app.getAMResource());
+    boolean runnable = false;
+    writeLock.lock();
+    try {
+      if (runnableApps.remove(app)) {
+        runnable = true;
+      } else if (nonRunnableApps.remove(app)) {
+        runnable = false; //nop, runnable is initialised to false already
+      } else {
+        throw new IllegalStateException("Given app to remove " + app +
+            " does not exist in queue " + this);
       }
-      return true;
-    } else if (nonRunnableApps.remove(app)) {
-      return false;
-    } else {
-      throw new IllegalStateException("Given app to remove " + app +
-          " does not exist in queue " + this);
+    } finally {
+      writeLock.unlock();
+    }
+    // Update AM resource usage if needed
+    if (runnable && app.isAmRunning() && app.getAMResource() != null) {
+      Resources.subtractFrom(amResourceUsage, app.getAMResource());
     }
+    return runnable;
   }
   
   public Collection<FSAppAttempt> getRunnableAppSchedulables() {
@@ -114,11 +138,16 @@ public class FSLeafQueue extends FSQueue {
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
-    for (FSAppAttempt appSched : runnableApps) {
-      apps.add(appSched.getApplicationAttemptId());
-    }
-    for (FSAppAttempt appSched : nonRunnableApps) {
-      apps.add(appSched.getApplicationAttemptId());
+    readLock.lock();
+    try {
+      for (FSAppAttempt appSched : runnableApps) {
+        apps.add(appSched.getApplicationAttemptId());
+      }
+      for (FSAppAttempt appSched : nonRunnableApps) {
+        apps.add(appSched.getApplicationAttemptId());
+      }
+    } finally {
+      readLock.unlock();
     }
   }
 
@@ -144,11 +173,16 @@ public class FSLeafQueue extends FSQueue {
   @Override
   public Resource getResourceUsage() {
     Resource usage = Resources.createResource(0);
-    for (FSAppAttempt app : runnableApps) {
-      Resources.addTo(usage, app.getResourceUsage());
-    }
-    for (FSAppAttempt app : nonRunnableApps) {
-      Resources.addTo(usage, app.getResourceUsage());
+    readLock.lock();
+    try {
+      for (FSAppAttempt app : runnableApps) {
+        Resources.addTo(usage, app.getResourceUsage());
+      }
+      for (FSAppAttempt app : nonRunnableApps) {
+        Resources.addTo(usage, app.getResourceUsage());
+      }
+    } finally {
+      readLock.unlock();
     }
     return usage;
   }
@@ -164,17 +198,22 @@ public class FSLeafQueue extends FSQueue {
     Resource maxRes = scheduler.getAllocationConfiguration()
         .getMaxResources(getName());
     demand = Resources.createResource(0);
-    for (FSAppAttempt sched : runnableApps) {
-      if (Resources.equals(demand, maxRes)) {
-        break;
+    readLock.lock();
+    try {
+      for (FSAppAttempt sched : runnableApps) {
+        if (Resources.equals(demand, maxRes)) {
+          break;
+        }
+        updateDemandForApp(sched, maxRes);
       }
-      updateDemandForApp(sched, maxRes);
-    }
-    for (FSAppAttempt sched : nonRunnableApps) {
-      if (Resources.equals(demand, maxRes)) {
-        break;
+      for (FSAppAttempt sched : nonRunnableApps) {
+        if (Resources.equals(demand, maxRes)) {
+          break;
+        }
+        updateDemandForApp(sched, maxRes);
       }
-      updateDemandForApp(sched, maxRes);
+    } finally {
+      readLock.unlock();
     }
     if (LOG.isDebugEnabled()) {
       LOG.debug("The updated demand for " + getName() + " is " + demand
@@ -198,7 +237,8 @@ public class FSLeafQueue extends FSQueue {
   public Resource assignContainer(FSSchedulerNode node) {
     Resource assigned = Resources.none();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName());
+      LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
+          getName());
     }
 
     if (!assignContainerPreCheck(node)) {
@@ -206,16 +246,26 @@ public class FSLeafQueue extends FSQueue {
     }
 
     Comparator<Schedulable> comparator = policy.getComparator();
-    Collections.sort(runnableApps, comparator);
-    for (FSAppAttempt sched : runnableApps) {
-      if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
-        continue;
-      }
-
-      assigned = sched.assignContainer(node);
-      if (!assigned.equals(Resources.none())) {
-        break;
+    writeLock.lock();
+    try {
+      Collections.sort(runnableApps, comparator);
+    } finally {
+      writeLock.unlock();
+    }
+    readLock.lock();
+    try {
+      for (FSAppAttempt sched : runnableApps) {
+        if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
+          continue;
+        }
+
+        assigned = sched.assignContainer(node);
+        if (!assigned.equals(Resources.none())) {
+          break;
+        }
       }
+    } finally {
+      readLock.unlock();
     }
     return assigned;
   }
@@ -237,11 +287,16 @@ public class FSLeafQueue extends FSQueue {
     // Choose the app that is most over fair share
     Comparator<Schedulable> comparator = policy.getComparator();
     FSAppAttempt candidateSched = null;
-    for (FSAppAttempt sched : runnableApps) {
-      if (candidateSched == null ||
-          comparator.compare(sched, candidateSched) > 0) {
-        candidateSched = sched;
+    readLock.lock();
+    try {
+      for (FSAppAttempt sched : runnableApps) {
+        if (candidateSched == null ||
+            comparator.compare(sched, candidateSched) > 0) {
+          candidateSched = sched;
+        }
       }
+    } finally {
+      readLock.unlock();
     }
 
     // Preempt from the selected app

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 97736be..385ea0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -28,12 +28,22 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.ArrayList;
 import java.util.Collection;
-
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -222,4 +232,85 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
     assertFalse(queueB1.isStarvedForFairShare());
     assertFalse(queueB2.isStarvedForFairShare());
   }
+
+  @Test
+  public void testConcurrentAccess() {
+    conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+    resourceManager = new MockRM(conf);
+    resourceManager.start();
+    scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+    String queueName = "root.queue1";
+    final FSLeafQueue schedulable = scheduler.getQueueManager().
+      getLeafQueue(queueName, true);
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    RMContext rmContext = resourceManager.getRMContext();
+    final FSAppAttempt app =
+        new FSAppAttempt(scheduler, applicationAttemptId, "user1",
+            schedulable, null, rmContext);
+
+    // this needs to be in sync with the number of runnables declared below
+    int testThreads = 2;
+    List<Runnable> runnables = new ArrayList<Runnable>();
+
+    // add applications to modify the list
+    runnables.add(new Runnable() {
+      @Override
+      public void run() {
+        for (int i=0; i < 500; i++) {
+          schedulable.addAppSchedulable(app);
+        }
+      }
+    });
+
+    // iterate over the list a couple of times in a different thread
+    runnables.add(new Runnable() {
+      @Override
+      public void run() {
+        for (int i=0; i < 500; i++) {
+          schedulable.getResourceUsage();
+        }
+      }
+    });
+
+    final List<Throwable> exceptions = Collections.synchronizedList(
+        new ArrayList<Throwable>());
+    final ExecutorService threadPool = Executors.newFixedThreadPool(
+        testThreads);
+
+    try {
+      final CountDownLatch allExecutorThreadsReady =
+          new CountDownLatch(testThreads);
+      final CountDownLatch startBlocker = new CountDownLatch(1);
+      final CountDownLatch allDone = new CountDownLatch(testThreads);
+      for (final Runnable submittedTestRunnable : runnables) {
+        threadPool.submit(new Runnable() {
+          public void run() {
+            allExecutorThreadsReady.countDown();
+            try {
+              startBlocker.await();
+              submittedTestRunnable.run();
+            } catch (final Throwable e) {
+              exceptions.add(e);
+            } finally {
+              allDone.countDown();
+            }
+          }
+        });
+      }
+      // wait until all threads are ready
+      allExecutorThreadsReady.await();
+      // start all test runners
+      startBlocker.countDown();
+      int testTimeout = 2;
+      assertTrue("Timeout waiting for more than " + testTimeout + " seconds",
+          allDone.await(testTimeout, TimeUnit.SECONDS));
+    } catch (InterruptedException ie) {
+      exceptions.add(ie);
+    } finally {
+      threadPool.shutdownNow();
+    }
+    assertTrue("Test failed with exception(s)" + exceptions,
+        exceptions.isEmpty());
+  }
 }