You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/01/07 23:33:56 UTC
hadoop git commit: YARN-4438. Implement RM leader election with curator. Contributed by Jian He
Repository: hadoop
Updated Branches:
refs/heads/trunk 52b77577c -> 89022f8d4
YARN-4438. Implement RM leader election with curator. Contributed by Jian He
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/89022f8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/89022f8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/89022f8d
Branch: refs/heads/trunk
Commit: 89022f8d4bac0e9d0b848fd91e9c4d700fe1cdbe
Parents: 52b7757
Author: Xuan <xg...@apache.org>
Authored: Thu Jan 7 14:33:06 2016 -0800
Committer: Xuan <xg...@apache.org>
Committed: Thu Jan 7 14:33:06 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 2 +
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../yarn/conf/TestYarnConfigurationFields.java | 1 +
.../server/resourcemanager/AdminService.java | 49 +++-
.../resourcemanager/LeaderElectorService.java | 144 ++++++++++
.../yarn/server/resourcemanager/RMContext.java | 4 +
.../server/resourcemanager/RMContextImpl.java | 11 +
.../server/resourcemanager/ResourceManager.java | 24 +-
.../TestLeaderElectorService.java | 269 +++++++++++++++++++
.../yarn/server/resourcemanager/TestRMHA.java | 8 +-
10 files changed, 496 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 5273614..00d31d8 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -86,6 +86,8 @@ Release 2.9.0 - UNRELEASED
YARN-4524. Cleanup AppSchedulingInfo. (Karthik Kambatla via wangda)
+ YARN-4438. Implement RM leader election with curator. (Jian He via xgong)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 9a1eb54..37c81ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -545,6 +545,11 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries";
+ @Private
+ public static final String CURATOR_LEADER_ELECTOR =
+ RM_HA_PREFIX + "curator-leader-elector.enabled";
+ public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
+
////////////////////////////////
// RM state store configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 41c3d87..0e508ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -90,6 +90,7 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+ configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Ignore all YARN Application Timeline Service (version 1) properties
configurationPrefixToSkipCompare.add("yarn.timeline-service.");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 353e72d..fcce722 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -106,6 +106,7 @@ public class AdminService extends CompositeService implements
private String rmId;
private boolean autoFailoverEnabled;
+ private boolean curatorEnabled;
private EmbeddedElectorService embeddedElector;
private Server server;
@@ -132,13 +133,16 @@ public class AdminService extends CompositeService implements
@Override
public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) {
+ curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+ YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
- if (autoFailoverEnabled) {
+ if (autoFailoverEnabled && !curatorEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector);
}
}
+
}
masterServiceBindAddress = conf.getSocketAddr(
@@ -319,7 +323,7 @@ public class AdminService extends CompositeService implements
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
- "", "RMHAProtocolService",
+ "", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
@@ -338,7 +342,7 @@ public class AdminService extends CompositeService implements
"Error on refreshAll during transistion to Active", e);
}
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
- "RMHAProtocolService");
+ "RM");
}
@Override
@@ -356,10 +360,10 @@ public class AdminService extends CompositeService implements
try {
rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
- "transitionToStandby", "RMHAProtocolService");
+ "transitionToStandby", "RM");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
- "", "RMHAProtocolService",
+ "", "RM",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
@@ -369,15 +373,28 @@ public class AdminService extends CompositeService implements
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
- HAServiceState haState = rmContext.getHAServiceState();
- HAServiceStatus ret = new HAServiceStatus(haState);
- if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
- ret.setReadyToBecomeActive();
+ if (curatorEnabled) {
+ HAServiceStatus state;
+ if (rmContext.getLeaderElectorService().hasLeaderShip()) {
+ state = new HAServiceStatus(HAServiceState.ACTIVE);
+ } else {
+ state = new HAServiceStatus(HAServiceState.STANDBY);
+ }
+ // set empty string to avoid NPE at
+ // HAServiceProtocolServerSideTranslatorPB#getServiceStatus
+ state.setNotReadyToBecomeActive("");
+ return state;
} else {
- ret.setNotReadyToBecomeActive("State is " + haState);
+ HAServiceState haState = rmContext.getHAServiceState();
+ HAServiceStatus ret = new HAServiceStatus(haState);
+ if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
+ ret.setReadyToBecomeActive();
+ } else {
+ ret.setNotReadyToBecomeActive("State is " + haState);
+ }
+ return ret;
}
- return ret;
- }
+ }
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
@@ -837,6 +854,12 @@ public class AdminService extends CompositeService implements
} else if (!autoFailoverEnabled) {
return "Auto Failover is not enabled.";
}
- return this.embeddedElector.getHAZookeeperConnectionState();
+ if (curatorEnabled) {
+ return "Connected to zookeeper : " + rmContext
+ .getLeaderElectorService().getCuratorClient().getZookeeperClient()
+ .isConnected();
+ } else {
+ return this.embeddedElector.getHAZookeeperConnectionState();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
new file mode 100644
index 0000000..3766676
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+
+/**
+ * Elects the active ResourceManager using Apache Curator's {@link LeaderLatch}
+ * recipe. All RMs of a cluster compete for a latch at
+ * {@code <zk-base-path>/<cluster-id>}; the winner is asked to transition to
+ * active through the RM admin service and, when leadership is lost, to
+ * transition to standby.
+ */
+public class LeaderElectorService extends AbstractService implements
+    LeaderLatchListener {
+  public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
+
+  /** Pause between leaving and re-joining the election. */
+  private static final long REJOIN_DELAY_MS = 1000;
+
+  /**
+   * The current latch. It is replaced by {@link #reJoinElection()}, which is
+   * called both from the latch listener and from outside this class, and it is
+   * read through {@link #hasLeaderShip()}; hence volatile.
+   */
+  private volatile LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private RMContext rmContext;
+  private String latchPath;
+  private String rmId;
+
+  /**
+   * @param rmContext context used to reach the RM admin service whenever the
+   *     leadership state of this RM changes
+   */
+  public LeaderElectorService(RMContext rmContext) {
+    super(LeaderElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Reads the ZooKeeper settings, starts the Curator client and joins the
+   * election.
+   *
+   * @throws NullPointerException if {@link YarnConfiguration#RM_ZK_ADDRESS}
+   *     is not set
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    Preconditions.checkNotNull(zkHostPort,
+        YarnConfiguration.RM_ZK_ADDRESS + " is not set");
+
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+
+    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
+        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+
+    // Apply the configured RM ZK timeout as the session timeout too;
+    // otherwise Curator's built-in default session timeout would be used.
+    curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
+        .sessionTimeoutMs(zkSessionTimeout)
+        .retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
+    curator.start();
+    initAndStartLeaderLatch();
+    super.serviceInit(conf);
+  }
+
+  /** Creates a new latch for this RM, registers as listener and starts it. */
+  private void initAndStartLeaderLatch() throws Exception {
+    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
+    leaderLatch.addListener(this);
+    leaderLatch.start();
+  }
+
+  /** Leaves the election and closes the Curator client. */
+  @Override
+  protected void serviceStop() throws Exception {
+    closeLeaderLatch();
+    // The client created in serviceInit must be closed as well, otherwise its
+    // ZooKeeper session and threads outlive the service.
+    if (curator != null) {
+      curator.close();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * @return whether this RM currently holds the leader latch; false once the
+   *     latch has been closed
+   */
+  public boolean hasLeaderShip() {
+    LeaderLatch latch = leaderLatch;
+    return latch != null && latch.hasLeadership();
+  }
+
+  /**
+   * Latch callback: this RM was elected. Transitions it to active; if that
+   * fails, relinquishes leadership and re-joins the election so it can retry.
+   */
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.warn(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader();
+      reJoinElection();
+    }
+  }
+
+  /**
+   * Closes the current latch, waits {@code REJOIN_DELAY_MS} and joins the
+   * election again with a new latch. Failures are logged, not propagated.
+   */
+  public void reJoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(REJOIN_DELAY_MS);
+      initAndStartLeaderLatch();
+    } catch (InterruptedException e) {
+      // Restore the interrupt flag for the thread that invoked us.
+      Thread.currentThread().interrupt();
+      LOG.info("Fail to re-join election.", e);
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  /**
+   * Closes the current latch, if any, and forgets it so that a later stop or
+   * re-join does not try to close the same latch a second time.
+   */
+  private void closeLeaderLatch() throws IOException {
+    LeaderLatch latch = leaderLatch;
+    if (latch != null) {
+      leaderLatch = null;
+      latch.close();
+    }
+  }
+
+  /**
+   * Latch callback: this RM lost leadership (also invoked directly after a
+   * failed transition to active). Transitions it to standby; failures are
+   * logged.
+   */
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  /**
+   * @return the Curator client used for the election. Used by tests and also
+   *     by the RM admin service to report the ZooKeeper connection state.
+   */
+  @VisibleForTesting
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 9802a37..f50da3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -135,4 +135,8 @@ public interface RMContext {
PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr);
+
+ void setLeaderElectorService(LeaderElectorService elector);
+
+ LeaderElectorService getLeaderElectorService();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index ed9942b..ec2aeb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
+ private LeaderElectorService elector;
/**
* Default constructor. To be used in conjunction with setter methods for
@@ -134,6 +135,16 @@ public class RMContextImpl implements RMContext {
}
@Override
+ public void setLeaderElectorService(LeaderElectorService elector) {
+ this.elector = elector;
+ }
+
+ @Override
+ public LeaderElectorService getLeaderElectorService() {
+ return this.elector;
+ }
+
+ @Override
public RMStateStore getStateStore() {
return activeServiceContext.getStateStore();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index aada69f..3b23ad8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor;
+ private boolean curatorEnabled = false;
@VisibleForTesting
protected String webAppAddress;
@@ -228,6 +229,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);
+ curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+ YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
+ if (curatorEnabled) {
+ LeaderElectorService elector = new LeaderElectorService(rmContext);
+ addService(elector);
+ rmContext.setLeaderElectorService(elector);
+ }
}
// Set UGI and do login
@@ -759,7 +767,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
- adminService.resetLeaderElection();
+ if (curatorEnabled) {
+ rmContext.getLeaderElectorService().reJoinElection();
+ } else {
+ adminService.resetLeaderElection();
+ }
return;
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.");
@@ -996,7 +1008,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* instance of {@link RMActiveServices} and initializes it.
* @throws Exception
*/
- protected void createAndInitActiveServices() throws Exception {
+ protected void createAndInitActiveServices() {
activeServices = new RMActiveServices(this);
activeServices.init(conf);
}
@@ -1016,14 +1028,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
* Helper method to stop {@link #activeServices}.
* @throws Exception
*/
- void stopActiveServices() throws Exception {
+ void stopActiveServices() {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
}
}
- void reinitialize(boolean initialize) throws Exception {
+ void reinitialize(boolean initialize) {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
@@ -1042,7 +1054,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.info("Already in active state");
return;
}
-
LOG.info("Transitioning to active state");
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@@ -1083,7 +1094,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) {
- transitionToStandby(true);
+ transitionToStandby(false);
} else {
transitionToActive();
}
@@ -1338,4 +1349,5 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n");
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
new file mode 100644
index 0000000..bb10041
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
@@ -0,0 +1,292 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.base.Supplier;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests for the Curator based {@link LeaderElectorService}. Every test starts
+ * a fresh 3-node in-process ZooKeeper {@link TestingCluster} and one or two
+ * HA-enabled {@link MockRM}s with the curator leader elector turned on.
+ */
+public class TestLeaderElectorService {
+
+  private static final String RM1_ADDRESS = "1.1.1.1:1";
+  private static final String RM1_NODE_ID = "rm1";
+
+  private static final String RM2_ADDRESS = "0.0.0.0:0";
+  private static final String RM2_NODE_ID = "rm2";
+
+  Configuration conf;
+  // volatile: testRMFailToTransitionToActive assigns rm1 from another thread
+  volatile MockRM rm1;
+  MockRM rm2;
+  TestingCluster zkCluster;
+
+  @Before
+  public void setUp() throws Exception {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.INFO);
+    conf = new Configuration();
+    conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
+
+    conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+    conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+
+    for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
+      conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
+      conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+    }
+    zkCluster = new TestingCluster(3);
+    conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
+    zkCluster.start();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (rm1 != null) {
+      rm1.stop();
+    }
+    if (rm2 != null) {
+      rm2.stop();
+    }
+    // Shut down the ZooKeeper servers as well; previously every test leaked
+    // a running 3-node cluster.
+    if (zkCluster != null) {
+      zkCluster.close();
+    }
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. stop rm1
+  // 4. rm2 become active
+  @Test (timeout = 20000)
+  public void testRMShutDownCauseFailover() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    // wait for some time to make sure rm2 will not become active;
+    Thread.sleep(5000);
+    waitFor(rm2, HAServiceState.STANDBY);
+
+    rm1.stop();
+    // rm2 should become active;
+    waitFor(rm2, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. submit a job to rm1 which triggers state-store failure.
+  // 4. rm1 goes to standby and rm2 becomes active
+  // 5. stop rm2, rm1 becomes active again
+  @Test
+  public void testStateStoreFailureCauseFailover() throws Exception {
+
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void storeApplicationStateInternal(ApplicationId
+          appId, ApplicationStateData appState) throws Exception{
+        throw new Exception("store app failure.");
+      }
+    };
+    memStore.init(conf);
+    rm1 = new MockRM(conf, memStore);
+    rm1.init(conf);
+    rm1.start();
+
+    waitFor(rm1, HAServiceState.ACTIVE);
+
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    // submit an app which will trigger state-store failure.
+    rm1.submitApp(200, "app1", "user1", null, "default", false);
+    waitFor(rm1, HAServiceState.STANDBY);
+
+    // rm2 should become active;
+    waitFor(rm2, HAServiceState.ACTIVE);
+
+    rm2.stop();
+    // rm1 will become active again
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. restart zk cluster
+  // 3. rm1 will first relinquish leadership and re-acquire leadership
+  @Test
+  public void testZKClusterDown() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+    // stop zk cluster
+    zkCluster.stop();
+    waitFor(rm1, HAServiceState.STANDBY);
+
+    Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
+    zkCluster = new TestingCluster(instanceSpecs);
+    zkCluster.start();
+    // rm becomes active again
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. kill the zk session between the rm and zk cluster.
+  // 3. rm1 will first relinquish leadership and re-acquire leadership
+  @Test
+  public void testExpireCurrentZKSession() throws Exception{
+
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+    LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+    CuratorZookeeperClient client =
+        service.getCuratorClient().getZookeeperClient();
+    // this will expire current curator client session. curator will re-establish
+    // the session. RM will first relinquish leadership and re-acquire leadership
+    KillSession
+        .kill(client.getZooKeeper(), client.getCurrentConnectionString());
+
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 fail to become active.
+  // 2. rm1 will rejoin leader election and retry the leadership
+  @Test
+  public void testRMFailToTransitionToActive() throws Exception{
+    conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+    final AtomicBoolean throwException = new AtomicBoolean(true);
+    Thread launchRM = new Thread() {
+      @Override
+      public void run() {
+        rm1 = new MockRM(conf) {
+          @Override
+          synchronized void transitionToActive() throws Exception {
+            if (throwException.get()) {
+              throw new Exception("Fail to transition to active");
+            } else {
+              super.transitionToActive();
+            }
+          }
+        };
+        rm1.init(conf);
+        rm1.start();
+      }
+    };
+    launchRM.start();
+    // wait some time, rm will keep retry the leadership;
+    Thread.sleep(5000);
+    throwException.set(false);
+    waitFor(rm1, HAServiceState.ACTIVE);
+  }
+
+  // 1. rm1 active
+  // 2. rm2 standby
+  // 3. kill the current connected zk instance
+  // 4. either rm1 or rm2 will become active.
+  @Test
+  public void testKillZKInstance() throws Exception {
+    rm1 = startRM("rm1", HAServiceState.ACTIVE);
+    rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+    ZooKeeper zkClient =
+        rm1.getRMContext().getLeaderElectorService().getCuratorClient()
+            .getZookeeperClient().getZooKeeper();
+    InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
+    zkCluster.killServer(connectionInstance);
+
+    // wait for rm1 or rm2 to be active by randomness
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          HAServiceState rm1State =
+              rm1.getAdminService().getServiceStatus().getState();
+          HAServiceState rm2State =
+              rm2.getAdminService().getServiceStatus().getState();
+          return (rm1State.equals(HAServiceState.ACTIVE)
+              && rm2State.equals(HAServiceState.STANDBY))
+              || (rm1State.equals(HAServiceState.STANDBY)
+              && rm2State.equals(HAServiceState.ACTIVE));
+        } catch (IOException ignored) {
+          // Treat a failed status query as "not yet"; waitFor polls again.
+        }
+        return false;
+      }
+    }, 2000, 15000);
+  }
+
+  /**
+   * Starts an RM with the given HA id on a copy of {@link #conf} and waits
+   * until it reports {@code state}.
+   */
+  private MockRM startRM(String rmId, HAServiceState state) throws Exception{
+    YarnConfiguration yarnConf = new YarnConfiguration(conf);
+    yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+    MockRM rm = new MockRM(yarnConf);
+    rm.init(yarnConf);
+    rm.start();
+    waitFor(rm, state);
+    return rm;
+  }
+
+  /**
+   * Polls the RM's admin service every 2s, for up to 15s, until it reports
+   * {@code state}.
+   */
+  private void waitFor(final MockRM rm,
+      final HAServiceState state)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override public Boolean get() {
+        try {
+          return rm.getAdminService().getServiceStatus().getState()
+              .equals(state);
+        } catch (IOException ignored) {
+          // Treat a failed status query as "not yet"; waitFor polls again.
+        }
+        return false;
+      }
+    }, 2000, 15000);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/89022f8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 62cfe84..70bba15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -471,8 +471,12 @@ public class TestRMHA {
memStore.init(conf);
rm = new MockRM(conf, memStore) {
@Override
- void stopActiveServices() throws Exception {
- Thread.sleep(10000);
+ void stopActiveServices() {
+ try {
+ Thread.sleep(10000);
+ } catch (Exception e) {
+ throw new RuntimeException (e);
+ }
super.stopActiveServices();
}
};