You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/05/03 03:56:34 UTC
svn commit: r1333288 - in
/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/ha/
src/main/java/org/apache/hadoop/ha/protocolPB/
src/main/packages/templates/conf/...
Author: todd
Date: Thu May 3 01:56:33 2012
New Revision: 1333288
URL: http://svn.apache.org/viewvc?rev=1333288&view=rev
Log:
HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto
Modified:
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt Thu May 3 01:56:33 2012
@@ -21,3 +21,5 @@ HADOOP-8246. Auto-HA: automatically scop
HADOOP-8247. Add a config to enable auto-HA, which disables manual FailoverController (todd)
HADOOP-8306. ZKFC: improve error message when ZK is not running. (todd)
+
+HADOOP-8279. Allow manual failover to be invoked when auto-failover is enabled. (todd)
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java Thu May 3 01:56:33 2012
@@ -116,6 +116,8 @@ public class CommonConfigurationKeys ext
"security.refresh.user.mappings.protocol.acl";
public static final String
SECURITY_HA_SERVICE_PROTOCOL_ACL = "security.ha.service.protocol.acl";
+ public static final String
+ SECURITY_ZKFC_PROTOCOL_ACL = "security.zkfc.protocol.acl";
public static final String HADOOP_SECURITY_TOKEN_SERVICE_USE_IP =
"hadoop.security.token.service.use_ip";
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Thu May 3 01:56:33 2012
@@ -378,7 +378,7 @@ public class ActiveStandbyElector implem
createConnection();
}
Stat stat = new Stat();
- return zkClient.getData(zkLockFilePath, false, stat);
+ return getDataWithRetries(zkLockFilePath, false, stat);
} catch(KeeperException e) {
Code code = e.code();
if (isNodeDoesNotExist(code)) {
@@ -889,6 +889,15 @@ public class ActiveStandbyElector implem
});
}
+ private byte[] getDataWithRetries(final String path, final boolean watch,
+ final Stat stat) throws InterruptedException, KeeperException {
+ return zkDoWithRetries(new ZKAction<byte[]>() {
+ public byte[] run() throws KeeperException, InterruptedException {
+ return zkClient.getData(path, watch, stat);
+ }
+ });
+ }
+
private Stat setDataWithRetries(final String path, final byte[] data,
final int version) throws InterruptedException, KeeperException {
return zkDoWithRetries(new ZKAction<Stat>() {
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAAdmin.java Thu May 3 01:56:33 2012
@@ -38,6 +38,7 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
/**
@@ -201,9 +202,26 @@ public abstract class HAAdmin extends Co
HAServiceTarget fromNode = resolveTarget(args[0]);
HAServiceTarget toNode = resolveTarget(args[1]);
- if (!checkManualStateManagementOK(fromNode) ||
- !checkManualStateManagementOK(toNode)) {
- return -1;
+ // Check that auto-failover is consistently configured for both nodes.
+ Preconditions.checkState(
+ fromNode.isAutoFailoverEnabled() ==
+ toNode.isAutoFailoverEnabled(),
+ "Inconsistent auto-failover configs between %s and %s!",
+ fromNode, toNode);
+
+ if (fromNode.isAutoFailoverEnabled()) {
+ if (forceFence || forceActive) {
+ // -forceActive doesn't make sense with auto-HA, since, if the node
+ // is not healthy, then its ZKFC will immediately quit the election
+ // again the next time a health check runs.
+ //
+ // -forceFence doesn't seem to have any real use cases with auto-HA
+ // so it isn't implemented.
+ errOut.println(FORCEFENCE + " and " + FORCEACTIVE + " flags not " +
+ "supported with auto-failover enabled.");
+ return -1;
+ }
+ return gracefulFailoverThroughZKFCs(toNode);
}
FailoverController fc = new FailoverController(getConf(),
@@ -218,6 +236,31 @@ public abstract class HAAdmin extends Co
}
return 0;
}
+
+
+ /**
+ * Initiate a graceful failover by talking to the target node's ZKFC.
+ * This sends an RPC to the ZKFC, which coordinates the failover.
+ *
+ * @param toNode the node to fail to
+ * @return status code (0 for success)
+ * @throws IOException on failures other than ServiceFailedException, which is reported and returns -1
+ */
+ private int gracefulFailoverThroughZKFCs(HAServiceTarget toNode)
+ throws IOException {
+
+ int timeout = FailoverController.getRpcTimeoutToNewActive(getConf());
+ ZKFCProtocol proxy = toNode.getZKFCProxy(getConf(), timeout);
+ try {
+ proxy.gracefulFailover();
+ out.println("Failover to " + toNode + " successful");
+ } catch (ServiceFailedException sfe) {
+ errOut.println("Failover failed: " + sfe.getLocalizedMessage());
+ return -1;
+ }
+
+ return 0;
+ }
private int checkHealth(final CommandLine cmd)
throws IOException, ServiceFailedException {
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HAServiceTarget.java Thu May 3 01:56:33 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;
import com.google.common.collect.Maps;
@@ -49,6 +50,11 @@ public abstract class HAServiceTarget {
public abstract InetSocketAddress getAddress();
/**
+ * @return the IPC address of the ZKFC on the target node
+ */
+ public abstract InetSocketAddress getZKFCAddress();
+
+ /**
* @return a Fencer implementation configured for this target node
*/
public abstract NodeFencer getFencer();
@@ -76,6 +82,20 @@ public abstract class HAServiceTarget {
confCopy, factory, timeoutMs);
}
+ /**
+ * @return a proxy to the ZKFC which is associated with this HA service.
+ */
+ public ZKFCProtocol getZKFCProxy(Configuration conf, int timeoutMs)
+ throws IOException {
+ Configuration confCopy = new Configuration(conf);
+ // Limit connect retries to 1 so we quickly fail to connect
+ confCopy.setInt(CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+ SocketFactory factory = NetUtils.getDefaultSocketFactory(confCopy);
+ return new ZKFCProtocolClientSideTranslatorPB(
+ getZKFCAddress(),
+ confCopy, factory, timeoutMs);
+ }
+
public final Map<String, String> getFencingParameters() {
Map<String, String> ret = Maps.newHashMap();
addFencingParameters(ret);
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java?rev=1333288&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCProtocol.java Thu May 3 01:56:33 2012
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.retry.Idempotent;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.KerberosInfo;
+
+import java.io.IOException;
+
+/**
+ * Protocol exposed by the ZKFailoverController, allowing for graceful
+ * failover.
+ */
+@KerberosInfo(
+ serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface ZKFCProtocol {
+ /**
+ * Initial version of the protocol
+ */
+ public static final long versionID = 1L;
+
+ /**
+ * Request that this service yield from the active node election for the
+ * specified time period.
+ *
+ * If the node is not currently active, it simply prevents any attempts
+ * to become active for the specified time period. Otherwise, it first
+ * tries to transition the local service to standby state, and then quits
+ * the election.
+ *
+ * If the attempt to transition to standby succeeds, then the ZKFC receiving
+ * this RPC will delete its own breadcrumb node in ZooKeeper. Thus, the
+ * next node to become active will not run any fencing process. Otherwise,
+ * the breadcrumb will be left, such that the next active will fence this
+ * node.
+ *
+ * After the specified time period elapses, the node will attempt to re-join
+ * the election, provided that its service is healthy.
+ *
+ * If the node has previously been instructed to cede active, and is still
+ * within the specified time period, the later command's time period will
+ * take precedence, resetting the timer.
+ *
+ * A call to cedeActive which specifies a 0 or negative time period will
+ * allow the target node to immediately rejoin the election, so long as
+ * it is healthy.
+ *
+ * @param millisToCede period for which the node should not attempt to
+ * become active
+ * @throws IOException if the operation fails
+ * @throws AccessControlException if the operation is disallowed
+ */
+ @Idempotent
+ public void cedeActive(int millisToCede)
+ throws IOException, AccessControlException;
+
+ /**
+ * Request that this node try to become active through a graceful failover.
+ *
+ * If the node is already active, this is a no-op and simply returns success
+ * without taking any further action.
+ *
+ * If the node is not healthy, it will throw an exception indicating that it
+ * is not able to become active.
+ *
+ * If the node is healthy and not active, it will try to initiate a graceful
+ * failover to become active, returning only when it has successfully become
+ * active. See {@link ZKFailoverController#gracefulFailoverToYou()} for the
+ * implementation details.
+ *
+ * If the node fails to successfully coordinate the failover, throws an
+ * exception indicating the reason for failure.
+ *
+ * @throws IOException if graceful failover fails
+ * @throws AccessControlException if the operation is disallowed
+ */
+ @Idempotent
+ public void gracefulFailover()
+ throws IOException, AccessControlException;
+}
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java?rev=1333288&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFCRpcServer.java Thu May 3 01:56:33 2012
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolPB;
+import org.apache.hadoop.ha.protocolPB.ZKFCProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RPC.Server;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+
+import com.google.protobuf.BlockingService;
+
+@InterfaceAudience.LimitedPrivate("HDFS")
+@InterfaceStability.Evolving
+public class ZKFCRpcServer implements ZKFCProtocol {
+
+ private static final int HANDLER_COUNT = 3;
+ private final ZKFailoverController zkfc;
+ private Server server;
+
+ ZKFCRpcServer(Configuration conf,
+ InetSocketAddress bindAddr,
+ ZKFailoverController zkfc,
+ PolicyProvider policy) throws IOException {
+ this.zkfc = zkfc;
+
+ RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
+ ProtobufRpcEngine.class);
+ ZKFCProtocolServerSideTranslatorPB translator =
+ new ZKFCProtocolServerSideTranslatorPB(this);
+ BlockingService service = ZKFCProtocolService
+ .newReflectiveBlockingService(translator);
+ this.server = RPC.getServer(
+ ZKFCProtocolPB.class,
+ service, bindAddr.getHostName(),
+ bindAddr.getPort(), HANDLER_COUNT, false, conf,
+ null /*secretManager*/);
+
+ // set service-level authorization security policy
+ if (conf.getBoolean(
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, false)) {
+ server.refreshServiceAcl(conf, policy);
+ }
+
+ }
+
+ void start() {
+ this.server.start();
+ }
+
+ public InetSocketAddress getAddress() {
+ return server.getListenerAddress();
+ }
+
+ void stopAndJoin() throws InterruptedException {
+ this.server.stop();
+ this.server.join();
+ }
+
+ @Override
+ public void cedeActive(int millisToCede) throws IOException,
+ AccessControlException {
+ zkfc.checkRpcAdminAccess();
+ zkfc.cedeActive(millisToCede);
+ }
+
+ @Override
+ public void gracefulFailover() throws IOException, AccessControlException {
+ zkfc.checkRpcAdminAccess();
+ zkfc.gracefulFailoverToYou();
+ }
+
+}
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Thu May 3 01:56:33 2012
@@ -18,21 +18,32 @@
package org.apache.hadoop.ha;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -41,6 +52,8 @@ import org.apache.zookeeper.data.ACL;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController implements Tool {
@@ -85,6 +98,7 @@ public abstract class ZKFailoverControll
private HealthMonitor healthMonitor;
private ActiveStandbyElector elector;
+ protected ZKFCRpcServer rpcServer;
private HAServiceTarget localTarget;
@@ -93,6 +107,22 @@ public abstract class ZKFailoverControll
/** Set if a fatal error occurs */
private String fatalError = null;
+ /**
+ * A future nanotime before which the ZKFC will not join the election.
+ * This is used during graceful failover.
+ */
+ private long delayJoiningUntilNanotime = 0;
+
+ /** Executor on which {@link #scheduleRecheck(long)} schedules events */
+ private ScheduledExecutorService delayExecutor =
+ Executors.newScheduledThreadPool(1,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("ZKFC Delay timer #%d")
+ .build());
+
+ private ActiveAttemptRecord lastActiveAttemptRecord;
+ private Object activeAttemptRecordLock = new Object();
+
@Override
public void setConf(Configuration conf) {
this.conf = conf;
@@ -104,6 +134,10 @@ public abstract class ZKFailoverControll
protected abstract HAServiceTarget getLocalTarget();
protected abstract HAServiceTarget dataToTarget(byte[] data);
protected abstract void loginAsFCUser() throws IOException;
+ protected abstract void checkRpcAdminAccess()
+ throws AccessControlException, IOException;
+ protected abstract InetSocketAddress getRpcAddressToBindTo();
+ protected abstract PolicyProvider getPolicyProvider();
/**
* Return the name of a znode inside the configured parent znode in which
@@ -194,10 +228,14 @@ public abstract class ZKFailoverControll
return ERR_CODE_NO_FENCER;
}
+ initRPC();
initHM();
+ startRPC();
try {
mainLoop();
} finally {
+ rpcServer.stopAndJoin();
+
elector.quitElection(true);
healthMonitor.shutdown();
healthMonitor.join();
@@ -262,6 +300,16 @@ public abstract class ZKFailoverControll
healthMonitor.addCallback(new HealthCallbacks());
healthMonitor.start();
}
+
+ protected void initRPC() throws IOException {
+ InetSocketAddress bindAddr = getRpcAddressToBindTo();
+ rpcServer = new ZKFCRpcServer(conf, bindAddr, this, getPolicyProvider());
+ }
+
+ protected void startRPC() throws IOException {
+ rpcServer.start();
+ }
+
private void initZK() throws HadoopIllegalArgumentException, IOException {
zkQuorum = conf.get(ZK_QUORUM_KEY);
@@ -328,10 +376,18 @@ public abstract class ZKFailoverControll
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy(
conf, FailoverController.getRpcTimeoutToNewActive(conf)),
createReqInfo());
- LOG.info("Successfully transitioned " + localTarget +
- " to active state");
+ String msg = "Successfully transitioned " + localTarget +
+ " to active state";
+ LOG.info(msg);
+ recordActiveAttempt(new ActiveAttemptRecord(true, msg));
+
} catch (Throwable t) {
- LOG.fatal("Couldn't make " + localTarget + " active", t);
+ String msg = "Couldn't make " + localTarget + " active";
+ LOG.fatal(msg, t);
+
+ recordActiveAttempt(new ActiveAttemptRecord(false, msg + "\n" +
+ StringUtils.stringifyException(t)));
+
if (t instanceof ServiceFailedException) {
throw (ServiceFailedException)t;
} else {
@@ -350,6 +406,69 @@ public abstract class ZKFailoverControll
}
}
+ /**
+ * Store the results of the last attempt to become active.
+ * This is used so that, during manually initiated failover,
+ * we can report back the results of the attempt to become active
+ * to the initiator of the failover.
+ */
+ private void recordActiveAttempt(
+ ActiveAttemptRecord record) {
+ synchronized (activeAttemptRecordLock) {
+ lastActiveAttemptRecord = record;
+ activeAttemptRecordLock.notifyAll();
+ }
+ }
+
+ /**
+ * Wait until one of the following events:
+ * <ul>
+ * <li>Another thread publishes the results of an attempt to become active
+ * using {@link #recordActiveAttempt(ActiveAttemptRecord)}</li>
+ * <li>The node enters bad health status</li>
+ * <li>The specified timeout elapses</li>
+ * </ul>
+ *
+ * @param timeoutMillis number of millis to wait
+ * @return the published record, or null if the timeout elapses or the
+ * service becomes unhealthy
+ * @throws InterruptedException if the thread is interrupted.
+ */
+ private ActiveAttemptRecord waitForActiveAttempt(int timeoutMillis)
+ throws InterruptedException {
+ long st = System.nanoTime();
+ long waitUntil = st + TimeUnit.NANOSECONDS.convert(
+ timeoutMillis, TimeUnit.MILLISECONDS);
+
+ do {
+ // periodically check health state, because entering an
+ // unhealthy state could prevent us from ever attempting to
+ // become active. We can detect this and respond to the user
+ // immediately.
+ synchronized (this) {
+ if (lastHealthState != State.SERVICE_HEALTHY) {
+ // early out if service became unhealthy
+ return null;
+ }
+ }
+
+ synchronized (activeAttemptRecordLock) {
+ if ((lastActiveAttemptRecord != null &&
+ lastActiveAttemptRecord.nanoTime >= st)) {
+ return lastActiveAttemptRecord;
+ }
+ // Only wait 1sec so that we periodically recheck the health state
+ // above.
+ activeAttemptRecordLock.wait(1000);
+ }
+ } while (System.nanoTime() < waitUntil);
+
+ // Timeout elapsed.
+ LOG.warn(timeoutMillis + "ms timeout elapsed waiting for an attempt " +
+ "to become active");
+ return null;
+ }
+
private StateChangeRequestInfo createReqInfo() {
return new StateChangeRequestInfo(RequestSource.REQUEST_BY_ZKFC);
}
@@ -369,6 +488,304 @@ public abstract class ZKFailoverControll
// at the same time.
}
}
+
+
+ private synchronized void fenceOldActive(byte[] data) {
+ HAServiceTarget target = dataToTarget(data);
+
+ try {
+ doFence(target);
+ } catch (Throwable t) {
+ recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
+ Throwables.propagate(t);
+ }
+ }
+
+ private void doFence(HAServiceTarget target) {
+ LOG.info("Should fence: " + target);
+ boolean gracefulWorked = new FailoverController(conf,
+ RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
+ if (gracefulWorked) {
+ // It's possible that it's in standby but just about to go into active,
+ // no? Is there some race here?
+ LOG.info("Successfully transitioned " + target + " to standby " +
+ "state without fencing");
+ return;
+ }
+
+ try {
+ target.checkFencingConfigured();
+ } catch (BadFencingConfigurationException e) {
+ LOG.error("Couldn't fence old active " + target, e);
+ recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
+ throw new RuntimeException(e);
+ }
+
+ if (!target.getFencer().fence(target)) {
+ throw new RuntimeException("Unable to fence " + target);
+ }
+ }
+
+
+ /**
+ * Request from graceful failover to cede active role. Causes
+ * this ZKFC to transition its local node to standby, then quit
+ * the election for the specified period of time, after which it
+ * will rejoin iff it is healthy.
+ */
+ void cedeActive(final int millisToCede)
+ throws AccessControlException, ServiceFailedException, IOException {
+ try {
+ UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doCedeActive(millisToCede);
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Body of {@link #cedeActive(int)}.
+ *
+ * If millisToCede is zero or negative, the join delay is cleared and
+ * electability is rechecked right away. Otherwise the local node is asked
+ * to transition to standby, the join delay is set to millisToCede from
+ * now, and this controller quits the election. If the standby transition
+ * throws an IOException, the election is still quit, but with the flag
+ * that indicates fencing is necessary.
+ *
+ * @param millisToCede how long to stay out of the election, in millis
+ */
+ private void doCedeActive(int millisToCede)
+ throws AccessControlException, ServiceFailedException, IOException {
+ // Timeout used for the proxy to the local node below.
+ int timeout = FailoverController.getGracefulFenceTimeout(conf);
+
+ // Lock elector to maintain lock ordering of elector -> ZKFC
+ synchronized (elector) {
+ synchronized (this) {
+ if (millisToCede <= 0) {
+ // Non-positive value: drop any delay and re-evaluate immediately.
+ delayJoiningUntilNanotime = 0;
+ recheckElectability();
+ return;
+ }
+
+ LOG.info("Requested by " + UserGroupInformation.getCurrentUser() +
+ " at " + Server.getRemoteAddress() + " to cede active role.");
+ boolean needFence = false;
+ try {
+ localTarget.getProxy(conf, timeout).transitionToStandby(createReqInfo());
+ LOG.info("Successfully ensured local node is in standby mode");
+ } catch (IOException ioe) {
+ LOG.warn("Unable to transition local node to standby: " +
+ ioe.getLocalizedMessage());
+ LOG.warn("Quitting election but indicating that fencing is " +
+ "necessary");
+ needFence = true;
+ }
+ // Record the earliest time at which recheckElectability() will stop
+ // deferring, then leave the election.
+ delayJoiningUntilNanotime = System.nanoTime() +
+ TimeUnit.MILLISECONDS.toNanos(millisToCede);
+ elector.quitElection(needFence);
+ }
+ }
+ // With the delay now in the future, this call schedules a later recheck
+ // rather than rejoining immediately.
+ recheckElectability();
+ }
+
+ /**
+ * Coordinate a graceful failover to this node.
+ * @throws ServiceFailedException if the node fails to become active
+ * @throws IOException some other error occurs
+ */
+ void gracefulFailoverToYou() throws ServiceFailedException, IOException {
+ try {
+ UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
+ @Override
+ public Void run() throws Exception {
+ doGracefulFailover();
+ return null;
+ }
+
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Coordinate a graceful failover. This proceeds in several phases:
+ * 1) Pre-flight checks: ensure that the local node is healthy, and
+ * thus a candidate for failover.
+ * 2) Determine the current active node. If it is the local node, no
+ * need to failover - return success.
+ * 3) Ask that node to yield from the election for a number of seconds.
+ * 4) Allow the normal election path to run in other threads. Wait until
+ * we either become unhealthy or we see an election attempt recorded by
+ * the normal code path.
+ * 5) Allow the old active to rejoin the election, so a future
+ * failback is possible.
+ */
+ private void doGracefulFailover()
+ throws ServiceFailedException, IOException, InterruptedException {
+ int timeout = FailoverController.getGracefulFenceTimeout(conf) * 2;
+
+ // Phase 1: pre-flight checks
+ checkEligibleForFailover();
+
+ // Phase 2: determine old/current active node. Check that we're not
+ // ourselves active, etc.
+ HAServiceTarget oldActive = getCurrentActive();
+ if (oldActive == null) {
+ // No node is currently active. So, if we aren't already
+ // active ourselves by means of a normal election, then there's
+ // probably something preventing us from becoming active.
+ throw new ServiceFailedException(
+ "No other node is currently active.");
+ }
+
+ if (oldActive.getAddress().equals(localTarget.getAddress())) {
+ LOG.info("Local node " + localTarget + " is already active. " +
+ "No need to failover. Returning success.");
+ return;
+ }
+
+ // Phase 3: ask the old active to yield from the election.
+ LOG.info("Asking " + oldActive + " to cede its active state for " +
+ timeout + "ms");
+ ZKFCProtocol oldZkfc = oldActive.getZKFCProxy(conf, timeout);
+ oldZkfc.cedeActive(timeout);
+
+ // Phase 4: wait for the normal election to make the local node
+ // active.
+ ActiveAttemptRecord attempt = waitForActiveAttempt(timeout + 60000);
+
+ if (attempt == null) {
+ // We didn't even make an attempt to become active.
+ synchronized(this) {
+ if (lastHealthState != State.SERVICE_HEALTHY) {
+ throw new ServiceFailedException("Unable to become active. " +
+ "Service became unhealthy while trying to failover.");
+ }
+ }
+
+ throw new ServiceFailedException("Unable to become active. " +
+ "Local node did not get an opportunity to do so from ZooKeeper, " +
+ "or the local node took too long to transition to active.");
+ }
+
+ // Phase 5. At this point, we made some attempt to become active. So we
+ // can tell the old active to rejoin if it wants. This allows a quick
+ // fail-back if we immediately crash.
+ oldZkfc.cedeActive(-1);
+
+ if (attempt.succeeded) {
+ LOG.info("Successfully became active. " + attempt.status);
+ } else {
+ // Propagate failure
+ String msg = "Failed to become active. " + attempt.status;
+ throw new ServiceFailedException(msg);
+ }
+ }
+
+ /**
+  * Verify that the local node may be used as a graceful failover target,
+  * which requires its last reported health state to be healthy.
+  *
+  * @throws ServiceFailedException if the node is unhealthy
+  */
+ private synchronized void checkEligibleForFailover()
+     throws ServiceFailedException {
+   if (getLastHealthState() == State.SERVICE_HEALTHY) {
+     return;
+   }
+   throw new ServiceFailedException(
+       localTarget + " is not currently healthy. " +
+       "Cannot be failover target");
+ }
+
+ /**
+ * @return an {@link HAServiceTarget} for the current active node
+ * in the cluster, or null if no node is active.
+ * @throws IOException if a ZK-related issue occurs
+ * @throws InterruptedException if thread is interrupted
+ */
+ private HAServiceTarget getCurrentActive()
+ throws IOException, InterruptedException {
+ synchronized (elector) {
+ synchronized (this) {
+ byte[] activeData;
+ try {
+ activeData = elector.getActiveData();
+ } catch (ActiveNotFoundException e) {
+ return null;
+ } catch (KeeperException ke) {
+ throw new IOException(
+ "Unexpected ZooKeeper issue fetching active node info", ke);
+ }
+
+ HAServiceTarget oldActive = dataToTarget(activeData);
+ return oldActive;
+ }
+ }
+ }
+
+ /**
+ * Check the current state of the service, and join the election
+ * if it should be in the election.
+ */
+ private void recheckElectability() {
+ // Maintain lock ordering of elector -> ZKFC
+ synchronized (elector) {
+ synchronized (this) {
+ boolean healthy = lastHealthState == State.SERVICE_HEALTHY;
+
+ long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
+ if (remainingDelay > 0) {
+ if (healthy) {
+ LOG.info("Would have joined master election, but this node is " +
+ "prohibited from doing so for " +
+ TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
+ }
+ scheduleRecheck(remainingDelay);
+ return;
+ }
+
+ switch (lastHealthState) {
+ case SERVICE_HEALTHY:
+ elector.joinElection(targetToData(localTarget));
+ break;
+
+ case INITIALIZING:
+ LOG.info("Ensuring that " + localTarget + " does not " +
+ "participate in active master election");
+ elector.quitElection(false);
+ break;
+
+ case SERVICE_UNHEALTHY:
+ case SERVICE_NOT_RESPONDING:
+ LOG.info("Quitting master election for " + localTarget +
+ " and marking that fencing is necessary");
+ elector.quitElection(true);
+ break;
+
+ case HEALTH_MONITOR_FAILED:
+ fatalError("Health monitor failed!");
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unhandled state:" + lastHealthState);
+ }
+ }
+ }
+ }
+
+ /**
+ * Schedule a call to {@link #recheckElectability()} in the future.
+ */
+ private void scheduleRecheck(long whenNanos) {
+ delayExecutor.schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ try {
+ recheckElectability();
+ } catch (Throwable t) {
+ fatalError("Failed to recheck electability: " +
+ StringUtils.stringifyException(t));
+ }
+ }
+ },
+ whenNanos, TimeUnit.NANOSECONDS);
+ }
/**
* @return the last health state passed to the FC
@@ -383,6 +800,11 @@ public abstract class ZKFailoverControll
ActiveStandbyElector getElectorForTests() {
return elector;
}
+
+ /**
+ * @return the RPC server held by this failover controller; exposed for
+ * tests only
+ */
+ @VisibleForTesting
+ ZKFCRpcServer getRpcServerForTests() {
+ return rpcServer;
+ }
/**
* Callbacks from elector
@@ -409,32 +831,7 @@ public abstract class ZKFailoverControll
@Override
public void fenceOldActive(byte[] data) {
- HAServiceTarget target = dataToTarget(data);
-
- LOG.info("Should fence: " + target);
- boolean gracefulWorked = new FailoverController(conf,
- RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
- if (gracefulWorked) {
- // It's possible that it's in standby but just about to go into active,
- // no? Is there some race here?
- LOG.info("Successfully transitioned " + target + " to standby " +
- "state without fencing");
- return;
- }
-
- try {
- target.checkFencingConfigured();
- } catch (BadFencingConfigurationException e) {
- LOG.error("Couldn't fence old active " + target, e);
- // TODO: see below todo
- throw new RuntimeException(e);
- }
-
- if (!target.getFencer().fence(target)) {
- // TODO: this will end up in some kind of tight loop,
- // won't it? We need some kind of backoff
- throw new RuntimeException("Unable to fence " + target);
- }
+ ZKFailoverController.this.fenceOldActive(data);
}
@Override
@@ -451,34 +848,21 @@ public abstract class ZKFailoverControll
public void enteredState(HealthMonitor.State newState) {
LOG.info("Local service " + localTarget +
" entered state: " + newState);
- switch (newState) {
- case SERVICE_HEALTHY:
- LOG.info("Joining master election for " + localTarget);
- elector.joinElection(targetToData(localTarget));
- break;
-
- case INITIALIZING:
- LOG.info("Ensuring that " + localTarget + " does not " +
- "participate in active master election");
- elector.quitElection(false);
- break;
-
- case SERVICE_UNHEALTHY:
- case SERVICE_NOT_RESPONDING:
- LOG.info("Quitting master election for " + localTarget +
- " and marking that fencing is necessary");
- elector.quitElection(true);
- break;
-
- case HEALTH_MONITOR_FAILED:
- fatalError("Health monitor failed!");
- break;
-
- default:
- throw new IllegalArgumentException("Unhandled state:" + newState);
- }
-
lastHealthState = newState;
+ recheckElectability();
}
}
+
+ /**
+  * Immutable outcome of one attempt to become active: whether it
+  * succeeded, a status message, and the {@link System#nanoTime()} reading
+  * taken when the record was created.
+  */
+ private static class ActiveAttemptRecord {
+   /** True if the attempt succeeded. */
+   private final boolean succeeded;
+   /** Status message supplied by whoever recorded the attempt. */
+   private final String status;
+   /** System.nanoTime() captured at construction. */
+   private final long nanoTime;
+
+   public ActiveAttemptRecord(boolean succeeded, String status) {
+     this.nanoTime = System.nanoTime();
+     this.status = status;
+     this.succeeded = succeeded;
+   }
+ }
+
}
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java?rev=1333288&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolClientSideTranslatorPB.java Thu May 3 01:56:33 2012
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha.protocolPB;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ZKFCProtocol;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
+import org.apache.hadoop.ipc.ProtobufHelper;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.ProtocolTranslator;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+
+/**
+ * Client-side translator that exposes {@link ZKFCProtocol} and carries out
+ * each call as a protobuf RPC on a {@link ZKFCProtocolPB} proxy. A
+ * {@link ServiceException} from the proxy is converted back into an
+ * {@link IOException} via {@link ProtobufHelper#getRemoteException}.
+ */
+public class ZKFCProtocolClientSideTranslatorPB implements
+    ZKFCProtocol, Closeable, ProtocolTranslator {
+
+  /** No RpcController is supplied on calls through the proxy. */
+  private static final RpcController NULL_CONTROLLER = null;
+  private final ZKFCProtocolPB rpcProxy;
+
+  /**
+   * Create a proxy to the ZKFC at the given address, running as the
+   * current user.
+   *
+   * @param addr address of the remote ZKFC RPC server
+   * @param conf configuration; the protobuf RPC engine is registered on it
+   * for {@link ZKFCProtocolPB}
+   * @param socketFactory factory used for the connection
+   * @param timeout RPC timeout handed to {@link RPC#getProxy}
+   * @throws IOException if the proxy cannot be created
+   */
+  public ZKFCProtocolClientSideTranslatorPB(
+      InetSocketAddress addr, Configuration conf,
+      SocketFactory socketFactory, int timeout) throws IOException {
+    RPC.setProtocolEngine(conf, ZKFCProtocolPB.class,
+        ProtobufRpcEngine.class);
+    rpcProxy = RPC.getProxy(ZKFCProtocolPB.class,
+        RPC.getProtocolVersion(ZKFCProtocolPB.class), addr,
+        UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
+  }
+
+  @Override
+  public void cedeActive(int millisToCede) throws IOException,
+      AccessControlException {
+    try {
+      rpcProxy.cedeActive(NULL_CONTROLLER,
+          CedeActiveRequestProto.newBuilder()
+              .setMillisToCede(millisToCede)
+              .build());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  @Override
+  public void gracefulFailover() throws IOException, AccessControlException {
+    final GracefulFailoverRequestProto req =
+        GracefulFailoverRequestProto.getDefaultInstance();
+    try {
+      rpcProxy.gracefulFailover(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
+
+  /** Stop the underlying RPC proxy. */
+  @Override
+  public void close() {
+    RPC.stopProxy(rpcProxy);
+  }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
+}
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java?rev=1333288&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolPB.java Thu May 3 01:56:33 2012
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha.protocolPB;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.ZKFCProtocolService;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.security.KerberosInfo;
+
+/**
+ * Protobuf RPC interface for the ZKFC protocol. It combines the generated
+ * {@link ZKFCProtocolService.BlockingInterface} with
+ * {@link VersionedProtocol} and declares no methods of its own.
+ */
+@KerberosInfo(
+ serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
+@ProtocolInfo(protocolName = "org.apache.hadoop.ha.ZKFCProtocol",
+ protocolVersion = 1)
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ZKFCProtocolPB extends
+ ZKFCProtocolService.BlockingInterface, VersionedProtocol {
+ /**
+ * If any methods need annotations, they can be added here.
+ */
+}
\ No newline at end of file
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java?rev=1333288&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/ZKFCProtocolServerSideTranslatorPB.java Thu May 3 01:56:33 2012
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha.protocolPB;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ha.ZKFCProtocol;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveRequestProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.CedeActiveResponseProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverRequestProto;
+import org.apache.hadoop.ha.proto.ZKFCProtocolProtos.GracefulFailoverResponseProto;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+/**
+ * Server-side translator that receives protobuf {@link ZKFCProtocolPB}
+ * requests and forwards them to a {@link ZKFCProtocol} implementation.
+ * Any {@link IOException} thrown by the implementation is wrapped in a
+ * {@link ServiceException}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Stable
+public class ZKFCProtocolServerSideTranslatorPB implements
+    ZKFCProtocolPB {
+  /** The implementation that actually handles each request. */
+  private final ZKFCProtocol server;
+
+  /**
+   * @param server implementation to which translated calls are forwarded
+   */
+  public ZKFCProtocolServerSideTranslatorPB(ZKFCProtocol server) {
+    this.server = server;
+  }
+
+  @Override
+  public CedeActiveResponseProto cedeActive(RpcController controller,
+      CedeActiveRequestProto request) throws ServiceException {
+    try {
+      server.cedeActive(request.getMillisToCede());
+      return CedeActiveResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GracefulFailoverResponseProto gracefulFailover(
+      RpcController controller, GracefulFailoverRequestProto request)
+      throws ServiceException {
+    try {
+      server.gracefulFailover();
+      return GracefulFailoverResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return RPC.getProtocolVersion(ZKFCProtocolPB.class);
+  }
+
+  /**
+   * @throws IOException if the requested protocol name is not that of
+   * {@link ZKFCProtocolPB}
+   */
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    if (!protocol.equals(RPC.getProtocolName(ZKFCProtocolPB.class))) {
+      throw new IOException("Serverside implements " +
+          RPC.getProtocolName(ZKFCProtocolPB.class) +
+          ". The following requested protocol is unknown: " + protocol);
+    }
+
+    // The signature must describe this protocol. The original passed
+    // HAServiceProtocolPB.class here, which reported the method set of a
+    // different protocol.
+    return ProtocolSignature.getProtocolSignature(clientMethodsHash,
+        RPC.getProtocolVersion(ZKFCProtocolPB.class),
+        ZKFCProtocolPB.class);
+  }
+
+}
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/packages/templates/conf/hadoop-policy.xml Thu May 3 01:56:33 2012
@@ -223,6 +223,12 @@
<description>ACL for HAService protocol used by HAAdmin to manage the
active and stand-by states of namenode.</description>
</property>
+ <property>
+ <name>security.zkfc.protocol.acl</name>
+ <value>*</value>
+ <description>ACL for access to the ZK Failover Controller
+ </description>
+ </property>
<property>
<name>security.mrhs.client.protocol.acl</name>
Added: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto?rev=1333288&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto (added)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/main/proto/ZKFCProtocol.proto Thu May 3 01:56:33 2012
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.ha.proto";
+option java_outer_classname = "ZKFCProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+
+message CedeActiveRequestProto {
+ // How long the receiving ZKFC should stay out of the election, in millis.
+ // The receiver treats a value <= 0 as "clear any delay and recheck now".
+ // NOTE(review): this field is unsigned, yet the graceful failover code
+ // sends -1 through it - confirm negative values round-trip as intended.
+ required uint32 millisToCede = 1;
+}
+
+message CedeActiveResponseProto {
+}
+
+message GracefulFailoverRequestProto {
+}
+
+message GracefulFailoverResponseProto {
+}
+
+
+/**
+ * Protocol provides manual control of the ZK Failover Controllers
+ */
+service ZKFCProtocolService {
+ /**
+ * Request that the service cede its active state, and quit the election
+ * for some amount of time
+ */
+ rpc cedeActive(CedeActiveRequestProto)
+ returns(CedeActiveResponseProto);
+
+
+ /**
+ * Request that the receiving failover controller coordinate a graceful
+ * failover that makes its own local node active
+ */
+ rpc gracefulFailover(GracefulFailoverRequestProto)
+ returns(GracefulFailoverResponseProto);
+}
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java Thu May 3 01:56:33 2012
@@ -40,13 +40,15 @@ class DummyHAService extends HAServiceTa
private static final String DUMMY_FENCE_KEY = "dummy.fence.key";
volatile HAServiceState state;
HAServiceProtocol proxy;
+ ZKFCProtocol zkfcProxy = null;
NodeFencer fencer;
InetSocketAddress address;
boolean isHealthy = true;
boolean actUnreachable = false;
- boolean failToBecomeActive;
+ boolean failToBecomeActive, failToBecomeStandby, failToFence;
DummySharedResource sharedResource;
+ public int fenceCount = 0;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
@@ -83,12 +85,24 @@ class DummyHAService extends HAServiceTa
}
@Override
+ public InetSocketAddress getZKFCAddress() {
+ return null;
+ }
+
+ @Override
public HAServiceProtocol getProxy(Configuration conf, int timeout)
throws IOException {
return proxy;
}
@Override
+ public ZKFCProtocol getZKFCProxy(Configuration conf, int timeout)
+ throws IOException {
+ assert zkfcProxy != null;
+ return zkfcProxy;
+ }
+
+ @Override
public NodeFencer getFencer() {
return fencer;
}
@@ -139,6 +153,9 @@ class DummyHAService extends HAServiceTa
public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailedException,
AccessControlException, IOException {
checkUnreachable();
+ if (failToBecomeStandby) {
+ throw new ServiceFailedException("injected failure");
+ }
if (sharedResource != null) {
sharedResource.release(DummyHAService.this);
}
@@ -167,7 +184,6 @@ class DummyHAService extends HAServiceTa
}
public static class DummyFencer implements FenceMethod {
-
public void checkArgs(String args) throws BadFencingConfigurationException {
}
@@ -176,6 +192,13 @@ class DummyHAService extends HAServiceTa
throws BadFencingConfigurationException {
LOG.info("tryFence(" + target + ")");
DummyHAService svc = (DummyHAService)target;
+ synchronized (svc) {
+ svc.fenceCount++;
+ }
+ if (svc.failToFence) {
+ LOG.info("Injected failure to fence");
+ return false;
+ }
svc.sharedResource.release(svc);
return true;
}
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java Thu May 3 01:56:33 2012
@@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.zookeeper.KeeperException.NoNodeException;
@@ -126,6 +128,10 @@ public class MiniZKFCCluster {
public ActiveStandbyElector getElector(int i) {
return thrs[i].zkfc.getElectorForTests();
}
+
+ public DummyZKFC getZkfc(int i) {
+ return thrs[i].zkfc;
+ }
public void setHealthy(int idx, boolean healthy) {
svcs[idx].isHealthy = healthy;
@@ -134,6 +140,14 @@ public class MiniZKFCCluster {
public void setFailToBecomeActive(int idx, boolean doFail) {
svcs[idx].failToBecomeActive = doFail;
}
+
+ public void setFailToBecomeStandby(int idx, boolean doFail) {
+ svcs[idx].failToBecomeStandby = doFail;
+ }
+
+ public void setFailToFence(int idx, boolean doFail) {
+ svcs[idx].failToFence = doFail;
+ }
public void setUnreachable(int idx, boolean unreachable) {
svcs[idx].actUnreachable = unreachable;
@@ -290,5 +304,25 @@ public class MiniZKFCCluster {
protected String getScopeInsideParentNode() {
return DUMMY_CLUSTER;
}
+
+ @Override
+ protected void checkRpcAdminAccess() throws AccessControlException {
+ }
+
+ @Override
+ protected InetSocketAddress getRpcAddressToBindTo() {
+ return new InetSocketAddress(0);
+ }
+
+ @Override
+ protected void initRPC() throws IOException {
+ super.initRPC();
+ localTarget.zkfcProxy = this.getRpcServerForTests();
+ }
+
+ @Override
+ protected PolicyProvider getPolicyProvider() {
+ return null;
+ }
}
}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java?rev=1333288&r1=1333287&r2=1333288&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java (original)
+++ hadoop/common/branches/HDFS-3042/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java Thu May 3 01:56:33 2012
@@ -27,6 +27,7 @@ import org.apache.hadoop.ha.HAServicePro
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
@@ -378,6 +379,205 @@ public class TestZKFailoverController ex
cluster.stop();
}
}
+
+ /**
+ * Test that the ZKFC can gracefully cede its active status: the cedeActive() RPC returns promptly, the elector drops to INIT, and it later rejoins as STANDBY.
+ */
+ @Test(timeout=15000)
+ public void testCedeActive() throws Exception {
+ try {
+ cluster.start();
+ DummyZKFC zkfc = cluster.getZkfc(0);
+ // It should be in active to start.
+ assertEquals(ActiveStandbyElector.State.ACTIVE,
+ zkfc.getElectorForTests().getStateForTests());
+
+ // Ask it to cede active for 3 seconds. It should respond promptly
+ // (i.e. the RPC itself should not take 3 seconds!)
+ ZKFCProtocol proxy = zkfc.getLocalTarget().getZKFCProxy(conf, 5000);
+ long st = System.nanoTime(); // monotonic clock: wall-clock time can jump and skew the elapsed-time checks below
+ proxy.cedeActive(3000);
+ long et = System.nanoTime();
+ assertTrue("RPC to cedeActive took " + (et - st) / 1000000L + " ms",
+ (et - st) / 1000000L < 1000); // nanoseconds converted to whole milliseconds
+
+ // Should be in "INIT" state since it's not in the election
+ // at this point.
+ assertEquals(ActiveStandbyElector.State.INIT,
+ zkfc.getElectorForTests().getStateForTests());
+
+ // After the prescribed 3 seconds, should go into STANDBY state,
+ // since the other node in the cluster would have taken ACTIVE.
+ cluster.waitForElectorState(0, ActiveStandbyElector.State.STANDBY);
+ long et2 = System.nanoTime();
+ assertTrue("Should take ~3 seconds to rejoin. Only took " + (et2 - et) / 1000000L +
+ "ms before rejoining.",
+ (et2 - et) / 1000000L > 2800); // measured from the RPC's return, hence the slack below 3000 ms
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000) // Fails over gracefully to service 1 and then back to service 0, expecting no fencing.
+ public void testGracefulFailover() throws Exception {
+ try {
+ cluster.start();
+ // Each gracefulFailover() call goes to the ZKFC proxy of the service that is then expected to hold the active lock.
+ cluster.waitForActiveLockHolder(0);
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ cluster.waitForActiveLockHolder(1);
+ cluster.getService(0).getZKFCProxy(conf, 5000).gracefulFailover();
+ cluster.waitForActiveLockHolder(0);
+ // Neither service should have been fenced by the two hand-offs.
+ assertEquals(0, cluster.getService(0).fenceCount);
+ assertEquals(0, cluster.getService(1).fenceCount);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000) // A graceful failover requested of an unhealthy service must be rejected with ServiceFailedException.
+ public void testGracefulFailoverToUnhealthy() throws Exception {
+ try {
+ cluster.start();
+ // Wait until service 0 holds the active lock before disturbing service 1.
+ cluster.waitForActiveLockHolder(0);
+
+ // Mark service 1 unhealthy and wait for its elector to reach INIT (out of the election).
+ cluster.setHealthy(1, false);
+ cluster.waitForElectorState(1, ActiveStandbyElector.State.INIT);
+
+ // Ask service 1's ZKFC for a failover; it should fail because service 1 is unhealthy.
+ try {
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ fail("Did not fail to graceful failover to unhealthy service!");
+ } catch (ServiceFailedException sfe) {
+ GenericTestUtils.assertExceptionContains(
+ cluster.getService(1).toString() +
+ " is not currently healthy.", sfe); // the error text must name service 1
+ }
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000) // A failover whose target cannot become active must report the error, fence nobody, and leave service 0 active.
+ public void testGracefulFailoverFailBecomingActive() throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+ cluster.setFailToBecomeActive(1, true); // presumably the source of the "injected failure" text asserted below -- confirm
+
+ // Ask for failover, it should fail and report back to user.
+ try {
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ fail("Did not fail to graceful failover when target failed " +
+ "to become active!");
+ } catch (ServiceFailedException sfe) {
+ GenericTestUtils.assertExceptionContains(
+ "Couldn't make " + cluster.getService(1) + " active", sfe); // message must name the failed target
+ GenericTestUtils.assertExceptionContains(
+ "injected failure", sfe); // and must carry the underlying failure text
+ }
+
+ // No fencing
+ assertEquals(0, cluster.getService(0).fenceCount);
+ assertEquals(0, cluster.getService(1).fenceCount);
+
+ // Service 0 should go back to being active after the failed failover
+ cluster.waitForActiveLockHolder(0);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000) // If the old active fails to become standby, the failover call still returns normally and the old active is fenced once.
+ public void testGracefulFailoverFailBecomingStandby() throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+
+ // Ask for failover when old node fails to transition to standby.
+ // This should trigger fencing, since the cedeActive() command
+ // still works, but leaves the breadcrumb in place.
+ cluster.setFailToBecomeStandby(0, true);
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover(); // not wrapped in try/catch: any exception here fails the test
+
+ // Check that the old node was fenced
+ assertEquals(1, cluster.getService(0).fenceCount);
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ @Test(timeout=15000) // If the old active neither becomes standby nor can be fenced, the failover must fail with ServiceFailedException.
+ public void testGracefulFailoverFailBecomingStandbyAndFailFence()
+ throws Exception {
+ try {
+ cluster.start();
+
+ cluster.waitForActiveLockHolder(0);
+
+ // Ask for failover when old node fails to transition to standby.
+ // This should trigger fencing, since the cedeActive() command
+ // still works, but leaves the breadcrumb in place.
+ cluster.setFailToBecomeStandby(0, true);
+ cluster.setFailToFence(0, true); // additionally make the fencing of service 0 fail
+
+ try {
+ cluster.getService(1).getZKFCProxy(conf, 5000).gracefulFailover();
+ fail("Failover should have failed when old node wont fence");
+ } catch (ServiceFailedException sfe) {
+ GenericTestUtils.assertExceptionContains(
+ "Unable to fence " + cluster.getService(0), sfe); // the error text must name the service that could not be fenced
+ }
+ } finally {
+ cluster.stop();
+ }
+ }
+
+ /**
+ * Test which exercises all of the inputs into ZKFC. This is particularly
+ * useful for running under jcarder to check for lock order violations.
+ */
+ @Test(timeout=30000) // longer timeout than the sibling tests (15000)
+ public void testOneOfEverything() throws Exception {
+ try {
+ cluster.start();
+
+ // Failover by session expiration
+ LOG.info("====== Failing over by session expiration");
+ cluster.expireAndVerifyFailover(0, 1);
+ cluster.expireAndVerifyFailover(1, 0);
+
+ // Restart ZK
+ LOG.info("====== Restarting server");
+ stopServer();
+ waitForServerDown(hostPort, CONNECTION_TIMEOUT);
+ startServer();
+ waitForServerUp(hostPort, CONNECTION_TIMEOUT);
+
+ // Failover by bad health
+ cluster.setHealthy(0, false);
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
+ cluster.setHealthy(1, true); // NOTE(review): service 1 is never marked unhealthy in this test and service 0 is already unhealthy, so this second round repeats the first -- confirm whether a failover back to service 0 was intended
+ cluster.setHealthy(0, false);
+ cluster.waitForHAState(1, HAServiceState.ACTIVE);
+ cluster.waitForHAState(0, HAServiceState.STANDBY);
+ cluster.setHealthy(0, true);
+ // Wait for service 0 to be reported healthy again before the graceful failovers below.
+ cluster.waitForHealthState(0, State.SERVICE_HEALTHY);
+
+ // Graceful failovers (invoked directly on each ZKFC object rather than through its RPC proxy)
+ cluster.getZkfc(1).gracefulFailoverToYou();
+ cluster.getZkfc(0).gracefulFailoverToYou();
+ } finally {
+ cluster.stop();
+ }
+ }
private int runFC(DummyHAService target, String ... args) throws Exception {
DummyZKFC zkfc = new DummyZKFC(target);