You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2016/12/13 21:37:35 UTC
hadoop git commit: YARN-4438. Implement RM leader election with
curator. Contributed by Jian He
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 6a18ae849 -> 120f3a0ff
YARN-4438. Implement RM leader election with curator. Contributed by Jian He
(cherry picked from commit 89022f8d4bac0e9d0b848fd91e9c4d700fe1cdbe)
(cherry picked from commit 2cbbf76c3dd9570e08914077de8e610d93ee0c9e)
Conflicts:
hadoop-yarn-project/CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/120f3a0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/120f3a0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/120f3a0f
Branch: refs/heads/branch-2.8
Commit: 120f3a0ff9da6252aa8004ee43242d75d36805f0
Parents: 6a18ae8
Author: Xuan <xg...@apache.org>
Authored: Thu Jan 7 14:33:06 2016 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Tue Dec 13 13:30:15 2016 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../yarn/conf/TestYarnConfigurationFields.java | 1 +
.../server/resourcemanager/AdminService.java | 49 +++-
.../resourcemanager/LeaderElectorService.java | 144 ++++++++++
.../yarn/server/resourcemanager/RMContext.java | 4 +
.../server/resourcemanager/RMContextImpl.java | 11 +
.../server/resourcemanager/ResourceManager.java | 24 +-
.../TestLeaderElectorService.java | 269 +++++++++++++++++++
.../yarn/server/resourcemanager/TestRMHA.java | 8 +-
9 files changed, 494 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5ce171c..8f2d02c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -550,6 +550,11 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries";
+ /**
+ * Enables RM leader election through Curator (LeaderElectorService) in
+ * place of the embedded elector. ResourceManager and AdminService read it
+ * only when RM HA is enabled.
+ */
+ @Private
+ public static final String CURATOR_LEADER_ELECTOR =
+ RM_HA_PREFIX + "curator-leader-elector.enabled";
+ /** Default for {@link #CURATOR_LEADER_ELECTOR}: disabled. */
+ public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
+
////////////////////////////////
// RM state store configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index dcfbd5d..65e1c9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -90,6 +90,7 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
.add(YarnConfiguration.DEFAULT_SHARED_CACHE_CHECKSUM_ALGO_IMPL);
configurationPropsToSkipCompare
.add(YarnConfiguration.DEFAULT_AMRM_PROXY_INTERCEPTOR_CLASS_PIPELINE);
+ configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR);
// Ignore blacklisting nodes for AM failures feature since it is still a
// "work in progress"
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index 20ce76b..82e8578 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -109,6 +109,7 @@ public class AdminService extends CompositeService implements
private String rmId;
private boolean autoFailoverEnabled;
+ private boolean curatorEnabled;
private EmbeddedElectorService embeddedElector;
private Server server;
@@ -135,13 +136,16 @@ public class AdminService extends CompositeService implements
@Override
public void serviceInit(Configuration conf) throws Exception {
if (rmContext.isHAEnabled()) {
+ curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+ YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
- if (autoFailoverEnabled) {
+ if (autoFailoverEnabled && !curatorEnabled) {
if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
embeddedElector = createEmbeddedElectorService();
addIfService(embeddedElector);
}
}
+
}
masterServiceBindAddress = conf.getSocketAddr(
@@ -322,7 +326,7 @@ public class AdminService extends CompositeService implements
rm.transitionToActive();
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
- "", "RMHAProtocolService",
+ "", "RM",
"Exception transitioning to active");
throw new ServiceFailedException(
"Error when transitioning to Active mode", e);
@@ -341,7 +345,7 @@ public class AdminService extends CompositeService implements
"Error on refreshAll during transistion to Active", e);
}
RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
- "RMHAProtocolService");
+ "RM");
}
@Override
@@ -359,10 +363,10 @@ public class AdminService extends CompositeService implements
try {
rm.transitionToStandby(true);
RMAuditLogger.logSuccess(user.getShortUserName(),
- "transitionToStandby", "RMHAProtocolService");
+ "transitionToStandby", "RM");
} catch (Exception e) {
RMAuditLogger.logFailure(user.getShortUserName(), "transitionToStandby",
- "", "RMHAProtocolService",
+ "", "RM",
"Exception transitioning to standby");
throw new ServiceFailedException(
"Error when transitioning to Standby mode", e);
@@ -372,15 +376,28 @@ public class AdminService extends CompositeService implements
+ /**
+ * Reports this RM's HA service status.
+ *
+ * <p>With Curator leader election enabled, the state is derived only from
+ * whether this RM's leader latch currently holds leadership (ACTIVE) or not
+ * (STANDBY), and readiness is always reported as "not ready" with an empty
+ * reason. Otherwise the state comes from the RM context, and the RM is
+ * reported ready to become active when it is active or in standby.
+ */
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
- HAServiceState haState = rmContext.getHAServiceState();
- HAServiceStatus ret = new HAServiceStatus(haState);
- if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
- ret.setReadyToBecomeActive();
+ if (curatorEnabled) {
+ // NOTE(review): leadership can briefly disagree with rmContext's HA state,
+ // e.g. while a failed transition to active is giving up the latch;
+ // confirm callers tolerate reporting ACTIVE in that window.
+ HAServiceStatus state;
+ if (rmContext.getLeaderElectorService().hasLeaderShip()) {
+ state = new HAServiceStatus(HAServiceState.ACTIVE);
+ } else {
+ state = new HAServiceStatus(HAServiceState.STANDBY);
+ }
+ // set empty string to avoid NPE at
+ // HAServiceProtocolServerSideTranslatorPB#getServiceStatus
+ state.setNotReadyToBecomeActive("");
+ return state;
} else {
- ret.setNotReadyToBecomeActive("State is " + haState);
+ // Non-Curator path: derive the state from the RM context.
+ HAServiceState haState = rmContext.getHAServiceState();
+ HAServiceStatus ret = new HAServiceStatus(haState);
+ if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
+ ret.setReadyToBecomeActive();
+ } else {
+ ret.setNotReadyToBecomeActive("State is " + haState);
+ }
+ return ret;
}
- return ret;
- }
+ }
@Override
public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
@@ -885,6 +902,12 @@ public class AdminService extends CompositeService implements
} else if (!autoFailoverEnabled) {
return "Auto Failover is not enabled.";
}
- return this.embeddedElector.getHAZookeeperConnectionState();
+ if (curatorEnabled) {
+ return "Connected to zookeeper : " + rmContext
+ .getLeaderElectorService().getCuratorClient().getZookeeperClient()
+ .isConnected();
+ } else {
+ return this.embeddedElector.getHAZookeeperConnectionState();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
new file mode 100644
index 0000000..3766676
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+
+/**
+ * Drives this ResourceManager's HA state from a Curator {@link LeaderLatch}.
+ *
+ * <p>Every RM of a cluster contends for the latch at
+ * {@code <auto-failover zk base path>/<cluster id>}. Acquiring the latch
+ * transitions this RM to active through the RM admin service; losing it
+ * transitions the RM to standby. If the transition to active fails, the RM
+ * gives up the latch and re-joins the election after a short delay.
+ *
+ * <p>The Curator client is created and started in {@link #serviceInit}, but
+ * the election itself is only entered in {@link #serviceStart}. Entering it
+ * during init lets a leadership callback make the RM active before
+ * ResourceManager#serviceStart transitions it to standby, which would leave
+ * the RM standby while still holding the latch.
+ */
+public class LeaderElectorService extends AbstractService implements
+    LeaderLatchListener {
+  public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
+
+  /** Back-off before re-entering the election after giving up leadership. */
+  private static final long REJOIN_ELECTION_DELAY_MS = 1000;
+
+  // Replaced by reJoinElection() (possibly on a Curator callback thread) and
+  // read by hasLeaderShip() (e.g. from AdminService), hence volatile.
+  private volatile LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private final RMContext rmContext;
+  private String latchPath;
+  private String rmId;
+
+  public LeaderElectorService(RMContext rmContext) {
+    super(LeaderElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Reads the ZooKeeper settings, then builds and starts the Curator client.
+   * The election is not entered here; see {@link #serviceStart()}.
+   *
+   * @throws NullPointerException if {@code RM_ZK_ADDRESS} is not set
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    Preconditions.checkNotNull(zkHostPort,
+        YarnConfiguration.RM_ZK_ADDRESS + " is not set");
+
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+
+    int zkSessionTimeout = conf.getInt(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+    int maxRetryNum = conf.getInt(YarnConfiguration.RM_ZK_NUM_RETRIES,
+        YarnConfiguration.DEFAULT_ZK_RM_NUM_RETRIES);
+
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+
+    // NOTE(review): RetryNTimes' second argument is the sleep between
+    // retries, so the ZK timeout setting is used as the retry interval here.
+    curator = CuratorFrameworkFactory.builder().connectString(zkHostPort)
+        .retryPolicy(new RetryNTimes(maxRetryNum, zkSessionTimeout)).build();
+    curator.start();
+    super.serviceInit(conf);
+  }
+
+  /** Enters the leader election once the service (and RM) is starting. */
+  @Override
+  protected void serviceStart() throws Exception {
+    initAndStartLeaderLatch();
+    super.serviceStart();
+  }
+
+  /** Creates a fresh latch for this RM, registers as listener, and joins. */
+  private void initAndStartLeaderLatch() throws Exception {
+    LeaderLatch latch = new LeaderLatch(curator, latchPath, rmId);
+    latch.addListener(this);
+    leaderLatch = latch;
+    latch.start();
+  }
+
+  /** Leaves the election and closes the Curator client created in init. */
+  @Override
+  protected void serviceStop() throws Exception {
+    try {
+      closeLeaderLatch();
+    } finally {
+      // This service created the client, so it must release it as well.
+      if (curator != null) {
+        curator.close();
+      }
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * @return whether this RM currently holds the leader latch; false if the
+   *         election has not been entered yet or is being re-joined
+   */
+  public boolean hasLeaderShip() {
+    LeaderLatch latch = leaderLatch;
+    return latch != null && latch.hasLeadership();
+  }
+
+  /**
+   * Curator callback on acquiring leadership: transitions this RM to active.
+   * On failure, relinquishes leadership and re-joins the election.
+   */
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader();
+      reJoinElection();
+    }
+  }
+
+  /**
+   * Leaves the election, waits {@link #REJOIN_ELECTION_DELAY_MS}, then joins
+   * again with a new latch. Failures are logged rather than propagated; if
+   * interrupted during the back-off, the interrupt flag is restored and the
+   * election is not re-joined.
+   */
+  public void reJoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(REJOIN_ELECTION_DELAY_MS);
+      initAndStartLeaderLatch();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted while re-joining election.", e);
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  /**
+   * Leaves the election if currently in it. The field is cleared first so a
+   * later call does not try to close an already-closed latch.
+   */
+  private void closeLeaderLatch() throws IOException {
+    LeaderLatch latch = leaderLatch;
+    leaderLatch = null;
+    if (latch != null) {
+      latch.close();
+    }
+  }
+
+  /** Curator callback on losing leadership: transitions this RM to standby. */
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  /**
+   * @return the Curator client used for the election; exposed for tests and
+   *         for reporting the ZooKeeper connection state
+   */
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 9802a37..f50da3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -135,4 +135,8 @@ public interface RMContext {
PlacementManager getQueuePlacementManager();
void setQueuePlacementManager(PlacementManager placementMgr);
+
+ /**
+ * Sets the Curator-based leader elector. ResourceManager calls this only
+ * when RM HA and {@code YarnConfiguration.CURATOR_LEADER_ELECTOR} are both
+ * enabled.
+ * @param elector the elector service
+ */
+ void setLeaderElectorService(LeaderElectorService elector);
+
+ /**
+ * @return the Curator-based leader elector, or null if none has been set
+ * (presumably whenever Curator leader election is disabled; confirm for
+ * implementations other than RMContextImpl).
+ */
+ LeaderElectorService getLeaderElectorService();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index a549930..0199638 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -72,6 +72,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
+ private LeaderElectorService elector;
private final Object haServiceStateLock = new Object();
@@ -136,6 +137,16 @@ public class RMContextImpl implements RMContext {
}
+ /** Remembers the leader elector so it can be looked up from the context. */
@Override
+  public void setLeaderElectorService(LeaderElectorService electorService) {
+    elector = electorService;
+  }
+
+  /** Returns the leader elector stored by the setter; null if never set. */
+  @Override
+  public LeaderElectorService getLeaderElectorService() {
+    return elector;
+  }
+
+ @Override
public RMStateStore getStateStore() {
return activeServiceContext.getStateStore();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 7e4eb91..76b8a7b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -157,6 +157,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
private AppReportFetcher fetcher = null;
protected ResourceTrackerService resourceTracker;
private JvmPauseMonitor pauseMonitor;
+ private boolean curatorEnabled = false;
@VisibleForTesting
protected String webAppAddress;
@@ -233,6 +234,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);
+ curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
+ YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
+ if (curatorEnabled) {
+ LeaderElectorService elector = new LeaderElectorService(rmContext);
+ addService(elector);
+ rmContext.setLeaderElectorService(elector);
+ }
}
// Set UGI and do login
@@ -772,7 +780,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
- adminService.resetLeaderElection();
+ if (curatorEnabled) {
+ rmContext.getLeaderElectorService().reJoinElection();
+ } else {
+ adminService.resetLeaderElection();
+ }
return;
} catch (Exception e) {
LOG.fatal("Failed to transition RM to Standby mode.");
@@ -982,7 +994,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
* instance of {@link RMActiveServices} and initializes it.
* @throws Exception
*/
- protected void createAndInitActiveServices() throws Exception {
+ protected void createAndInitActiveServices() {
activeServices = new RMActiveServices(this);
activeServices.init(conf);
}
@@ -1002,14 +1014,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
* Helper method to stop {@link #activeServices}.
* @throws Exception
*/
- void stopActiveServices() throws Exception {
+ void stopActiveServices() {
if (activeServices != null) {
activeServices.stop();
activeServices = null;
}
}
- void reinitialize(boolean initialize) throws Exception {
+ void reinitialize(boolean initialize) {
ClusterMetrics.destroy();
QueueMetrics.clearQueueMetrics();
if (initialize) {
@@ -1028,7 +1040,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
LOG.info("Already in active state");
return;
}
-
LOG.info("Transitioning to active state");
this.rmLoginUGI.doAs(new PrivilegedExceptionAction<Void>() {
@@ -1069,7 +1080,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
protected void serviceStart() throws Exception {
if (this.rmContext.isHAEnabled()) {
- transitionToStandby(true);
+ transitionToStandby(false);
} else {
transitionToActive();
}
@@ -1324,4 +1335,5 @@ public class ResourceManager extends CompositeService implements Recoverable {
out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n");
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
new file mode 100644
index 0000000..bb10041
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
@@ -0,0 +1,269 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.base.Supplier;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.KillSession;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class TestLeaderElectorService {
+
+ private static final String RM1_ADDRESS = "1.1.1.1:1";
+ private static final String RM1_NODE_ID = "rm1";
+
+ private static final String RM2_ADDRESS = "0.0.0.0:0";
+ private static final String RM2_NODE_ID = "rm2";
+
+ Configuration conf ;
+ MockRM rm1;
+ MockRM rm2;
+ TestingCluster zkCluster;
+ @Before
+ public void setUp() throws Exception {
+ Logger rootLogger = LogManager.getRootLogger();
+ rootLogger.setLevel(Level.INFO);
+ conf = new Configuration();
+ conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+ conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
+ conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
+
+ conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
+ conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
+
+ for (String confKey : YarnConfiguration.getServiceAddressConfKeys(conf)) {
+ conf.set(HAUtil.addSuffix(confKey, RM1_NODE_ID), RM1_ADDRESS);
+ conf.set(HAUtil.addSuffix(confKey, RM2_NODE_ID), RM2_ADDRESS);
+ }
+ zkCluster = new TestingCluster(3);
+ conf.set(YarnConfiguration.RM_ZK_ADDRESS, zkCluster.getConnectString());
+ zkCluster.start();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (rm1 != null) {
+ rm1.stop();
+ }
+ if (rm2 !=null) {
+ rm2.stop();
+ }
+ }
+
+ // 1. rm1 active
+ // 2. rm2 standby
+ // 3. stop rm1
+ // 4. rm2 become active
+ @Test (timeout = 20000)
+ public void testRMShutDownCauseFailover() throws Exception {
+ rm1 = startRM("rm1", HAServiceState.ACTIVE);
+ rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+ // wait for some time to make sure rm2 will not become active;
+ Thread.sleep(5000);
+ waitFor(rm2, HAServiceState.STANDBY);
+
+ rm1.stop();
+ // rm2 should become active;
+ waitFor(rm2, HAServiceState.ACTIVE);
+ }
+
+ // 1. rm1 active
+ // 2. rm2 standby
+ // 3. submit a job to rm1 which triggers state-store failure.
+ // 4. rm2 become
+ @Test
+ public void testStateStoreFailureCauseFailover() throws Exception {
+
+ conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+ MemoryRMStateStore memStore = new MemoryRMStateStore() {
+ @Override
+ public synchronized void storeApplicationStateInternal(ApplicationId
+ appId, ApplicationStateData appState) throws Exception{
+ throw new Exception("store app failure.");
+ }
+ };
+ memStore.init(conf);
+ rm1 = new MockRM(conf, memStore);
+ rm1.init(conf);
+ rm1.start();
+
+ waitFor(rm1, HAServiceState.ACTIVE);
+
+ rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+ // submit an app which will trigger state-store failure.
+ rm1.submitApp(200, "app1", "user1", null, "default", false);
+ waitFor(rm1, HAServiceState.STANDBY);
+
+ // rm2 should become active;
+ waitFor(rm2, HAServiceState.ACTIVE);
+
+ rm2.stop();
+ // rm1 will become active again
+ waitFor(rm1, HAServiceState.ACTIVE);
+ }
+
+ // 1. rm1 active
+ // 2. restart zk cluster
+ // 3. rm1 will first relinquish leadership and re-acquire leadership
+ @Test
+ public void testZKClusterDown() throws Exception {
+ rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+ // stop zk cluster
+ zkCluster.stop();
+ waitFor(rm1, HAServiceState.STANDBY);
+
+ Collection<InstanceSpec> instanceSpecs = zkCluster.getInstances();
+ zkCluster = new TestingCluster(instanceSpecs);
+ zkCluster.start();
+ // rm becomes active again
+ waitFor(rm1, HAServiceState.ACTIVE);
+ }
+
+ // 1. rm1 active
+ // 2. kill the zk session between the rm and zk cluster.
+ // 3. rm1 will first relinquish leadership and re-acquire leadership
+ @Test
+ public void testExpireCurrentZKSession() throws Exception{
+
+ rm1 = startRM("rm1", HAServiceState.ACTIVE);
+
+ LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+ CuratorZookeeperClient client =
+ service.getCuratorClient().getZookeeperClient();
+ // this will expire current curator client session. curator will re-establish
+ // the session. RM will first relinquish leadership and re-acquire leadership
+ KillSession
+ .kill(client.getZooKeeper(), client.getCurrentConnectionString());
+
+ waitFor(rm1, HAServiceState.ACTIVE);
+ }
+
+ // 1. rm1 fail to become active.
+ // 2. rm1 will rejoin leader election and retry the leadership
+ @Test
+ public void testRMFailToTransitionToActive() throws Exception{
+ conf.set(YarnConfiguration.RM_HA_ID, "rm1");
+ final AtomicBoolean throwException = new AtomicBoolean(true);
+ Thread launchRM = new Thread() {
+ @Override
+ public void run() {
+ rm1 = new MockRM(conf) {
+ @Override
+ synchronized void transitionToActive() throws Exception {
+ if (throwException.get()) {
+ throw new Exception("Fail to transition to active");
+ } else {
+ super.transitionToActive();
+ }
+ }
+ };
+ rm1.init(conf);
+ rm1.start();
+ }
+ };
+ launchRM.start();
+ // wait some time, rm will keep retry the leadership;
+ Thread.sleep(5000);
+ throwException.set(false);
+ waitFor(rm1, HAServiceState.ACTIVE);
+ }
+
+ // 1. rm1 active
+ // 2. rm2 standby
+ // 3. kill the current connected zk instance
+ // 4. either rm1 or rm2 will become active.
+ @Test
+ public void testKillZKInstance() throws Exception {
+ rm1 = startRM("rm1", HAServiceState.ACTIVE);
+ rm2 = startRM("rm2", HAServiceState.STANDBY);
+
+ ZooKeeper zkClient =
+ rm1.getRMContext().getLeaderElectorService().getCuratorClient()
+ .getZookeeperClient().getZooKeeper();
+ InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
+ zkCluster.killServer(connectionInstance);
+
+ // wait for rm1 or rm2 to be active by randomness
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ try {
+ HAServiceState rm1State =
+ rm1.getAdminService().getServiceStatus().getState();
+ HAServiceState rm2State =
+ rm2.getAdminService().getServiceStatus().getState();
+ return (rm1State.equals(HAServiceState.ACTIVE) && rm2State
+ .equals(HAServiceState.STANDBY)) || (
+ rm1State.equals(HAServiceState.STANDBY) && rm2State
+ .equals(HAServiceState.ACTIVE));
+ } catch (IOException e) {
+ }
+ return false;
+ }
+ }, 2000, 15000);
+ }
+
+ private MockRM startRM(String rmId, HAServiceState state) throws Exception{
+ YarnConfiguration yarnConf = new YarnConfiguration(conf);
+ yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
+ MockRM rm = new MockRM(yarnConf);
+ rm.init(yarnConf);
+ rm.start();
+ waitFor(rm, state);
+ return rm;
+ }
+
+ private void waitFor(final MockRM rm,
+ final HAServiceState state)
+ throws TimeoutException, InterruptedException {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override public Boolean get() {
+ try {
+ return rm.getAdminService().getServiceStatus().getState()
+ .equals(state);
+ } catch (IOException e) {
+ }
+ return false;
+ }
+ }, 2000, 15000);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/120f3a0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 0b9e57b..d5e6281 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -471,8 +471,12 @@ public class TestRMHA {
memStore.init(conf);
rm = new MockRM(conf, memStore) {
@Override
- void stopActiveServices() throws Exception {
- Thread.sleep(10000);
+ void stopActiveServices() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag before surfacing it as an unchecked failure.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
super.stopActiveServices();
}
};
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org