You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/11/20 04:49:59 UTC
hadoop git commit: YARN-2865. Fixed RM to always create a new
RMContext when transtions from StandBy to Active. Contributed by Rohith
Sharmaks (cherry picked from commit 9cb8b75ba57f18639492bfa3b7e7c11c00bb3d3b)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 988682ac2 -> db31ef7e7
YARN-2865. Fixed RM to always create a new RMContext when transtions from StandBy to Active. Contributed by Rohith Sharmaks
(cherry picked from commit 9cb8b75ba57f18639492bfa3b7e7c11c00bb3d3b)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/db31ef7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/db31ef7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/db31ef7e
Branch: refs/heads/branch-2
Commit: db31ef7e7f55436bbf88c6d93e2273c4463ca9f0
Parents: 988682a
Author: Jian He <ji...@apache.org>
Authored: Wed Nov 19 19:48:33 2014 -0800
Committer: Jian He <ji...@apache.org>
Committed: Wed Nov 19 19:49:44 2014 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resourcemanager/RMActiveServiceContext.java | 455 +++++++++++++++++++
.../server/resourcemanager/RMContextImpl.java | 235 ++++------
.../server/resourcemanager/ResourceManager.java | 26 +-
.../yarn/server/resourcemanager/TestRMHA.java | 62 +++
5 files changed, 626 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db31ef7e/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index fd59311..3dd08b1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -88,6 +88,9 @@ Release 2.7.0 - UNRELEASED
YARN-2878. Fix DockerContainerExecutor.apt.vm formatting. (Abin Shahab via
jianhe)
+ YARN-2865. Fixed RM to always create a new RMContext when transtions from
+ StandBy to Active. (Rohith Sharmaks via jianhe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db31ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
new file mode 100644
index 0000000..3bc2e9b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * The RMActiveServiceContext is the class that maintains all the
+ * RMActiveService contexts.This is expected to be used only by ResourceManager
+ * and RMContext.
+ */
+@Private
+@Unstable
+public class RMActiveServiceContext {
+
+ private static final Log LOG = LogFactory
+ .getLog(RMActiveServiceContext.class);
+
+ private final ConcurrentMap<ApplicationId, RMApp> applications =
+ new ConcurrentHashMap<ApplicationId, RMApp>();
+
+ private final ConcurrentMap<NodeId, RMNode> nodes =
+ new ConcurrentHashMap<NodeId, RMNode>();
+
+ private final ConcurrentMap<String, RMNode> inactiveNodes =
+ new ConcurrentHashMap<String, RMNode>();
+
+ private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+ new ConcurrentHashMap<ApplicationId, ByteBuffer>();
+
+ private boolean isWorkPreservingRecoveryEnabled;
+
+ private AMLivelinessMonitor amLivelinessMonitor;
+ private AMLivelinessMonitor amFinishingMonitor;
+ private RMStateStore stateStore = null;
+ private ContainerAllocationExpirer containerAllocationExpirer;
+ private DelegationTokenRenewer delegationTokenRenewer;
+ private AMRMTokenSecretManager amRMTokenSecretManager;
+ private RMContainerTokenSecretManager containerTokenSecretManager;
+ private NMTokenSecretManagerInRM nmTokenSecretManager;
+ private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+ private ClientRMService clientRMService;
+ private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
+ private ResourceScheduler scheduler;
+ private ReservationSystem reservationSystem;
+ private NodesListManager nodesListManager;
+ private ResourceTrackerService resourceTrackerService;
+ private ApplicationMasterService applicationMasterService;
+ private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+ private SystemMetricsPublisher systemMetricsPublisher;
+ private RMNodeLabelsManager nodeLabelManager;
+ private long epoch;
+ private Clock systemClock = new SystemClock();
+ private long schedulerRecoveryStartTime = 0;
+ private long schedulerRecoveryWaitTime = 0;
+ private boolean printLog = true;
+ private boolean isSchedulerReady = false;
+
+ public RMActiveServiceContext() {
+
+ }
+
+ @Private
+ @Unstable
+ public RMActiveServiceContext(Dispatcher rmDispatcher,
+ ContainerAllocationExpirer containerAllocationExpirer,
+ AMLivelinessMonitor amLivelinessMonitor,
+ AMLivelinessMonitor amFinishingMonitor,
+ DelegationTokenRenewer delegationTokenRenewer,
+ AMRMTokenSecretManager appTokenSecretManager,
+ RMContainerTokenSecretManager containerTokenSecretManager,
+ NMTokenSecretManagerInRM nmTokenSecretManager,
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ this();
+ this.setContainerAllocationExpirer(containerAllocationExpirer);
+ this.setAMLivelinessMonitor(amLivelinessMonitor);
+ this.setAMFinishingMonitor(amFinishingMonitor);
+ this.setDelegationTokenRenewer(delegationTokenRenewer);
+ this.setAMRMTokenSecretManager(appTokenSecretManager);
+ this.setContainerTokenSecretManager(containerTokenSecretManager);
+ this.setNMTokenSecretManager(nmTokenSecretManager);
+ this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
+ this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+
+ RMStateStore nullStore = new NullRMStateStore();
+ nullStore.setRMDispatcher(rmDispatcher);
+ try {
+ nullStore.init(new YarnConfiguration());
+ setStateStore(nullStore);
+ } catch (Exception e) {
+ assert false;
+ }
+ }
+
+ @Private
+ @Unstable
+ public void setStateStore(RMStateStore store) {
+ stateStore = store;
+ }
+
+ @Private
+ @Unstable
+ public ClientRMService getClientRMService() {
+ return clientRMService;
+ }
+
+ @Private
+ @Unstable
+ public ApplicationMasterService getApplicationMasterService() {
+ return applicationMasterService;
+ }
+
+ @Private
+ @Unstable
+ public ResourceTrackerService getResourceTrackerService() {
+ return resourceTrackerService;
+ }
+
+ @Private
+ @Unstable
+ public RMStateStore getStateStore() {
+ return stateStore;
+ }
+
+ @Private
+ @Unstable
+ public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
+ return this.applications;
+ }
+
+ @Private
+ @Unstable
+ public ConcurrentMap<NodeId, RMNode> getRMNodes() {
+ return this.nodes;
+ }
+
+ @Private
+ @Unstable
+ public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
+ return this.inactiveNodes;
+ }
+
+ @Private
+ @Unstable
+ public ContainerAllocationExpirer getContainerAllocationExpirer() {
+ return this.containerAllocationExpirer;
+ }
+
+ @Private
+ @Unstable
+ public AMLivelinessMonitor getAMLivelinessMonitor() {
+ return this.amLivelinessMonitor;
+ }
+
+ @Private
+ @Unstable
+ public AMLivelinessMonitor getAMFinishingMonitor() {
+ return this.amFinishingMonitor;
+ }
+
+ @Private
+ @Unstable
+ public DelegationTokenRenewer getDelegationTokenRenewer() {
+ return delegationTokenRenewer;
+ }
+
+ @Private
+ @Unstable
+ public AMRMTokenSecretManager getAMRMTokenSecretManager() {
+ return this.amRMTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ public RMContainerTokenSecretManager getContainerTokenSecretManager() {
+ return this.containerTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ public NMTokenSecretManagerInRM getNMTokenSecretManager() {
+ return this.nmTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ public ResourceScheduler getScheduler() {
+ return this.scheduler;
+ }
+
+ @Private
+ @Unstable
+ public ReservationSystem getReservationSystem() {
+ return this.reservationSystem;
+ }
+
+ @Private
+ @Unstable
+ public NodesListManager getNodesListManager() {
+ return this.nodesListManager;
+ }
+
+ @Private
+ @Unstable
+ public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
+ return this.clientToAMTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ public void setClientRMService(ClientRMService clientRMService) {
+ this.clientRMService = clientRMService;
+ }
+
+ @Private
+ @Unstable
+ public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
+ return this.rmDelegationTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ public void setRMDelegationTokenSecretManager(
+ RMDelegationTokenSecretManager delegationTokenSecretManager) {
+ this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ void setContainerAllocationExpirer(
+ ContainerAllocationExpirer containerAllocationExpirer) {
+ this.containerAllocationExpirer = containerAllocationExpirer;
+ }
+
+ @Private
+ @Unstable
+ void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
+ this.amLivelinessMonitor = amLivelinessMonitor;
+ }
+
+ @Private
+ @Unstable
+ void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
+ this.amFinishingMonitor = amFinishingMonitor;
+ }
+
+ @Private
+ @Unstable
+ void setContainerTokenSecretManager(
+ RMContainerTokenSecretManager containerTokenSecretManager) {
+ this.containerTokenSecretManager = containerTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
+ this.nmTokenSecretManager = nmTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ void setScheduler(ResourceScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ @Private
+ @Unstable
+ void setReservationSystem(ReservationSystem reservationSystem) {
+ this.reservationSystem = reservationSystem;
+ }
+
+ @Private
+ @Unstable
+ void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
+ this.delegationTokenRenewer = delegationTokenRenewer;
+ }
+
+ @Private
+ @Unstable
+ void setClientToAMTokenSecretManager(
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
+ this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
+ this.amRMTokenSecretManager = amRMTokenSecretManager;
+ }
+
+ @Private
+ @Unstable
+ void setNodesListManager(NodesListManager nodesListManager) {
+ this.nodesListManager = nodesListManager;
+ }
+
+ @Private
+ @Unstable
+ void setApplicationMasterService(
+ ApplicationMasterService applicationMasterService) {
+ this.applicationMasterService = applicationMasterService;
+ }
+
+ @Private
+ @Unstable
+ void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
+ this.resourceTrackerService = resourceTrackerService;
+ }
+
+ @Private
+ @Unstable
+ public void setWorkPreservingRecoveryEnabled(boolean enabled) {
+ this.isWorkPreservingRecoveryEnabled = enabled;
+ }
+
+ @Private
+ @Unstable
+ public boolean isWorkPreservingRecoveryEnabled() {
+ return this.isWorkPreservingRecoveryEnabled;
+ }
+
+ @Private
+ @Unstable
+ public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+ return rmApplicationHistoryWriter;
+ }
+
+ @Private
+ @Unstable
+ public void setSystemMetricsPublisher(
+ SystemMetricsPublisher systemMetricsPublisher) {
+ this.systemMetricsPublisher = systemMetricsPublisher;
+ }
+
+ @Private
+ @Unstable
+ public SystemMetricsPublisher getSystemMetricsPublisher() {
+ return systemMetricsPublisher;
+ }
+
+ @Private
+ @Unstable
+ public void setRMApplicationHistoryWriter(
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+ }
+
+ @Private
+ @Unstable
+ public long getEpoch() {
+ return this.epoch;
+ }
+
+ @Private
+ @Unstable
+ void setEpoch(long epoch) {
+ this.epoch = epoch;
+ }
+
+ @Private
+ @Unstable
+ public RMNodeLabelsManager getNodeLabelManager() {
+ return nodeLabelManager;
+ }
+
+ @Private
+ @Unstable
+ public void setNodeLabelManager(RMNodeLabelsManager mgr) {
+ nodeLabelManager = mgr;
+ }
+
+ @Private
+ @Unstable
+ public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
+ this.schedulerRecoveryStartTime = systemClock.getTime();
+ this.schedulerRecoveryWaitTime = waitTime;
+ }
+
+ @Private
+ @Unstable
+ public boolean isSchedulerReadyForAllocatingContainers() {
+ if (isSchedulerReady) {
+ return isSchedulerReady;
+ }
+ isSchedulerReady =
+ (systemClock.getTime() - schedulerRecoveryStartTime) > schedulerRecoveryWaitTime;
+ if (!isSchedulerReady && printLog) {
+ LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
+ printLog = false;
+ }
+ if (isSchedulerReady) {
+ LOG.info("Scheduler recovery is done. Start allocating new containers.");
+ }
+ return isSchedulerReady;
+ }
+
+ @Private
+ @Unstable
+ public void setSystemClock(Clock clock) {
+ this.systemClock = clock;
+ }
+
+ @Private
+ @Unstable
+ public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+ return systemCredentials;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db31ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 7c1db3d..55d7667 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -19,24 +19,20 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,7 +47,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManag
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.SystemClock;
import com.google.common.annotations.VisibleForTesting;
@@ -59,52 +54,16 @@ public class RMContextImpl implements RMContext {
private Dispatcher rmDispatcher;
- private final ConcurrentMap<ApplicationId, RMApp> applications
- = new ConcurrentHashMap<ApplicationId, RMApp>();
-
- private final ConcurrentMap<NodeId, RMNode> nodes
- = new ConcurrentHashMap<NodeId, RMNode>();
-
- private final ConcurrentMap<String, RMNode> inactiveNodes
- = new ConcurrentHashMap<String, RMNode>();
-
- private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
- new ConcurrentHashMap<ApplicationId, ByteBuffer>();
-
private boolean isHAEnabled;
- private boolean isWorkPreservingRecoveryEnabled;
+
private HAServiceState haServiceState =
HAServiceProtocol.HAServiceState.INITIALIZING;
-
- private AMLivelinessMonitor amLivelinessMonitor;
- private AMLivelinessMonitor amFinishingMonitor;
- private RMStateStore stateStore = null;
- private ContainerAllocationExpirer containerAllocationExpirer;
- private DelegationTokenRenewer delegationTokenRenewer;
- private AMRMTokenSecretManager amRMTokenSecretManager;
- private RMContainerTokenSecretManager containerTokenSecretManager;
- private NMTokenSecretManagerInRM nmTokenSecretManager;
- private ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;
+
private AdminService adminService;
- private ClientRMService clientRMService;
- private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;
- private ResourceScheduler scheduler;
- private ReservationSystem reservationSystem;
- private NodesListManager nodesListManager;
- private ResourceTrackerService resourceTrackerService;
- private ApplicationMasterService applicationMasterService;
- private RMApplicationHistoryWriter rmApplicationHistoryWriter;
- private SystemMetricsPublisher systemMetricsPublisher;
+
private ConfigurationProvider configurationProvider;
- private RMNodeLabelsManager nodeLabelManager;
- private long epoch;
- private Clock systemClock = new SystemClock();
- private long schedulerRecoveryStartTime = 0;
- private long schedulerRecoveryWaitTime = 0;
- private boolean printLog = true;
- private boolean isSchedulerReady = false;
- private static final Log LOG = LogFactory.getLog(RMContextImpl.class);
+ private RMActiveServiceContext activeServiceContext;
/**
* Default constructor. To be used in conjunction with setter methods for
@@ -128,24 +87,11 @@ public class RMContextImpl implements RMContext {
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this();
this.setDispatcher(rmDispatcher);
- this.setContainerAllocationExpirer(containerAllocationExpirer);
- this.setAMLivelinessMonitor(amLivelinessMonitor);
- this.setAMFinishingMonitor(amFinishingMonitor);
- this.setDelegationTokenRenewer(delegationTokenRenewer);
- this.setAMRMTokenSecretManager(appTokenSecretManager);
- this.setContainerTokenSecretManager(containerTokenSecretManager);
- this.setNMTokenSecretManager(nmTokenSecretManager);
- this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
- this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
-
- RMStateStore nullStore = new NullRMStateStore();
- nullStore.setRMDispatcher(rmDispatcher);
- try {
- nullStore.init(new YarnConfiguration());
- setStateStore(nullStore);
- } catch (Exception e) {
- assert false;
- }
+ setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
+ containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
+ delegationTokenRenewer, appTokenSecretManager,
+ containerTokenSecretManager, nmTokenSecretManager,
+ clientToAMTokenSecretManager, rmApplicationHistoryWriter));
ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider);
@@ -155,80 +101,80 @@ public class RMContextImpl implements RMContext {
public Dispatcher getDispatcher() {
return this.rmDispatcher;
}
-
- @Override
+
+ @Override
public RMStateStore getStateStore() {
- return stateStore;
+ return activeServiceContext.getStateStore();
}
@Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
- return this.applications;
+ return activeServiceContext.getRMApps();
}
@Override
public ConcurrentMap<NodeId, RMNode> getRMNodes() {
- return this.nodes;
+ return activeServiceContext.getRMNodes();
}
-
+
@Override
public ConcurrentMap<String, RMNode> getInactiveRMNodes() {
- return this.inactiveNodes;
+ return activeServiceContext.getInactiveRMNodes();
}
@Override
public ContainerAllocationExpirer getContainerAllocationExpirer() {
- return this.containerAllocationExpirer;
+ return activeServiceContext.getContainerAllocationExpirer();
}
@Override
public AMLivelinessMonitor getAMLivelinessMonitor() {
- return this.amLivelinessMonitor;
+ return activeServiceContext.getAMLivelinessMonitor();
}
@Override
public AMLivelinessMonitor getAMFinishingMonitor() {
- return this.amFinishingMonitor;
+ return activeServiceContext.getAMFinishingMonitor();
}
@Override
public DelegationTokenRenewer getDelegationTokenRenewer() {
- return delegationTokenRenewer;
+ return activeServiceContext.getDelegationTokenRenewer();
}
@Override
public AMRMTokenSecretManager getAMRMTokenSecretManager() {
- return this.amRMTokenSecretManager;
+ return activeServiceContext.getAMRMTokenSecretManager();
}
@Override
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
- return this.containerTokenSecretManager;
+ return activeServiceContext.getContainerTokenSecretManager();
}
-
+
@Override
public NMTokenSecretManagerInRM getNMTokenSecretManager() {
- return this.nmTokenSecretManager;
+ return activeServiceContext.getNMTokenSecretManager();
}
@Override
public ResourceScheduler getScheduler() {
- return this.scheduler;
+ return activeServiceContext.getScheduler();
}
@Override
public ReservationSystem getReservationSystem() {
- return this.reservationSystem;
+ return activeServiceContext.getReservationSystem();
}
-
+
@Override
public NodesListManager getNodesListManager() {
- return this.nodesListManager;
+ return activeServiceContext.getNodesListManager();
}
@Override
public ClientToAMTokenSecretManagerInRM getClientToAMTokenSecretManager() {
- return this.clientToAMTokenSecretManager;
+ return activeServiceContext.getClientToAMTokenSecretManager();
}
@Override
@@ -238,22 +184,22 @@ public class RMContextImpl implements RMContext {
@VisibleForTesting
public void setStateStore(RMStateStore store) {
- stateStore = store;
+ activeServiceContext.setStateStore(store);
}
-
+
@Override
public ClientRMService getClientRMService() {
- return this.clientRMService;
+ return activeServiceContext.getClientRMService();
}
@Override
public ApplicationMasterService getApplicationMasterService() {
- return applicationMasterService;
+ return activeServiceContext.getApplicationMasterService();
}
@Override
public ResourceTrackerService getResourceTrackerService() {
- return resourceTrackerService;
+ return activeServiceContext.getResourceTrackerService();
}
void setHAEnabled(boolean isHAEnabled) {
@@ -276,78 +222,78 @@ public class RMContextImpl implements RMContext {
@Override
public void setClientRMService(ClientRMService clientRMService) {
- this.clientRMService = clientRMService;
+ activeServiceContext.setClientRMService(clientRMService);
}
-
+
@Override
public RMDelegationTokenSecretManager getRMDelegationTokenSecretManager() {
- return this.rmDelegationTokenSecretManager;
+ return activeServiceContext.getRMDelegationTokenSecretManager();
}
-
+
@Override
public void setRMDelegationTokenSecretManager(
RMDelegationTokenSecretManager delegationTokenSecretManager) {
- this.rmDelegationTokenSecretManager = delegationTokenSecretManager;
+ activeServiceContext
+ .setRMDelegationTokenSecretManager(delegationTokenSecretManager);
}
void setContainerAllocationExpirer(
ContainerAllocationExpirer containerAllocationExpirer) {
- this.containerAllocationExpirer = containerAllocationExpirer;
+ activeServiceContext
+ .setContainerAllocationExpirer(containerAllocationExpirer);
}
void setAMLivelinessMonitor(AMLivelinessMonitor amLivelinessMonitor) {
- this.amLivelinessMonitor = amLivelinessMonitor;
+ activeServiceContext.setAMLivelinessMonitor(amLivelinessMonitor);
}
void setAMFinishingMonitor(AMLivelinessMonitor amFinishingMonitor) {
- this.amFinishingMonitor = amFinishingMonitor;
+ activeServiceContext.setAMFinishingMonitor(amFinishingMonitor);
}
void setContainerTokenSecretManager(
RMContainerTokenSecretManager containerTokenSecretManager) {
- this.containerTokenSecretManager = containerTokenSecretManager;
+ activeServiceContext
+ .setContainerTokenSecretManager(containerTokenSecretManager);
}
- void setNMTokenSecretManager(
- NMTokenSecretManagerInRM nmTokenSecretManager) {
- this.nmTokenSecretManager = nmTokenSecretManager;
+ void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
+ activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
}
void setScheduler(ResourceScheduler scheduler) {
- this.scheduler = scheduler;
+ activeServiceContext.setScheduler(scheduler);
}
-
+
void setReservationSystem(ReservationSystem reservationSystem) {
- this.reservationSystem = reservationSystem;
+ activeServiceContext.setReservationSystem(reservationSystem);
}
- void setDelegationTokenRenewer(
- DelegationTokenRenewer delegationTokenRenewer) {
- this.delegationTokenRenewer = delegationTokenRenewer;
+ void setDelegationTokenRenewer(DelegationTokenRenewer delegationTokenRenewer) {
+ activeServiceContext.setDelegationTokenRenewer(delegationTokenRenewer);
}
void setClientToAMTokenSecretManager(
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
- this.clientToAMTokenSecretManager = clientToAMTokenSecretManager;
+ activeServiceContext
+ .setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
}
- void setAMRMTokenSecretManager(
- AMRMTokenSecretManager amRMTokenSecretManager) {
- this.amRMTokenSecretManager = amRMTokenSecretManager;
+ void setAMRMTokenSecretManager(AMRMTokenSecretManager amRMTokenSecretManager) {
+ activeServiceContext.setAMRMTokenSecretManager(amRMTokenSecretManager);
}
void setNodesListManager(NodesListManager nodesListManager) {
- this.nodesListManager = nodesListManager;
+ activeServiceContext.setNodesListManager(nodesListManager);
}
void setApplicationMasterService(
ApplicationMasterService applicationMasterService) {
- this.applicationMasterService = applicationMasterService;
+ activeServiceContext.setApplicationMasterService(applicationMasterService);
}
- void setResourceTrackerService(
- ResourceTrackerService resourceTrackerService) {
- this.resourceTrackerService = resourceTrackerService;
+ void setResourceTrackerService(ResourceTrackerService resourceTrackerService) {
+ activeServiceContext.setResourceTrackerService(resourceTrackerService);
}
@Override
@@ -363,34 +309,35 @@ public class RMContextImpl implements RMContext {
}
public void setWorkPreservingRecoveryEnabled(boolean enabled) {
- this.isWorkPreservingRecoveryEnabled = enabled;
+ activeServiceContext.setWorkPreservingRecoveryEnabled(enabled);
}
@Override
public boolean isWorkPreservingRecoveryEnabled() {
- return this.isWorkPreservingRecoveryEnabled;
+ return activeServiceContext.isWorkPreservingRecoveryEnabled();
}
@Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
- return rmApplicationHistoryWriter;
+ return activeServiceContext.getRMApplicationHistoryWriter();
}
@Override
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
- this.systemMetricsPublisher = systemMetricsPublisher;
+ activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher);
}
@Override
public SystemMetricsPublisher getSystemMetricsPublisher() {
- return systemMetricsPublisher;
+ return activeServiceContext.getSystemMetricsPublisher();
}
@Override
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
- this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+ activeServiceContext
+ .setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
}
@Override
@@ -405,51 +352,51 @@ public class RMContextImpl implements RMContext {
@Override
public long getEpoch() {
- return this.epoch;
+ return activeServiceContext.getEpoch();
}
void setEpoch(long epoch) {
- this.epoch = epoch;
+ activeServiceContext.setEpoch(epoch);
}
@Override
public RMNodeLabelsManager getNodeLabelManager() {
- return nodeLabelManager;
+ return activeServiceContext.getNodeLabelManager();
}
-
+
@Override
public void setNodeLabelManager(RMNodeLabelsManager mgr) {
- nodeLabelManager = mgr;
+ activeServiceContext.setNodeLabelManager(mgr);
}
public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
- this.schedulerRecoveryStartTime = systemClock.getTime();
- this.schedulerRecoveryWaitTime = waitTime;
+ activeServiceContext.setSchedulerRecoveryStartAndWaitTime(waitTime);
}
public boolean isSchedulerReadyForAllocatingContainers() {
- if (isSchedulerReady) {
- return isSchedulerReady;
- }
- isSchedulerReady = (systemClock.getTime() - schedulerRecoveryStartTime)
- > schedulerRecoveryWaitTime;
- if (!isSchedulerReady && printLog) {
- LOG.info("Skip allocating containers. Scheduler is waiting for recovery.");
- printLog = false;
- }
- if (isSchedulerReady) {
- LOG.info("Scheduler recovery is done. Start allocating new containers.");
- }
- return isSchedulerReady;
+ return activeServiceContext.isSchedulerReadyForAllocatingContainers();
}
@Private
@VisibleForTesting
public void setSystemClock(Clock clock) {
- this.systemClock = clock;
+ activeServiceContext.setSystemClock(clock);
}
public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
- return systemCredentials;
+ return activeServiceContext.getSystemCredentialsForApps();
}
+
+ @Private
+ @Unstable
+ public RMActiveServiceContext getActiveServiceContext() {
+ return activeServiceContext;
+ }
+
+ @Private
+ @Unstable
+ void setActiveServiceContext(RMActiveServiceContext activeServiceContext) {
+ this.activeServiceContext = activeServiceContext;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db31ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index e0840b6..3ce42a3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -400,6 +400,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private ContainerAllocationExpirer containerAllocationExpirer;
private ResourceManager rm;
private boolean recoveryEnabled;
+ private RMActiveServiceContext activeServiceContext;
RMActiveServices(ResourceManager rm) {
super("RMActiveServices");
@@ -408,6 +409,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
+ activeServiceContext = new RMActiveServiceContext();
+ rmContext.setActiveServiceContext(activeServiceContext);
+
conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
rmSecretManagerService = createRMSecretManagerService();
@@ -1008,11 +1012,15 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
- rmContext.getRMNodes().clear();
- rmContext.getInactiveRMNodes().clear();
- rmContext.getRMApps().clear();
- ClusterMetrics.destroy();
- QueueMetrics.clearQueueMetrics();
+ }
+ }
+
+ void reinitialize(boolean initialize) throws Exception {
+ ClusterMetrics.destroy();
+ QueueMetrics.clearQueueMetrics();
+ if (initialize) {
+ resetDispatcher();
+ createAndInitActiveServices();
}
}
@@ -1036,8 +1044,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
startActiveServices();
return null;
} catch (Exception e) {
- resetDispatcher();
- createAndInitActiveServices();
+ reinitialize(true);
throw e;
}
}
@@ -1059,10 +1066,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
if (rmContext.getHAServiceState() ==
HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices();
- if (initialize) {
- resetDispatcher();
- createAndInitActiveServices();
- }
+ reinitialize(initialize);
}
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
LOG.info("Transitioned to standby state");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/db31ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index c6d7d09..122eb60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -513,6 +513,68 @@ public class TestRMHA {
rm.stop();
}
+ @Test
+ public void testFailoverClearsRMContext() throws Exception {
+ configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+ configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+ Configuration conf = new YarnConfiguration(configuration);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+
+ // 1. start RM
+ rm = new MockRM(conf, memStore);
+ rm.init(conf);
+ rm.start();
+
+ StateChangeRequestInfo requestInfo =
+ new StateChangeRequestInfo(
+ HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+ checkMonitorHealth();
+ checkStandbyRMFunctionality();
+
+ // 2. Transition to active
+ rm.adminService.transitionToActive(requestInfo);
+ checkMonitorHealth();
+ checkActiveRMFunctionality();
+ verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
+ assertEquals(1, rm.getRMContext().getRMNodes().size());
+ assertEquals(1, rm.getRMContext().getRMApps().size());
+
+ // 3. Create new RM
+ rm = new MockRM(conf, memStore) {
+ @Override
+ protected ResourceTrackerService createResourceTrackerService() {
+ return new ResourceTrackerService(this.rmContext,
+ this.nodesListManager, this.nmLivelinessMonitor,
+ this.rmContext.getContainerTokenSecretManager(),
+ this.rmContext.getNMTokenSecretManager()) {
+ @Override
+ protected void serviceStart() throws Exception {
+ throw new Exception("ResourceTracker service failed");
+ }
+ };
+ }
+ };
+ rm.init(conf);
+ rm.start();
+ checkMonitorHealth();
+ checkStandbyRMFunctionality();
+
+ // 4. Try Transition to active, throw exception
+ try {
+ rm.adminService.transitionToActive(requestInfo);
+ Assert.fail("Transitioned to Active should throw exception.");
+ } catch (Exception e) {
+ assertTrue("Error when transitioning to Active mode".contains(e
+ .getMessage()));
+ }
+ // 5. Clears the metrics
+ verifyClusterMetrics(0, 0, 0, 0, 0, 0);
+ assertEquals(0, rm.getRMContext().getRMNodes().size());
+ assertEquals(0, rm.getRMContext().getRMApps().size());
+ }
+
public void innerTestHAWithRMHostName(boolean includeBindHost) {
//this is run two times, with and without a bind host configured
if (includeBindHost) {