You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/12/10 23:10:03 UTC
svn commit: r1419832 - in
/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./
src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/
Author: todd
Date: Mon Dec 10 22:10:02 2012
New Revision: 1419832
URL: http://svn.apache.org/viewvc?rev=1419832&view=rev
Log:
HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection establishment delay. Contributed by Rakesh R and Todd Lipcon.
Modified:
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Mon Dec 10 22:10:02 2012
@@ -178,6 +178,9 @@ Release 2.0.3-alpha - Unreleased
HADOOP-6762. Exception while doing RPC I/O closes channel
(Sam Rash and todd via todd)
+ HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection
+ establishment delay. (Rakesh R and todd via todd)
+
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Mon Dec 10 22:10:02 2012
@@ -21,6 +21,8 @@ package org.apache.hadoop.ha;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -45,6 +47,7 @@ import org.apache.zookeeper.KeeperExcept
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
/**
*
@@ -205,7 +208,7 @@ public class ActiveStandbyElector implem
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
List<ZKAuthInfo> authInfo,
ActiveStandbyElectorCallback app) throws IOException,
- HadoopIllegalArgumentException {
+ HadoopIllegalArgumentException, KeeperException {
if (app == null || acl == null || parentZnodeName == null
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
throw new HadoopIllegalArgumentException("Invalid argument");
@@ -602,10 +605,24 @@ public class ActiveStandbyElector implem
*
* @return new zookeeper client instance
* @throws IOException
+ * @throws KeeperException with code ConnectionLoss if no ZooKeeper event arrives within the session timeout
*/
- protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
- ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
- zk.register(new WatcherWithClientRef(zk));
+ protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
+ KeeperException {
+
+ // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
+ // may trigger the Connected event immediately. So, if we register the
+ // watcher after constructing ZooKeeper, we may miss that event. Instead,
+ // we construct the watcher first, and have it queue any events it receives
+ // before we can set its ZooKeeper reference.
+ WatcherWithClientRef watcher = new WatcherWithClientRef();
+ ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
+ watcher.setZooKeeperRef(zk);
+
+ // Wait for the asynchronous success/failure. This may throw an exception
+ // if we don't connect within the session timeout.
+ watcher.waitForZKConnectionEvent(zkSessionTimeout);
+
for (ZKAuthInfo auth : zkAuthInfo) {
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
}
@@ -710,13 +727,16 @@ public class ActiveStandbyElector implem
} catch(IOException e) {
LOG.warn(e);
sleepFor(5000);
+ } catch(KeeperException e) {
+ LOG.warn(e);
+ sleepFor(5000);
}
++connectionRetryCount;
}
return success;
}
- private void createConnection() throws IOException {
+ private void createConnection() throws IOException, KeeperException {
if (zkClient != null) {
try {
zkClient.close();
@@ -973,14 +993,76 @@ public class ActiveStandbyElector implem
* events.
*/
private final class WatcherWithClientRef implements Watcher {
- private final ZooKeeper zk;
+ private ZooKeeper zk;
+
+ /**
+ * Latch fired whenever any event arrives. This is used in order
+ * to wait for the Connected event when the client is first created.
+ */
+ private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
+
+ /**
+ * If any events arrive before the reference to ZooKeeper is set,
+ * they get queued up and later forwarded when the reference is
+ * available.
+ */
+ private final List<WatchedEvent> queuedEvents = Lists.newLinkedList();
+
+ private WatcherWithClientRef() {
+ }
private WatcherWithClientRef(ZooKeeper zk) {
this.zk = zk;
}
+
+ /**
+ * Waits for the first event from ZooKeeper to be forwarded (one-shot latch).
+ *
+ * @param connectionTimeoutMs zookeeper connection timeout in milliseconds
+ * @throws KeeperException if the connection attempt times out. This will
+ * be a ZooKeeper ConnectionLoss exception code.
+ * @throws IOException if interrupted while connecting to ZooKeeper
+ */
+ private void waitForZKConnectionEvent(int connectionTimeoutMs)
+ throws KeeperException, IOException {
+ try {
+ if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
+ LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
+ + connectionTimeoutMs + " milliseconds");
+ synchronized (this) {
+ zk.close();
+ }
+ throw KeeperException.create(Code.CONNECTIONLOSS);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IOException(
+ "Interrupted when connecting to zookeeper server", e);
+ }
+ }
+
+ private synchronized void setZooKeeperRef(ZooKeeper zk) {
+ Preconditions.checkState(this.zk == null,
+ "zk already set -- must be set exactly once");
+ this.zk = zk;
+
+ for (WatchedEvent e : queuedEvents) {
+ forwardEvent(e);
+ }
+ queuedEvents.clear();
+ }
@Override
- public void process(WatchedEvent event) {
+ public synchronized void process(WatchedEvent event) {
+ if (zk != null) {
+ forwardEvent(event);
+ } else {
+ queuedEvents.add(event);
+ }
+ }
+
+ private void forwardEvent(WatchedEvent event) {
+ hasReceivedEvent.countDown();
try {
ActiveStandbyElector.this.processWatchEvent(
zk, event);
@@ -1024,5 +1106,4 @@ public class ActiveStandbyElector implem
((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
" cb=" + appClient;
}
-
}
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Mon Dec 10 22:10:02 2012
@@ -180,7 +180,15 @@ public abstract class ZKFailoverControll
private int doRun(String[] args)
throws HadoopIllegalArgumentException, IOException, InterruptedException {
- initZK();
+ try {
+ initZK();
+ } catch (KeeperException ke) {
+ LOG.fatal("Unable to start failover controller. Unable to connect "
+ + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
+ + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
+ + "ZooKeeper is running.");
+ return ERR_CODE_NO_ZK;
+ }
if (args.length > 0) {
if ("-formatZK".equals(args[0])) {
boolean force = false;
@@ -199,24 +207,12 @@ public abstract class ZKFailoverControll
badArg(args[0]);
}
}
-
- try {
- if (!elector.parentZNodeExists()) {
- LOG.fatal("Unable to start failover controller. " +
- "Parent znode does not exist.\n" +
- "Run with -formatZK flag to initialize ZooKeeper.");
- return ERR_CODE_NO_PARENT_ZNODE;
- }
- } catch (IOException ioe) {
- if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
- LOG.fatal("Unable to start failover controller. Unable to connect " +
- "to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
- "configured value for " + ZK_QUORUM_KEY + " and ensure that " +
- "ZooKeeper is running.");
- return ERR_CODE_NO_ZK;
- } else {
- throw ioe;
- }
+
+ if (!elector.parentZNodeExists()) {
+ LOG.fatal("Unable to start failover controller. "
+ + "Parent znode does not exist.\n"
+ + "Run with -formatZK flag to initialize ZooKeeper.");
+ return ERR_CODE_NO_PARENT_ZNODE;
}
try {
@@ -310,7 +306,8 @@ public abstract class ZKFailoverControll
}
- private void initZK() throws HadoopIllegalArgumentException, IOException {
+ private void initZK() throws HadoopIllegalArgumentException, IOException,
+ KeeperException {
zkQuorum = conf.get(ZK_QUORUM_KEY);
int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
ZK_SESSION_TIMEOUT_DEFAULT);
Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Mon Dec 10 22:10:02 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.HadoopIllegalAr
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
+import org.apache.hadoop.test.GenericTestUtils;
public class TestActiveStandbyElector {
@@ -56,7 +57,8 @@ public class TestActiveStandbyElector {
private int sleptFor = 0;
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
- List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
+ List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException,
+ KeeperException {
super(hostPort, timeout, parent, acl,
Collections.<ZKAuthInfo>emptyList(), app);
}
@@ -83,7 +85,7 @@ public class TestActiveStandbyElector {
ActiveStandbyElector.BREADCRUMB_FILENAME;
@Before
- public void init() throws IOException {
+ public void init() throws IOException, KeeperException {
count = 0;
mockZK = Mockito.mock(ZooKeeper.class);
mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
@@ -705,4 +707,18 @@ public class TestActiveStandbyElector {
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
}
+
+ /**
+ * Verify that constructing an elector fails with ConnectionLoss when no ZooKeeper connection is established
+ */
+ @Test
+ public void testWithoutZKServer() throws Exception {
+ try {
+ new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME,
+ Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp);
+ Assert.fail("Did not throw zookeeper connection loss exceptions!");
+ } catch (KeeperException ke) {
+ GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke);
+ }
+ }
}