You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/05/15 23:58:12 UTC
[05/50] [abbrv] hadoop git commit: YARN-7003. DRAINING state of
queues is not recovered after RM restart. Contributed by Tao Yang.
YARN-7003. DRAINING state of queues is not recovered after RM restart. Contributed by Tao Yang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9db9cd95
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9db9cd95
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9db9cd95
Branch: refs/heads/HDDS-4
Commit: 9db9cd95bd0348070a286e69e7965c03c9bd39d6
Parents: d76fbbc
Author: Weiwei Yang <ww...@apache.org>
Authored: Fri May 11 10:47:04 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Fri May 11 10:47:04 2018 +0800
----------------------------------------------------------------------
.../scheduler/capacity/AbstractCSQueue.java | 15 +++++
.../scheduler/capacity/CapacityScheduler.java | 7 +++
.../scheduler/capacity/TestQueueState.java | 60 ++++++++++++++++++++
3 files changed, 82 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db9cd95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 651d0e9..67b676b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -1244,4 +1244,19 @@ public abstract class AbstractCSQueue implements CSQueue {
public Map<String, Float> getUserWeights() {
return userWeights;
}
+
+  /**
+   * Restores the DRAINING state of this queue, and of its STOPPED ancestors,
+   * during recovery.
+   *
+   * <p>If this queue is currently {@link QueueState#STOPPED} it is moved to
+   * {@link QueueState#DRAINING}. The call then recurses into the parent
+   * queue when a parent exists and is itself STOPPED. This queue's write
+   * lock is held for the whole operation, including the recursive call.
+   */
+  public void recoverDrainingState() {
+    // Acquire the lock before entering the try block: if lock() itself
+    // failed, the finally clause must not call unlock() on a lock that was
+    // never held (that would throw IllegalMonitorStateException and mask
+    // the original failure).
+    this.writeLock.lock();
+    try {
+      if (getState() == QueueState.STOPPED) {
+        updateQueueState(QueueState.DRAINING);
+      }
+      LOG.info("Recover draining state for queue " + this.getQueuePath());
+      // Walk up the hierarchy only while ancestors are still STOPPED.
+      if (getParent() != null && getParent().getState() == QueueState.STOPPED) {
+        ((AbstractCSQueue) getParent()).recoverDrainingState();
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db9cd95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1d6c104..162d3bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -808,6 +809,12 @@ public class CapacityScheduler extends
throw new QueueInvalidException(queueErrorMsg);
}
}
+ // If the queue is STOPPED while its apps are being recovered, its
+ // state before the restart must have been DRAINING, so automatically
+ // transition it back to DRAINING for recovery.
+ if (queue.getState() == QueueState.STOPPED) {
+ ((LeafQueue) queue).recoverDrainingState();
+ }
// Submit to the queue
try {
queue.submitApplication(applicationId, user, queueName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9db9cd95/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
index 9f2933e..0a39e99 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java
@@ -32,7 +32,12 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
@@ -197,4 +202,59 @@ public class TestQueueState {
.thenCallRealMethod();
return application;
}
+
+  /**
+   * Verifies that queues which were DRAINING before an RM restart are
+   * DRAINING again once the running application has been recovered.
+   *
+   * <p>Both RM instances are closed in a finally block so that a failed
+   * assertion does not leak a running MockRM.
+   */
+  @Test (timeout = 30000)
+  public void testRecoverDrainingStateAfterRMRestart() throws Exception {
+    // init conf
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration();
+    newConf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    newConf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
+        false);
+    newConf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    newConf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
+    newConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{Q1});
+    newConf.setQueues(Q1_PATH, new String[]{Q2});
+    newConf.setCapacity(Q1_PATH, 100);
+    newConf.setCapacity(Q2_PATH, 100);
+
+    // init state store
+    MemoryRMStateStore newMemStore = new MemoryRMStateStore();
+    newMemStore.init(newConf);
+    // init RM & NM
+    MockRM rm1 = new MockRM(newConf, newMemStore);
+    MockRM rm2 = null;
+    try {
+      rm1.start();
+      MockNM nm = rm1.registerNode("h1:1234", 204800);
+
+      // submit an app, AM is running on nm
+      RMApp app = rm1.submitApp(1024, "appname", "appuser", null, Q2);
+      MockRM.launchAM(app, rm1, nm);
+      rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+      // update queue state to STOPPED
+      newConf.setState(Q1_PATH, QueueState.STOPPED);
+      CapacityScheduler capacityScheduler =
+          (CapacityScheduler) rm1.getRMContext().getScheduler();
+      capacityScheduler.reinitialize(newConf, rm1.getRMContext());
+      // current queue state should be DRAINING
+      Assert.assertEquals(QueueState.DRAINING,
+          capacityScheduler.getQueue(Q2).getState());
+      Assert.assertEquals(QueueState.DRAINING,
+          capacityScheduler.getQueue(Q1).getState());
+
+      // RM restart
+      rm2 = new MockRM(newConf, newMemStore);
+      rm2.start();
+      rm2.registerNode("h1:1234", 204800);
+
+      // queue state should be DRAINING after app recovered
+      rm2.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
+      capacityScheduler = (CapacityScheduler) rm2.getRMContext().getScheduler();
+      Assert.assertEquals(QueueState.DRAINING,
+          capacityScheduler.getQueue(Q2).getState());
+      Assert.assertEquals(QueueState.DRAINING,
+          capacityScheduler.getQueue(Q1).getState());
+    } finally {
+      // close both RMs, even when an assertion above failed
+      try {
+        if (rm2 != null) {
+          rm2.close();
+        }
+      } finally {
+        rm1.close();
+      }
+    }
+  }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org