You are viewing a plain-text version of this content. The canonical version is available from the original mailing-list archive page (the hyperlink was lost in the plain-text conversion).
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2014/12/09 23:00:40 UTC
hadoop git commit: YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha)
Repository: hadoop
Updated Branches:
refs/heads/trunk 134061753 -> a2e07a545
YARN-2910. FSLeafQueue can throw ConcurrentModificationException. (Wilfred Spiegelenburg via kasha)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a2e07a54
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a2e07a54
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a2e07a54
Branch: refs/heads/trunk
Commit: a2e07a54561a57a83b943628ebbc53ed5ba52718
Parents: 1340617
Author: Karthik Kambatla <ka...@apache.org>
Authored: Tue Dec 9 14:00:31 2014 -0800
Committer: Karthik Kambatla <ka...@apache.org>
Committed: Tue Dec 9 14:00:31 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/fair/FSAppAttempt.java | 2 +-
.../scheduler/fair/FSLeafQueue.java | 151 +++++++++++++------
.../scheduler/fair/TestFSLeafQueue.java | 93 +++++++++++-
4 files changed, 199 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d06c831..d87322f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -203,6 +203,9 @@ Release 2.7.0 - UNRELEASED
YARN-2931. PublicLocalizer may fail until directory is initialized by
LocalizeRunner. (Anubhav Dhoot via kasha)
+ YARN-2910. FSLeafQueue can throw ConcurrentModificationException.
+ (Wilfred Spiegelenburg via kasha)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index b9966e7..b23ec3e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -172,7 +172,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
@Override
- public synchronized Resource getHeadroom() {
+ public Resource getHeadroom() {
final FSQueue queue = (FSQueue) this.queue;
SchedulingPolicy policy = queue.getPolicy();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 345ea8b..bbf1be7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -23,6 +23,9 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
@@ -50,6 +53,10 @@ public class FSLeafQueue extends FSQueue {
new ArrayList<FSAppAttempt>();
private final List<FSAppAttempt> nonRunnableApps =
new ArrayList<FSAppAttempt>();
+ // get a lock with fair distribution for app list updates
+ private final ReadWriteLock rwl = new ReentrantReadWriteLock(true);
+ private final Lock readLock = rwl.readLock();
+ private final Lock writeLock = rwl.writeLock();
private Resource demand = Resources.createResource(0);
@@ -72,16 +79,26 @@ public class FSLeafQueue extends FSQueue {
}
public void addApp(FSAppAttempt app, boolean runnable) {
- if (runnable) {
- runnableApps.add(app);
- } else {
- nonRunnableApps.add(app);
+ writeLock.lock();
+ try {
+ if (runnable) {
+ runnableApps.add(app);
+ } else {
+ nonRunnableApps.add(app);
+ }
+ } finally {
+ writeLock.unlock();
}
}
// for testing
void addAppSchedulable(FSAppAttempt appSched) {
- runnableApps.add(appSched);
+ writeLock.lock();
+ try {
+ runnableApps.add(appSched);
+ } finally {
+ writeLock.unlock();
+ }
}
/**
@@ -89,18 +106,25 @@ public class FSLeafQueue extends FSQueue {
* @return whether or not the app was runnable
*/
public boolean removeApp(FSAppAttempt app) {
- if (runnableApps.remove(app)) {
- // Update AM resource usage
- if (app.isAmRunning() && app.getAMResource() != null) {
- Resources.subtractFrom(amResourceUsage, app.getAMResource());
+ boolean runnable = false;
+ writeLock.lock();
+ try {
+ if (runnableApps.remove(app)) {
+ runnable = true;
+ } else if (nonRunnableApps.remove(app)) {
+ runnable = false; //nop, runnable is initialised to false already
+ } else {
+ throw new IllegalStateException("Given app to remove " + app +
+ " does not exist in queue " + this);
}
- return true;
- } else if (nonRunnableApps.remove(app)) {
- return false;
- } else {
- throw new IllegalStateException("Given app to remove " + app +
- " does not exist in queue " + this);
+ } finally {
+ writeLock.unlock();
+ }
+ // Update AM resource usage if needed
+ if (runnable && app.isAmRunning() && app.getAMResource() != null) {
+ Resources.subtractFrom(amResourceUsage, app.getAMResource());
}
+ return runnable;
}
public Collection<FSAppAttempt> getRunnableAppSchedulables() {
@@ -114,11 +138,16 @@ public class FSLeafQueue extends FSQueue {
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
- for (FSAppAttempt appSched : runnableApps) {
- apps.add(appSched.getApplicationAttemptId());
- }
- for (FSAppAttempt appSched : nonRunnableApps) {
- apps.add(appSched.getApplicationAttemptId());
+ readLock.lock();
+ try {
+ for (FSAppAttempt appSched : runnableApps) {
+ apps.add(appSched.getApplicationAttemptId());
+ }
+ for (FSAppAttempt appSched : nonRunnableApps) {
+ apps.add(appSched.getApplicationAttemptId());
+ }
+ } finally {
+ readLock.unlock();
}
}
@@ -144,11 +173,16 @@ public class FSLeafQueue extends FSQueue {
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
- for (FSAppAttempt app : runnableApps) {
- Resources.addTo(usage, app.getResourceUsage());
- }
- for (FSAppAttempt app : nonRunnableApps) {
- Resources.addTo(usage, app.getResourceUsage());
+ readLock.lock();
+ try {
+ for (FSAppAttempt app : runnableApps) {
+ Resources.addTo(usage, app.getResourceUsage());
+ }
+ for (FSAppAttempt app : nonRunnableApps) {
+ Resources.addTo(usage, app.getResourceUsage());
+ }
+ } finally {
+ readLock.unlock();
}
return usage;
}
@@ -164,17 +198,22 @@ public class FSLeafQueue extends FSQueue {
Resource maxRes = scheduler.getAllocationConfiguration()
.getMaxResources(getName());
demand = Resources.createResource(0);
- for (FSAppAttempt sched : runnableApps) {
- if (Resources.equals(demand, maxRes)) {
- break;
+ readLock.lock();
+ try {
+ for (FSAppAttempt sched : runnableApps) {
+ if (Resources.equals(demand, maxRes)) {
+ break;
+ }
+ updateDemandForApp(sched, maxRes);
}
- updateDemandForApp(sched, maxRes);
- }
- for (FSAppAttempt sched : nonRunnableApps) {
- if (Resources.equals(demand, maxRes)) {
- break;
+ for (FSAppAttempt sched : nonRunnableApps) {
+ if (Resources.equals(demand, maxRes)) {
+ break;
+ }
+ updateDemandForApp(sched, maxRes);
}
- updateDemandForApp(sched, maxRes);
+ } finally {
+ readLock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand
@@ -198,7 +237,8 @@ public class FSLeafQueue extends FSQueue {
public Resource assignContainer(FSSchedulerNode node) {
Resource assigned = Resources.none();
if (LOG.isDebugEnabled()) {
- LOG.debug("Node " + node.getNodeName() + " offered to queue: " + getName());
+ LOG.debug("Node " + node.getNodeName() + " offered to queue: " +
+ getName());
}
if (!assignContainerPreCheck(node)) {
@@ -206,16 +246,26 @@ public class FSLeafQueue extends FSQueue {
}
Comparator<Schedulable> comparator = policy.getComparator();
- Collections.sort(runnableApps, comparator);
- for (FSAppAttempt sched : runnableApps) {
- if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
- continue;
- }
-
- assigned = sched.assignContainer(node);
- if (!assigned.equals(Resources.none())) {
- break;
+ writeLock.lock();
+ try {
+ Collections.sort(runnableApps, comparator);
+ } finally {
+ writeLock.unlock();
+ }
+ readLock.lock();
+ try {
+ for (FSAppAttempt sched : runnableApps) {
+ if (SchedulerAppUtils.isBlacklisted(sched, node, LOG)) {
+ continue;
+ }
+
+ assigned = sched.assignContainer(node);
+ if (!assigned.equals(Resources.none())) {
+ break;
+ }
}
+ } finally {
+ readLock.unlock();
}
return assigned;
}
@@ -237,11 +287,16 @@ public class FSLeafQueue extends FSQueue {
// Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator();
FSAppAttempt candidateSched = null;
- for (FSAppAttempt sched : runnableApps) {
- if (candidateSched == null ||
- comparator.compare(sched, candidateSched) > 0) {
- candidateSched = sched;
+ readLock.lock();
+ try {
+ for (FSAppAttempt sched : runnableApps) {
+ if (candidateSched == null ||
+ comparator.compare(sched, candidateSched) > 0) {
+ candidateSched = sched;
+ }
}
+ } finally {
+ readLock.unlock();
}
// Preempt from the selected app
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a2e07a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 97736be..385ea0b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -28,12 +28,22 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
import java.util.Collection;
-
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@@ -222,4 +232,85 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
assertFalse(queueB1.isStarvedForFairShare());
assertFalse(queueB2.isStarvedForFairShare());
}
+
+ @Test
+ public void testConcurrentAccess() {
+ conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
+ resourceManager = new MockRM(conf);
+ resourceManager.start();
+ scheduler = (FairScheduler) resourceManager.getResourceScheduler();
+
+ String queueName = "root.queue1";
+ final FSLeafQueue schedulable = scheduler.getQueueManager().
+ getLeafQueue(queueName, true);
+ ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+ RMContext rmContext = resourceManager.getRMContext();
+ final FSAppAttempt app =
+ new FSAppAttempt(scheduler, applicationAttemptId, "user1",
+ schedulable, null, rmContext);
+
+ // this needs to be in sync with the number of runnables declared below
+ int testThreads = 2;
+ List<Runnable> runnables = new ArrayList<Runnable>();
+
+ // add applications to modify the list
+ runnables.add(new Runnable() {
+ @Override
+ public void run() {
+ for (int i=0; i < 500; i++) {
+ schedulable.addAppSchedulable(app);
+ }
+ }
+ });
+
+ // iterate over the list a couple of times in a different thread
+ runnables.add(new Runnable() {
+ @Override
+ public void run() {
+ for (int i=0; i < 500; i++) {
+ schedulable.getResourceUsage();
+ }
+ }
+ });
+
+ final List<Throwable> exceptions = Collections.synchronizedList(
+ new ArrayList<Throwable>());
+ final ExecutorService threadPool = Executors.newFixedThreadPool(
+ testThreads);
+
+ try {
+ final CountDownLatch allExecutorThreadsReady =
+ new CountDownLatch(testThreads);
+ final CountDownLatch startBlocker = new CountDownLatch(1);
+ final CountDownLatch allDone = new CountDownLatch(testThreads);
+ for (final Runnable submittedTestRunnable : runnables) {
+ threadPool.submit(new Runnable() {
+ public void run() {
+ allExecutorThreadsReady.countDown();
+ try {
+ startBlocker.await();
+ submittedTestRunnable.run();
+ } catch (final Throwable e) {
+ exceptions.add(e);
+ } finally {
+ allDone.countDown();
+ }
+ }
+ });
+ }
+ // wait until all threads are ready
+ allExecutorThreadsReady.await();
+ // start all test runners
+ startBlocker.countDown();
+ int testTimeout = 2;
+ assertTrue("Timeout waiting for more than " + testTimeout + " seconds",
+ allDone.await(testTimeout, TimeUnit.SECONDS));
+ } catch (InterruptedException ie) {
+ exceptions.add(ie);
+ } finally {
+ threadPool.shutdownNow();
+ }
+ assertTrue("Test failed with exception(s)" + exceptions,
+ exceptions.isEmpty());
+ }
}