You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:04:48 UTC
[20/50] [abbrv] hadoop git commit: YARN-5709. Cleanup leader election
configs and pluggability. Contributed by Karthik Kambatla
YARN-5709. Cleanup leader election configs and pluggability. Contributed by Karthik Kambatla
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a6410a54
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a6410a54
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a6410a54
Branch: refs/heads/HDFS-10285
Commit: a6410a542e59acd9827457df4a257a843f785c29
Parents: b0aace2
Author: Jian He <ji...@apache.org>
Authored: Fri Dec 9 16:38:49 2016 -0800
Committer: Jian He <ji...@apache.org>
Committed: Fri Dec 9 17:00:37 2016 -0800
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 14 +-
...ActiveStandbyElectorBasedElectorService.java | 271 +++++++++++++++++++
.../server/resourcemanager/AdminService.java | 75 +----
.../CuratorBasedElectorService.java | 139 ++++++++++
.../server/resourcemanager/EmbeddedElector.java | 41 +++
.../resourcemanager/EmbeddedElectorService.java | 260 ------------------
.../resourcemanager/LeaderElectorService.java | 129 ---------
.../yarn/server/resourcemanager/RMContext.java | 6 +-
.../server/resourcemanager/RMContextImpl.java | 15 +-
.../server/resourcemanager/ResourceManager.java | 39 ++-
.../server/resourcemanager/webapp/RMWebApp.java | 3 +-
.../resourcemanager/webapp/dao/ClusterInfo.java | 2 +-
.../yarn/server/resourcemanager/MockRM.java | 33 ++-
.../server/resourcemanager/RMHATestBase.java | 4 +-
.../TestLeaderElectorService.java | 17 +-
.../resourcemanager/TestRMEmbeddedElector.java | 49 ++--
.../yarn/server/resourcemanager/TestRMHA.java | 39 +--
17 files changed, 605 insertions(+), 531 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 69c7b00..dc7c629 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
@@ -654,9 +655,20 @@ public class YarnConfiguration extends Configuration {
public static final String RM_HA_FC_ELECTOR_ZK_RETRIES_KEY = RM_HA_PREFIX
+ "failover-controller.active-standby-elector.zk.retries";
- @Private
+
+ /**
+ * Whether to use curator-based elector for leader election.
+ *
+ * @deprecated Eventually, we want to default to the curator-based
+ * implementation and remove the {@link ActiveStandbyElector} based
+ * implementation. We should remove this config then.
+ */
+ @Unstable
+ @Deprecated
public static final String CURATOR_LEADER_ELECTOR =
RM_HA_PREFIX + "curator-leader-elector.enabled";
+ @Private
+ @Unstable
public static final boolean DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED = false;
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
new file mode 100644
index 0000000..751eedd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ActiveStandbyElectorBasedElectorService.java
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.ActiveStandbyElector;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.ACL;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+
+/**
+ * Leader election implementation that uses {@link ActiveStandbyElector}.
+ *
+ * <p>The service registers itself as the elector's callback and forwards
+ * election outcomes to the RM admin service obtained from the
+ * {@link RMContext}: {@link #becomeActive()} requests a transition to active
+ * and {@link #becomeStandby()} requests a transition to standby.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ActiveStandbyElectorBasedElectorService extends AbstractService
+    implements EmbeddedElector,
+    ActiveStandbyElector.ActiveStandbyElectorCallback {
+  private static final Log LOG = LogFactory.getLog(
+      ActiveStandbyElectorBasedElectorService.class.getName());
+
+  /** Request info passed along with every transition this service asks for. */
+  private static final HAServiceProtocol.StateChangeRequestInfo req =
+      new HAServiceProtocol.StateChangeRequestInfo(
+          HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
+
+  private final RMContext rmContext;
+
+  /** Serialized ActiveRMInfoProto (cluster id and RM id) of this RM. */
+  private byte[] localActiveNodeInfo;
+  private ActiveStandbyElector elector;
+  /** ZK session timeout in ms; also used as the disconnect timer delay. */
+  private long zkSessionTimeout;
+  /** Pending disconnect timer, or null if none is armed. */
+  private Timer zkDisconnectTimer;
+  /** Guards all reads and writes of {@link #zkDisconnectTimer}. */
+  @VisibleForTesting
+  final Object zkDisconnectLock = new Object();
+
+  /**
+   * Create the elector service.
+   *
+   * @param rmContext context used to reach the RM admin service and the
+   *                  event dispatcher
+   */
+  ActiveStandbyElectorBasedElectorService(RMContext rmContext) {
+    super(ActiveStandbyElectorBasedElectorService.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  /**
+   * Read the election settings from the configuration, create the
+   * {@link ActiveStandbyElector} and verify the election znode.
+   *
+   * @param conf configuration to read the ZooKeeper and HA settings from
+   * @throws YarnRuntimeException if {@link YarnConfiguration#RM_ZK_ADDRESS}
+   *                              is not set
+   * @throws Exception if the elector cannot be created or the znode cannot
+   *                   be checked
+   */
+  @Override
+  protected void serviceInit(Configuration conf)
+      throws Exception {
+    conf = conf instanceof YarnConfiguration
+        ? conf
+        : new YarnConfiguration(conf);
+
+    String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
+    if (zkQuorum == null) {
+      throw new YarnRuntimeException("Embedded automatic failover " +
+          "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
+          " is not set");
+    }
+
+    String rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
+
+    String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    String electionZNode = zkBasePath + "/" + clusterId;
+
+    zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
+        YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
+
+    List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
+    List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
+
+    // The YARN-specific retry key wins; otherwise fall back to the common
+    // HA key and finally to the common default.
+    int maxRetryNum =
+        conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
+            .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
+                CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
+    elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
+        electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
+
+    elector.ensureParentZNode();
+    if (!isParentZnodeSafe(clusterId)) {
+      // Reported through the dispatcher; initialization still continues.
+      notifyFatalError(electionZNode + " znode has invalid data! "+
+          "Might need formatting!");
+    }
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Join the leader election with this RM's serialized node info.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    elector.joinElection(localActiveNodeInfo);
+    super.serviceStart();
+  }
+
+  /**
+   * Quit the election and terminate the elector's connection, if an elector
+   * was created.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    /**
+     * When error occurs in serviceInit(), serviceStop() can be called.
+     * We need null check for the case.
+     */
+    if (elector != null) {
+      elector.quitElection(false);
+      elector.terminateConnection();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Cancel any pending disconnect timer and ask the RM admin service to
+   * transition to active.
+   *
+   * @throws ServiceFailedException if the transition fails for any reason;
+   *                                the original exception is kept as cause
+   */
+  @Override
+  public void becomeActive() throws ServiceFailedException {
+    cancelDisconnectTimer();
+
+    try {
+      rmContext.getRMAdminService().transitionToActive(req);
+    } catch (Exception e) {
+      throw new ServiceFailedException("RM could not transition to Active", e);
+    }
+  }
+
+  /**
+   * Cancel any pending disconnect timer and ask the RM admin service to
+   * transition to standby. A failure is logged and not propagated.
+   */
+  @Override
+  public void becomeStandby() {
+    cancelDisconnectTimer();
+
+    try {
+      rmContext.getRMAdminService().transitionToStandby(req);
+    } catch (Exception e) {
+      LOG.error("RM could not transition to Standby", e);
+    }
+  }
+
+  /**
+   * Stop the disconnect timer. Any running tasks will be allowed to complete.
+   */
+  private void cancelDisconnectTimer() {
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer != null) {
+        zkDisconnectTimer.cancel();
+        zkDisconnectTimer = null;
+      }
+    }
+  }
+
+  /**
+   * When the ZK client loses contact with ZK, this method will be called to
+   * allow the RM to react. Because the loss of connection can be noticed
+   * before the session timeout happens, it is undesirable to transition
+   * immediately. Instead the method starts a timer that will wait
+   * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
+   * initiating the transition into standby state.
+   */
+  @Override
+  public void enterNeutralMode() {
+    LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
+        + zkSessionTimeout + " ms if connection is not reestablished.");
+
+    // If we've just become disconnected, start a timer. When the time's up,
+    // we'll transition to standby.
+    synchronized (zkDisconnectLock) {
+      if (zkDisconnectTimer == null) {
+        zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
+        zkDisconnectTimer.schedule(new TimerTask() {
+          @Override
+          public void run() {
+            synchronized (zkDisconnectLock) {
+              // Only run if the timer hasn't been cancelled
+              if (zkDisconnectTimer != null) {
+                becomeStandby();
+              }
+            }
+          }
+        }, zkSessionTimeout);
+      }
+    }
+  }
+
+  /**
+   * Report an unrecoverable elector error by dispatching an
+   * {@link RMFatalEvent} of type
+   * {@link RMFatalEventType#EMBEDDED_ELECTOR_FAILED}.
+   *
+   * @param errorMessage description of the failure
+   */
+  @SuppressWarnings(value = "unchecked")
+  @Override
+  public void notifyFatalError(String errorMessage) {
+    rmContext.getDispatcher().getEventHandler().handle(
+        new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED,
+            errorMessage));
+  }
+
+  /**
+   * Ignore the fencing request; only a debug message is logged.
+   *
+   * @param oldActiveData data of the previous active node (unused)
+   */
+  @Override
+  public void fenceOldActive(byte[] oldActiveData) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Request to fence old active being ignored, " +
+          "as embedded leader election doesn't support fencing");
+    }
+  }
+
+  /**
+   * Serialize the given cluster id and RM id as an ActiveRMInfoProto.
+   *
+   * @param clusterId id of the cluster this RM belongs to
+   * @param rmId id of this RM
+   * @return the serialized proto bytes
+   */
+  private static byte[] createActiveNodeInfo(String clusterId, String rmId)
+      throws IOException {
+    return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
+        .newBuilder()
+        .setClusterId(clusterId)
+        .setRmId(rmId)
+        .build()
+        .toByteArray();
+  }
+
+  /**
+   * Check that the active-node data held by the elector, if any, parses as
+   * an ActiveRMInfoProto and names the same cluster as this RM.
+   *
+   * @param clusterId cluster id of this RM
+   * @return true if no active node is found or its data matches this
+   *         cluster; false if the data is unparseable or names another
+   *         cluster
+   */
+  private boolean isParentZnodeSafe(String clusterId)
+      throws InterruptedException, IOException, KeeperException {
+    byte[] data;
+    try {
+      data = elector.getActiveData();
+    } catch (ActiveStandbyElector.ActiveNotFoundException e) {
+      // no active found, parent znode is safe
+      return true;
+    }
+
+    YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
+    try {
+      proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
+          .parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
+      return false;
+    }
+
+    // Check if the passed proto corresponds to an RM in the same cluster
+    if (!proto.getClusterId().equals(clusterId)) {
+      // Separator added so the two cluster ids do not run together.
+      LOG.error("Mismatched cluster! The other RM seems " +
+          "to be from a different cluster. Current cluster = " + clusterId +
+          ", Other RM's cluster = " + proto.getClusterId());
+      return false;
+    }
+    return true;
+  }
+
+  // EmbeddedElector methods
+
+  /**
+   * Quit the election and join it again with this RM's node info.
+   */
+  @Override
+  public void rejoinElection() {
+    elector.quitElection(false);
+    elector.joinElection(localActiveNodeInfo);
+  }
+
+  /**
+   * @return the connection state string reported by the elector
+   */
+  @Override
+  public String getZookeeperConnectionState() {
+    return elector.getHAZookeeperConnectionState();
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
index c060659..028b6f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java
@@ -29,7 +29,6 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.HAServiceProtocol;
@@ -108,8 +107,6 @@ public class AdminService extends CompositeService implements
private String rmId;
private boolean autoFailoverEnabled;
- private boolean curatorEnabled;
- private EmbeddedElectorService embeddedElector;
private Server server;
@@ -134,18 +131,8 @@ public class AdminService extends CompositeService implements
@Override
public void serviceInit(Configuration conf) throws Exception {
- if (rmContext.isHAEnabled()) {
- curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
- YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
- autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
- if (autoFailoverEnabled && !curatorEnabled) {
- if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
- embeddedElector = createEmbeddedElectorService();
- addIfService(embeddedElector);
- }
- }
-
- }
+ autoFailoverEnabled =
+ rmContext.isHAEnabled() && HAUtil.isAutomaticFailoverEnabled(conf);
masterServiceBindAddress = conf.getSocketAddr(
YarnConfiguration.RM_BIND_HOST,
@@ -228,17 +215,6 @@ public class AdminService extends CompositeService implements
}
}
- protected EmbeddedElectorService createEmbeddedElectorService() {
- return new EmbeddedElectorService(rmContext);
- }
-
- @InterfaceAudience.Private
- void resetLeaderElection() {
- if (embeddedElector != null) {
- embeddedElector.resetLeaderElection();
- }
- }
-
private UserGroupInformation checkAccess(String method) throws IOException {
return RMServerUtils.verifyAdminAccess(authorizer, method, LOG);
}
@@ -375,30 +351,24 @@ public class AdminService extends CompositeService implements
}
}
+ /**
+ * Return the HA status of this RM. This includes the current state and
+ * whether the RM is ready to become active.
+ *
+ * @return {@link HAServiceStatus} of the current RM
+ * @throws IOException if the caller does not have permissions
+ */
@Override
public synchronized HAServiceStatus getServiceStatus() throws IOException {
checkAccess("getServiceState");
- if (curatorEnabled) {
- HAServiceStatus state;
- if (rmContext.getLeaderElectorService().hasLeaderShip()) {
- state = new HAServiceStatus(HAServiceState.ACTIVE);
- } else {
- state = new HAServiceStatus(HAServiceState.STANDBY);
- }
- // set empty string to avoid NPE at
- // HAServiceProtocolServerSideTranslatorPB#getServiceStatus
- state.setNotReadyToBecomeActive("");
- return state;
+ HAServiceState haState = rmContext.getHAServiceState();
+ HAServiceStatus ret = new HAServiceStatus(haState);
+ if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
+ ret.setReadyToBecomeActive();
} else {
- HAServiceState haState = rmContext.getHAServiceState();
- HAServiceStatus ret = new HAServiceStatus(haState);
- if (isRMActive() || haState == HAServiceProtocol.HAServiceState.STANDBY) {
- ret.setReadyToBecomeActive();
- } else {
- ret.setNotReadyToBecomeActive("State is " + haState);
- }
- return ret;
+ ret.setNotReadyToBecomeActive("State is " + haState);
}
+ return ret;
}
@Override
@@ -926,19 +896,4 @@ public class AdminService extends CompositeService implements
rmContext.getScheduler().setClusterMaxPriority(conf);
}
-
- public String getHAZookeeperConnectionState() {
- if (!rmContext.isHAEnabled()) {
- return "ResourceManager HA is not enabled.";
- } else if (!autoFailoverEnabled) {
- return "Auto Failover is not enabled.";
- }
- if (curatorEnabled) {
- return "Connected to zookeeper : " + rmContext
- .getLeaderElectorService().getCuratorClient().getZookeeperClient()
- .isConnected();
- } else {
- return this.embeddedElector.getHAZookeeperConnectionState();
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
new file mode 100644
index 0000000..bcdf48b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/CuratorBasedElectorService.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+
+/**
+ * Leader election implementation that uses Curator.
+ *
+ * <p>The service listens on a {@link LeaderLatch} and forwards leadership
+ * changes to the RM admin service obtained from the {@link RMContext}.
+ * Note that the latch is created and started in
+ * {@link #serviceInit(Configuration)}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class CuratorBasedElectorService extends AbstractService
+    implements EmbeddedElector, LeaderLatchListener {
+  public static final Log LOG =
+      LogFactory.getLog(CuratorBasedElectorService.class);
+  private LeaderLatch leaderLatch;
+  private CuratorFramework curator;
+  private final RMContext rmContext;
+  /** ZK path of the latch: the configured base path plus the cluster id. */
+  private String latchPath;
+  private String rmId;
+  private final ResourceManager rm;
+
+  /**
+   * Create the elector service.
+   *
+   * @param rmContext context used to reach the RM admin service
+   * @param rm resource manager that supplies the Curator client
+   */
+  public CuratorBasedElectorService(RMContext rmContext, ResourceManager rm) {
+    super(CuratorBasedElectorService.class.getName());
+    this.rmContext = rmContext;
+    this.rm = rm;
+  }
+
+  /**
+   * Resolve the RM id and latch path from the configuration, fetch the
+   * Curator client from the resource manager and start the leader latch.
+   *
+   * @param conf configuration to read the HA settings from
+   * @throws Exception if the leader latch cannot be started
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    rmId = HAUtil.getRMHAId(conf);
+    String clusterId = YarnConfiguration.getClusterId(conf);
+    String zkBasePath = conf.get(
+        YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
+        YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
+    latchPath = zkBasePath + "/" + clusterId;
+    curator = rm.getCurator();
+    initAndStartLeaderLatch();
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Create a new latch on {@link #latchPath}, register this service as its
+   * listener and start it.
+   */
+  private void initAndStartLeaderLatch() throws Exception {
+    leaderLatch = new LeaderLatch(curator, latchPath, rmId);
+    leaderLatch.addListener(this);
+    leaderLatch.start();
+  }
+
+  /**
+   * Close the leader latch, if one was created.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    closeLeaderLatch();
+    super.serviceStop();
+  }
+
+  /**
+   * Close the current latch, wait one second and start a fresh latch.
+   * Failures are logged and not propagated; if the wait is interrupted the
+   * thread's interrupt status is restored and no new latch is started.
+   */
+  @Override
+  public void rejoinElection() {
+    try {
+      closeLeaderLatch();
+      Thread.sleep(1000);
+      initAndStartLeaderLatch();
+    } catch (InterruptedException e) {
+      // Restore the flag so the caller can still observe the interruption.
+      Thread.currentThread().interrupt();
+      LOG.info("Fail to re-join election.", e);
+    } catch (Exception e) {
+      LOG.info("Fail to re-join election.", e);
+    }
+  }
+
+  /**
+   * @return a message stating whether the Curator client's ZooKeeper
+   *         client is currently connected
+   */
+  @Override
+  public String getZookeeperConnectionState() {
+    return "Connected to zookeeper : " +
+        curator.getZookeeperClient().isConnected();
+  }
+
+  /**
+   * Latch callback invoked when this RM is elected leader. Requests a
+   * transition to active; on failure it transitions back to standby and
+   * rejoins the election.
+   */
+  @Override
+  public void isLeader() {
+    LOG.info(rmId + " is elected leader, transitioning to active");
+    try {
+      rmContext.getRMAdminService().transitionToActive(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      LOG.info(rmId + " failed to transition to active, giving up leadership",
+          e);
+      notLeader();
+      rejoinElection();
+    }
+  }
+
+  /**
+   * Close the leader latch if it exists.
+   */
+  private void closeLeaderLatch() throws IOException {
+    if (leaderLatch != null) {
+      leaderLatch.close();
+    }
+  }
+
+  /**
+   * Latch callback invoked when this RM is no longer the leader. Requests
+   * a transition to standby; a failure is logged with its cause and not
+   * propagated.
+   */
+  @Override
+  public void notLeader() {
+    LOG.info(rmId + " relinquish leadership");
+    try {
+      rmContext.getRMAdminService().transitionToStandby(
+          new HAServiceProtocol.StateChangeRequestInfo(
+              HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
+    } catch (Exception e) {
+      // Pass the throwable so the cause of the failure is not lost.
+      LOG.info(rmId + " did not transition to standby successfully.", e);
+    }
+  }
+
+  // only for testing
+  @VisibleForTesting
+  public CuratorFramework getCuratorClient() {
+    return this.curator;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
new file mode 100644
index 0000000..677ec85
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElector.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+/**
+ * Common contract for embedded leader electors; every such elector is
+ * also a {@link Service}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface EmbeddedElector extends Service {
+  /**
+   * Withdraw from the leader election and then join it again.
+   */
+  void rejoinElection();
+
+  /**
+   * Describe the elector's connection to Zookeeper.
+   *
+   * @return zookeeper connection state
+   */
+  String getZookeeperConnectionState();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
deleted file mode 100644
index 88d2e10..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java
+++ /dev/null
@@ -1,260 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.ha.ActiveStandbyElector;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.ha.ServiceFailedException;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.ZKUtil;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.ACL;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class EmbeddedElectorService extends AbstractService
- implements ActiveStandbyElector.ActiveStandbyElectorCallback {
- private static final Log LOG =
- LogFactory.getLog(EmbeddedElectorService.class.getName());
- private static final HAServiceProtocol.StateChangeRequestInfo req =
- new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC);
-
- private RMContext rmContext;
-
- private byte[] localActiveNodeInfo;
- private ActiveStandbyElector elector;
- private long zkSessionTimeout;
- private Timer zkDisconnectTimer;
- @VisibleForTesting
- final Object zkDisconnectLock = new Object();
-
- EmbeddedElectorService(RMContext rmContext) {
- super(EmbeddedElectorService.class.getName());
- this.rmContext = rmContext;
- }
-
- @Override
- protected void serviceInit(Configuration conf)
- throws Exception {
- conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf);
-
- String zkQuorum = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
- if (zkQuorum == null) {
- throw new YarnRuntimeException("Embedded automatic failover " +
- "is enabled, but " + YarnConfiguration.RM_ZK_ADDRESS +
- " is not set");
- }
-
- String rmId = HAUtil.getRMHAId(conf);
- String clusterId = YarnConfiguration.getClusterId(conf);
- localActiveNodeInfo = createActiveNodeInfo(clusterId, rmId);
-
- String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
- YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
- String electionZNode = zkBasePath + "/" + clusterId;
-
- zkSessionTimeout = conf.getLong(YarnConfiguration.RM_ZK_TIMEOUT_MS,
- YarnConfiguration.DEFAULT_RM_ZK_TIMEOUT_MS);
-
- List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
- List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
-
- int maxRetryNum =
- conf.getInt(YarnConfiguration.RM_HA_FC_ELECTOR_ZK_RETRIES_KEY, conf
- .getInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
- CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT));
- elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
- electionZNode, zkAcls, zkAuths, this, maxRetryNum, false);
-
- elector.ensureParentZNode();
- if (!isParentZnodeSafe(clusterId)) {
- notifyFatalError(electionZNode + " znode has invalid data! "+
- "Might need formatting!");
- }
-
- super.serviceInit(conf);
- }
-
- @Override
- protected void serviceStart() throws Exception {
- elector.joinElection(localActiveNodeInfo);
- super.serviceStart();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- /**
- * When error occurs in serviceInit(), serviceStop() can be called.
- * We need null check for the case.
- */
- if (elector != null) {
- elector.quitElection(false);
- elector.terminateConnection();
- }
- super.serviceStop();
- }
-
- @Override
- public void becomeActive() throws ServiceFailedException {
- cancelDisconnectTimer();
-
- try {
- rmContext.getRMAdminService().transitionToActive(req);
- } catch (Exception e) {
- throw new ServiceFailedException("RM could not transition to Active", e);
- }
- }
-
- @Override
- public void becomeStandby() {
- cancelDisconnectTimer();
-
- try {
- rmContext.getRMAdminService().transitionToStandby(req);
- } catch (Exception e) {
- LOG.error("RM could not transition to Standby", e);
- }
- }
-
- /**
- * Stop the disconnect timer. Any running tasks will be allowed to complete.
- */
- private void cancelDisconnectTimer() {
- synchronized (zkDisconnectLock) {
- if (zkDisconnectTimer != null) {
- zkDisconnectTimer.cancel();
- zkDisconnectTimer = null;
- }
- }
- }
-
- /**
- * When the ZK client loses contact with ZK, this method will be called to
- * allow the RM to react. Because the loss of connection can be noticed
- * before the session timeout happens, it is undesirable to transition
- * immediately. Instead the method starts a timer that will wait
- * {@link YarnConfiguration#RM_ZK_TIMEOUT_MS} milliseconds before
- * initiating the transition into standby state.
- */
- @Override
- public void enterNeutralMode() {
- LOG.warn("Lost contact with Zookeeper. Transitioning to standby in "
- + zkSessionTimeout + " ms if connection is not reestablished.");
-
- // If we've just become disconnected, start a timer. When the time's up,
- // we'll transition to standby.
- synchronized (zkDisconnectLock) {
- if (zkDisconnectTimer == null) {
- zkDisconnectTimer = new Timer("Zookeeper disconnect timer");
- zkDisconnectTimer.schedule(new TimerTask() {
- @Override
- public void run() {
- synchronized (zkDisconnectLock) {
- // Only run if the timer hasn't been cancelled
- if (zkDisconnectTimer != null) {
- becomeStandby();
- }
- }
- }
- }, zkSessionTimeout);
- }
- }
- }
-
- @SuppressWarnings(value = "unchecked")
- @Override
- public void notifyFatalError(String errorMessage) {
- rmContext.getDispatcher().getEventHandler().handle(
- new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage));
- }
-
- @Override
- public void fenceOldActive(byte[] oldActiveData) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Request to fence old active being ignored, " +
- "as embedded leader election doesn't support fencing");
- }
- }
-
- private static byte[] createActiveNodeInfo(String clusterId, String rmId)
- throws IOException {
- return YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
- .newBuilder()
- .setClusterId(clusterId)
- .setRmId(rmId)
- .build()
- .toByteArray();
- }
-
- private boolean isParentZnodeSafe(String clusterId)
- throws InterruptedException, IOException, KeeperException {
- byte[] data;
- try {
- data = elector.getActiveData();
- } catch (ActiveStandbyElector.ActiveNotFoundException e) {
- // no active found, parent znode is safe
- return true;
- }
-
- YarnServerResourceManagerServiceProtos.ActiveRMInfoProto proto;
- try {
- proto = YarnServerResourceManagerServiceProtos.ActiveRMInfoProto
- .parseFrom(data);
- } catch (InvalidProtocolBufferException e) {
- LOG.error("Invalid data in ZK: " + StringUtils.byteToHexString(data));
- return false;
- }
-
- // Check if the passed proto corresponds to an RM in the same cluster
- if (!proto.getClusterId().equals(clusterId)) {
- LOG.error("Mismatched cluster! The other RM seems " +
- "to be from a different cluster. Current cluster = " + clusterId +
- "Other RM's cluster = " + proto.getClusterId());
- return false;
- }
- return true;
- }
-
- public void resetLeaderElection() {
- elector.quitElection(false);
- elector.joinElection(localActiveNodeInfo);
- }
-
- public String getHAZookeeperConnectionState() {
- return elector.getHAZookeeperConnectionState();
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
deleted file mode 100644
index 8c1a6eb..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/LeaderElectorService.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.recipes.leader.LeaderLatch;
-import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ha.HAServiceProtocol;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-import java.io.IOException;
-
-
-public class LeaderElectorService extends AbstractService implements
- LeaderLatchListener {
- public static final Log LOG = LogFactory.getLog(LeaderElectorService.class);
- private LeaderLatch leaderLatch;
- private CuratorFramework curator;
- private RMContext rmContext;
- private String latchPath;
- private String rmId;
- private ResourceManager rm;
-
- public LeaderElectorService(RMContext rmContext, ResourceManager rm) {
- super(LeaderElectorService.class.getName());
- this.rmContext = rmContext;
- this.rm = rm;
- }
-
- @Override
- protected void serviceInit(Configuration conf) throws Exception {
- rmId = HAUtil.getRMHAId(conf);
- String clusterId = YarnConfiguration.getClusterId(conf);
- String zkBasePath = conf.get(
- YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
- YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
- latchPath = zkBasePath + "/" + clusterId;
- curator = rm.getCurator();
- initAndStartLeaderLatch();
- super.serviceInit(conf);
- }
-
- private void initAndStartLeaderLatch() throws Exception {
- leaderLatch = new LeaderLatch(curator, latchPath, rmId);
- leaderLatch.addListener(this);
- leaderLatch.start();
- }
-
- @Override
- protected void serviceStop() throws Exception {
- closeLeaderLatch();
- super.serviceStop();
- }
-
- public boolean hasLeaderShip() {
- return leaderLatch.hasLeadership();
- }
-
-
- @Override
- public void isLeader() {
- LOG.info(rmId + "is elected leader, transitioning to active");
- try {
- rmContext.getRMAdminService().transitionToActive(
- new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
- } catch (Exception e) {
- LOG.info(rmId + " failed to transition to active, giving up leadership",
- e);
- notLeader();
- reJoinElection();
- }
- }
-
- public void reJoinElection() {
- try {
- closeLeaderLatch();
- Thread.sleep(1000);
- initAndStartLeaderLatch();
- } catch (Exception e) {
- LOG.info("Fail to re-join election.", e);
- }
- }
-
- private void closeLeaderLatch() throws IOException {
- if (leaderLatch != null) {
- leaderLatch.close();
- }
- }
- @Override
- public void notLeader() {
- LOG.info(rmId + " relinquish leadership");
- try {
- rmContext.getRMAdminService().transitionToStandby(
- new HAServiceProtocol.StateChangeRequestInfo(
- HAServiceProtocol.RequestSource.REQUEST_BY_ZKFC));
- } catch (Exception e) {
- LOG.info(rmId + " did not transition to standby successfully.");
- }
- }
-
- // only for testing
- @VisibleForTesting
- public CuratorFramework getCuratorClient() {
- return this.curator;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index c9d185f..26ef5ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -145,13 +145,15 @@ public interface RMContext {
void setQueuePlacementManager(PlacementManager placementMgr);
- void setLeaderElectorService(LeaderElectorService elector);
+ void setLeaderElectorService(EmbeddedElector elector);
- LeaderElectorService getLeaderElectorService();
+ EmbeddedElector getLeaderElectorService();
QueueLimitCalculator getNodeManagerQueueLimitCalculator();
void setRMAppLifetimeMonitor(RMAppLifetimeMonitor rmAppLifetimeMonitor);
RMAppLifetimeMonitor getRMAppLifetimeMonitor();
+
+ /**
+ * Report the state of the leader elector's ZooKeeper connection.
+ *
+ * @return a string describing the connection state
+ */
+ String getHAZookeeperConnectionState();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 3f17ac6..a452f95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -76,7 +76,7 @@ public class RMContextImpl implements RMContext {
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
- private LeaderElectorService elector;
+ private EmbeddedElector elector;
private QueueLimitCalculator queueLimitCalculator;
@@ -143,12 +143,12 @@ public class RMContextImpl implements RMContext {
}
@Override
- public void setLeaderElectorService(LeaderElectorService elector) {
+ public void setLeaderElectorService(EmbeddedElector elector) {
this.elector = elector;
}
@Override
- public LeaderElectorService getLeaderElectorService() {
+ public EmbeddedElector getLeaderElectorService() {
return this.elector;
}
@@ -513,4 +513,13 @@ public class RMContextImpl implements RMContext {
public RMAppLifetimeMonitor getRMAppLifetimeMonitor() {
return this.activeServiceContext.getRMAppLifetimeMonitor();
}
+
+ /**
+ * Report the state of the embedded elector's ZooKeeper connection.
+ *
+ * @return the value of {@code elector.getZookeeperConnectionState()}, or an
+ * explanatory message when no elector has been set on this context
+ */
+ @Override
+ public String getHAZookeeperConnectionState() {
+ if (elector == null) {
+ // No elector was registered, so there is no connection to describe.
+ return "Could not find leader elector. Verify both HA and automatic " +
+ "failover are enabled.";
+ } else {
+ return elector.getZookeeperConnectionState();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 8ddbc20..110f2c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -272,16 +272,17 @@ public class ResourceManager extends CompositeService implements Recoverable {
this.rmContext.setHAEnabled(HAUtil.isHAEnabled(this.conf));
if (this.rmContext.isHAEnabled()) {
HAUtil.verifyAndSetConfiguration(this.conf);
- curatorEnabled = conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
- YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
- if (curatorEnabled) {
- this.curator = createAndStartCurator(conf);
- LeaderElectorService elector = new LeaderElectorService(rmContext, this);
- addService(elector);
+
+ // If the RM is configured to use an embedded leader elector,
+ // initialize the leader elector.
+ if (HAUtil.isAutomaticFailoverEnabled(conf) &&
+ HAUtil.isAutomaticFailoverEmbedded(conf)) {
+ EmbeddedElector elector = createEmbeddedElector();
+ addIfService(elector);
rmContext.setLeaderElectorService(elector);
}
}
-
+
// Set UGI and do login
// If security is enabled, use login user
// If security is not enabled, use current user
@@ -331,6 +332,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
super.serviceInit(this.conf);
}
+ /**
+ * Build the embedded leader elector selected by the configuration.
+ *
+ * <p>Reads {@code YarnConfiguration.CURATOR_LEADER_ELECTOR} into
+ * {@code curatorEnabled}. When it is set, a Curator client is created and
+ * started first and a {@code CuratorBasedElectorService} is returned;
+ * otherwise an {@code ActiveStandbyElectorBasedElectorService} is returned.
+ *
+ * @return the elector to register with this ResourceManager
+ * @throws IOException declared by {@code createAndStartCurator}
+ */
+ protected EmbeddedElector createEmbeddedElector() throws IOException {
+ curatorEnabled = conf.getBoolean(
+ YarnConfiguration.CURATOR_LEADER_ELECTOR,
+ YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
+ if (!curatorEnabled) {
+ return new ActiveStandbyElectorBasedElectorService(rmContext);
+ }
+ // The Curator client must exist before the Curator-based elector is built.
+ this.curator = createAndStartCurator(conf);
+ return new CuratorBasedElectorService(rmContext, this);
+ }
+
public CuratorFramework createAndStartCurator(Configuration conf)
throws IOException {
String zkHostPort = conf.get(YarnConfiguration.RM_ZK_ADDRESS);
@@ -802,14 +817,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
// Transition to standby and reinit active services
LOG.info("Transitioning RM to Standby mode");
transitionToStandby(true);
- if (curatorEnabled) {
- rmContext.getLeaderElectorService().reJoinElection();
- } else {
- adminService.resetLeaderElection();
+ EmbeddedElector elector = rmContext.getLeaderElectorService();
+ if (elector != null) {
+ elector.rejoinElection();
}
- return;
} catch (Exception e) {
- LOG.fatal("Failed to transition RM to Standby mode.");
+ LOG.fatal("Failed to transition RM to Standby mode.", e);
ExitUtil.terminate(1, e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
index 2d7139f..3367cf4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebApp.java
@@ -121,8 +121,7 @@ public class RMWebApp extends WebApp implements YarnWebParams {
}
public String getHAZookeeperConnectionState() {
- return rm.getRMContext().getRMAdminService()
- .getHAZookeeperConnectionState();
+ return getRMContext().getHAZookeeperConnectionState();
}
public RMContext getRMContext() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
index 512a5c4..d815315 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterInfo.java
@@ -64,7 +64,7 @@ public class ClusterInfo {
this.hadoopBuildVersion = VersionInfo.getBuildVersion();
this.hadoopVersionBuiltOn = VersionInfo.getDate();
this.haZooKeeperConnectionState =
- rm.getRMContext().getRMAdminService().getHAZookeeperConnectionState();
+ rm.getRMContext().getHAZookeeperConnectionState();
}
public String getState() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index ea573e2..a66b093 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -109,6 +109,7 @@ import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.Assert;
+
@SuppressWarnings("unchecked")
public class MockRM extends ResourceManager {
@@ -123,6 +124,8 @@ public class MockRM extends ResourceManager {
private final boolean useNullRMNodeLabelsManager;
private boolean disableDrainEventsImplicitly;
+ private boolean useRealElector = false;
+
public MockRM() {
this(new YarnConfiguration());
}
@@ -132,13 +135,23 @@ public class MockRM extends ResourceManager {
}
public MockRM(Configuration conf, RMStateStore store) {
- this(conf, store, true);
+ this(conf, store, true, false);
}
-
+
+ /**
+ * Create a MockRM without an explicit state store, choosing whether the
+ * real embedded elector is used.
+ *
+ * <p>Delegates to the four-argument constructor with a {@code null} store
+ * and {@code useNullRMNodeLabelsManager} set to {@code true}.
+ *
+ * @param conf configuration for the RM
+ * @param useRealElector passed through to the four-argument constructor
+ */
+ public MockRM(Configuration conf, boolean useRealElector) {
+ this(conf, null, true, useRealElector);
+ }
+
+ /**
+ * Create a MockRM with the given state store, choosing whether the real
+ * embedded elector is used.
+ *
+ * <p>Delegates to the four-argument constructor with
+ * {@code useNullRMNodeLabelsManager} set to {@code true}.
+ *
+ * @param conf configuration for the RM
+ * @param store state store passed through to the four-argument constructor
+ * @param useRealElector passed through to the four-argument constructor
+ */
+ public MockRM(Configuration conf, RMStateStore store,
+ boolean useRealElector) {
+ this(conf, store, true, useRealElector);
+ }
+
public MockRM(Configuration conf, RMStateStore store,
- boolean useNullRMNodeLabelsManager) {
+ boolean useNullRMNodeLabelsManager, boolean useRealElector) {
super();
this.useNullRMNodeLabelsManager = useNullRMNodeLabelsManager;
+ this.useRealElector = useRealElector;
init(conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf));
if (store != null) {
setRMStateStore(store);
@@ -193,6 +206,15 @@ public class MockRM extends ResourceManager {
}
@Override
+ protected EmbeddedElector createEmbeddedElector() throws IOException {
+ // Only build the production elector when the test opted in through
+ // useRealElector; otherwise this MockRM runs without one.
+ return useRealElector ? super.createEmbeddedElector() : null;
+ }
+
+ @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new EventHandler<SchedulerEvent>() {
@Override
@@ -984,11 +1006,6 @@ public class MockRM extends ResourceManager {
protected void stopServer() {
// don't do anything
}
-
- @Override
- protected EmbeddedElectorService createEmbeddedElectorService() {
- return null;
- }
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
index 6092f41..c9ce7d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/RMHATestBase.java
@@ -108,13 +108,13 @@ public abstract class RMHATestBase extends ClientBaseWithFixes{
}
protected void startRMs() throws IOException {
- rm1 = new MockRM(confForRM1, null, false){
+ rm1 = new MockRM(confForRM1, null, false, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
}
};
- rm2 = new MockRM(confForRM2, null, false){
+ rm2 = new MockRM(confForRM2, null, false, false){
@Override
protected Dispatcher createDispatcher() {
return new DrainDispatcher();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
index bb10041..121cacb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestLeaderElectorService.java
@@ -63,7 +63,6 @@ public class TestLeaderElectorService {
conf = new Configuration();
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
conf.setBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR, true);
- conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true);
conf.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID);
@@ -121,7 +120,7 @@ public class TestLeaderElectorService {
}
};
memStore.init(conf);
- rm1 = new MockRM(conf, memStore);
+ rm1 = new MockRM(conf, memStore, true);
rm1.init(conf);
rm1.start();
@@ -167,7 +166,8 @@ public class TestLeaderElectorService {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
- LeaderElectorService service = rm1.getRMContext().getLeaderElectorService();
+ CuratorBasedElectorService service = (CuratorBasedElectorService)
+ rm1.getRMContext().getLeaderElectorService();
CuratorZookeeperClient client =
service.getCuratorClient().getZookeeperClient();
// this will expire current curator client session. curator will re-establish
@@ -187,7 +187,7 @@ public class TestLeaderElectorService {
Thread launchRM = new Thread() {
@Override
public void run() {
- rm1 = new MockRM(conf) {
+ rm1 = new MockRM(conf, true) {
@Override
synchronized void transitionToActive() throws Exception {
if (throwException.get()) {
@@ -217,9 +217,12 @@ public class TestLeaderElectorService {
rm1 = startRM("rm1", HAServiceState.ACTIVE);
rm2 = startRM("rm2", HAServiceState.STANDBY);
+ CuratorBasedElectorService service = (CuratorBasedElectorService)
+ rm1.getRMContext().getLeaderElectorService();
+
ZooKeeper zkClient =
- rm1.getRMContext().getLeaderElectorService().getCuratorClient()
- .getZookeeperClient().getZooKeeper();
+ service.getCuratorClient().getZookeeperClient().getZooKeeper();
+
InstanceSpec connectionInstance = zkCluster.findConnectionInstance(zkClient);
zkCluster.killServer(connectionInstance);
@@ -245,7 +248,7 @@ public class TestLeaderElectorService {
private MockRM startRM(String rmId, HAServiceState state) throws Exception{
YarnConfiguration yarnConf = new YarnConfiguration(conf);
yarnConf.set(YarnConfiguration.RM_HA_ID, rmId);
- MockRM rm = new MockRM(yarnConf);
+ MockRM rm = new MockRM(yarnConf, true);
rm.init(yarnConf);
rm.start();
waitFor(rm, state);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
index bfd0b4e..1fe9bbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java
@@ -127,7 +127,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
myConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 50);
when(rc.getRMAdminService()).thenReturn(as);
- EmbeddedElectorService ees = new EmbeddedElectorService(rc);
+ ActiveStandbyElectorBasedElectorService
+ ees = new ActiveStandbyElectorBasedElectorService(rc);
ees.init(myConf);
ees.enterNeutralMode();
@@ -164,7 +165,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationActive(AdminService as,
- EmbeddedElectorService ees) throws IOException, InterruptedException {
+ ActiveStandbyElectorBasedElectorService ees)
+ throws IOException, InterruptedException {
ees.becomeActive();
Thread.sleep(100);
@@ -183,7 +185,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationStandby(AdminService as,
- EmbeddedElectorService ees) throws IOException, InterruptedException {
+ ActiveStandbyElectorBasedElectorService ees)
+ throws IOException, InterruptedException {
ees.becomeStandby();
Thread.sleep(100);
@@ -201,7 +204,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationNeutral(AdminService as,
- EmbeddedElectorService ees) throws IOException, InterruptedException {
+ ActiveStandbyElectorBasedElectorService ees)
+ throws IOException, InterruptedException {
ees.enterNeutralMode();
Thread.sleep(100);
@@ -220,7 +224,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationTimingActive(AdminService as,
- EmbeddedElectorService ees) throws IOException, InterruptedException {
+ ActiveStandbyElectorBasedElectorService ees)
+ throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread
@@ -250,7 +255,8 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
* @throws InterruptedException if interrupted
*/
private void testCallbackSynchronizationTimingStandby(AdminService as,
- EmbeddedElectorService ees) throws IOException, InterruptedException {
+ ActiveStandbyElectorBasedElectorService ees)
+ throws IOException, InterruptedException {
synchronized (ees.zkDisconnectLock) {
// Sleep while holding the lock so that the timer thread can't do
// anything when it runs. Sleep until we're pretty sure the timer thread
@@ -283,25 +289,20 @@ public class TestRMEmbeddedElector extends ClientBaseWithFixes {
}
@Override
- protected AdminService createAdminService() {
- return new AdminService(MockRMWithElector.this, getRMContext()) {
+ protected EmbeddedElector createEmbeddedElector() {
+ return new ActiveStandbyElectorBasedElectorService(getRMContext()) {
@Override
- protected EmbeddedElectorService createEmbeddedElectorService() {
- return new EmbeddedElectorService(getRMContext()) {
- @Override
- public void becomeActive() throws
- ServiceFailedException {
- try {
- callbackCalled.set(true);
- TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
- Thread.sleep(delayMs);
- TestRMEmbeddedElector.LOG.info("Sleep done");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- super.becomeActive();
- }
- };
+ public void becomeActive() throws
+ ServiceFailedException {
+ try {
+ callbackCalled.set(true);
+ TestRMEmbeddedElector.LOG.info("Callback called. Sleeping now");
+ Thread.sleep(delayMs);
+ TestRMEmbeddedElector.LOG.info("Sleep done");
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ super.becomeActive();
}
};
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6410a54/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
index 000f4a4..5114329 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java
@@ -161,8 +161,8 @@ public class TestRMHA {
ClientResponse response =
webResource.path("ws").path("v1").path("cluster").path("apps")
- .path(path).accept(MediaType.APPLICATION_JSON)
- .get(ClientResponse.class);
+ .path(path).accept(MediaType.APPLICATION_JSON)
+ .get(ClientResponse.class);
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
response.getType().toString());
JSONObject json = response.getEntity(JSONObject.class);
@@ -178,13 +178,13 @@ public class TestRMHA {
* 1. Standby: Should be a no-op
* 2. Active: Active services should start
* 3. Active: Should be a no-op.
- * While active, submit a couple of jobs
+ * While active, submit a couple of jobs
* 4. Standby: Active services should stop
* 5. Active: Active services should start
* 6. Stop the RM: All services should stop and RM should not be ready to
* become Active
*/
- @Test (timeout = 30000)
+ @Test(timeout = 30000)
public void testFailoverAndTransitions() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);
@@ -204,37 +204,37 @@ public class TestRMHA {
checkMonitorHealth();
checkStandbyRMFunctionality();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
-
+
// 1. Transition to Standby - must be a no-op
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
-
+
// 2. Transition to active
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
-
+
// 3. Transition to active - no-op
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 2, 2, 2, 2048, 2);
-
+
// 4. Transition to standby
rm.adminService.transitionToStandby(requestInfo);
checkMonitorHealth();
checkStandbyRMFunctionality();
verifyClusterMetrics(0, 0, 0, 0, 0, 0);
-
+
// 5. Transition to active to check Active->Standby->Active works
rm.adminService.transitionToActive(requestInfo);
checkMonitorHealth();
checkActiveRMFunctionality();
verifyClusterMetrics(1, 1, 1, 1, 2048, 1);
-
+
// 6. Stop the RM. All services should stop and RM should not be ready to
// become active
rm.stop();
@@ -340,7 +340,7 @@ public class TestRMHA {
rm.adminService.transitionToStandby(requestInfo);
rm.adminService.transitionToActive(requestInfo);
rm.adminService.transitionToStandby(requestInfo);
-
+
MyCountingDispatcher dispatcher =
(MyCountingDispatcher) rm.getRMContext().getDispatcher();
assertTrue(!dispatcher.isStopped());
@@ -348,24 +348,24 @@ public class TestRMHA {
rm.adminService.transitionToActive(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
- .getEventHandlerCount());
+ .getEventHandlerCount());
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
-
+
// Keep the dispatcher reference before transitioning to standby
dispatcher = (MyCountingDispatcher) rm.getRMContext().getDispatcher();
-
-
+
+
rm.adminService.transitionToStandby(requestInfo);
assertEquals(errorMessageForEventHandler, expectedEventHandlerCount,
((MyCountingDispatcher) rm.getRMContext().getDispatcher())
- .getEventHandlerCount());
+ .getEventHandlerCount());
assertEquals(errorMessageForService, expectedServiceCount,
rm.getServices().size());
assertTrue(dispatcher.isStopped());
-
+
rm.stop();
}
@@ -386,7 +386,8 @@ public class TestRMHA {
assertEquals(conf.get(YarnConfiguration.RM_HA_ID), RM1_NODE_ID);
//test if RM_HA_ID can not be found
- configuration.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID+ "," + RM3_NODE_ID);
+ configuration
+ .set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM3_NODE_ID);
configuration.unset(YarnConfiguration.RM_HA_ID);
conf = new YarnConfiguration(configuration);
try {
@@ -458,7 +459,7 @@ public class TestRMHA {
checkActiveRMFunctionality();
}
- @Test(timeout = 90000)
+ @Test
public void testTransitionedToStandbyShouldNotHang() throws Exception {
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
Configuration conf = new YarnConfiguration(configuration);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org