You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2012/12/10 23:10:03 UTC

svn commit: r1419832 - in /hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common: ./ src/main/java/org/apache/hadoop/ha/ src/test/java/org/apache/hadoop/ha/

Author: todd
Date: Mon Dec 10 22:10:02 2012
New Revision: 1419832

URL: http://svn.apache.org/viewvc?rev=1419832&view=rev
Log:
HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection establishment delay. Contributed by Rakesh R and Todd Lipcon.

Modified:
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
    hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/CHANGES.txt Mon Dec 10 22:10:02 2012
@@ -178,6 +178,9 @@ Release 2.0.3-alpha - Unreleased 
     HADOOP-6762. Exception while doing RPC I/O closes channel
     (Sam Rash and todd via todd)
 
+    HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection
+    establishment delay. (Rakesh R and todd via todd)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java Mon Dec 10 22:10:02 2012
@@ -21,6 +21,8 @@ package org.apache.hadoop.ha;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -45,6 +47,7 @@ import org.apache.zookeeper.KeeperExcept
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * 
@@ -205,7 +208,7 @@ public class ActiveStandbyElector implem
       int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
       List<ZKAuthInfo> authInfo,
       ActiveStandbyElectorCallback app) throws IOException,
-      HadoopIllegalArgumentException {
+      HadoopIllegalArgumentException, KeeperException {
     if (app == null || acl == null || parentZnodeName == null
         || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
       throw new HadoopIllegalArgumentException("Invalid argument");
@@ -602,10 +605,24 @@ public class ActiveStandbyElector implem
    * 
    * @return new zookeeper client instance
    * @throws IOException
+   * @throws KeeperException zookeeper connectionloss exception
    */
-  protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
-    ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
-    zk.register(new WatcherWithClientRef(zk));
+  protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
+      KeeperException {
+    
+    // Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
+    // may trigger the Connected event immediately. So, if we register the
+    // watcher after constructing ZooKeeper, we may miss that event. Instead,
+    // we construct the watcher first, and have it queue any events it receives
+    // before we can set its ZooKeeper reference.
+    WatcherWithClientRef watcher = new WatcherWithClientRef();
+    ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
+    watcher.setZooKeeperRef(zk);
+
+    // Wait for the asynchronous success/failure. This may throw an exception
+    // if we don't connect within the session timeout.
+    watcher.waitForZKConnectionEvent(zkSessionTimeout);
+    
     for (ZKAuthInfo auth : zkAuthInfo) {
       zk.addAuthInfo(auth.getScheme(), auth.getAuth());
     }
@@ -710,13 +727,16 @@ public class ActiveStandbyElector implem
       } catch(IOException e) {
         LOG.warn(e);
         sleepFor(5000);
+      } catch(KeeperException e) {
+        LOG.warn(e);
+        sleepFor(5000);
       }
       ++connectionRetryCount;
     }
     return success;
   }
 
-  private void createConnection() throws IOException {
+  private void createConnection() throws IOException, KeeperException {
     if (zkClient != null) {
       try {
         zkClient.close();
@@ -973,14 +993,76 @@ public class ActiveStandbyElector implem
    * events.
    */
   private final class WatcherWithClientRef implements Watcher {
-    private final ZooKeeper zk;
+    private ZooKeeper zk;
+    
+    /**
+     * Latch fired whenever any event arrives. This is used in order
+     * to wait for the Connected event when the client is first created.
+     */
+    private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
+
+    /**
+     * If any events arrive before the reference to ZooKeeper is set,
+     * they get queued up and later forwarded when the reference is
+     * available.
+     */
+    private final List<WatchedEvent> queuedEvents = Lists.newLinkedList();
+    
+    private WatcherWithClientRef() { // zk is supplied later via setZooKeeperRef()
+    }
 
     private WatcherWithClientRef(ZooKeeper zk) { // NOTE(review): no remaining caller visible in this diff; possibly dead
       this.zk = zk;
     }
+    
+    /**
+     * Blocks until the first ZooKeeper event (of any type) has been forwarded, or the timeout expires.
+     * On timeout the ZooKeeper client is closed first. NOTE(review): on interrupt zk is NOT closed here.
+     * @param connectionTimeoutMs maximum time to wait for the first event, in milliseconds
+     * @throws KeeperException if no event arrives within connectionTimeoutMs. This will
+     * be a ZooKeeper ConnectionLoss exception code.
+     * @throws IOException if interrupted while waiting (the interrupt flag is restored)
+     */
+    private void waitForZKConnectionEvent(int connectionTimeoutMs)
+        throws KeeperException, IOException {
+      try {
+        if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
+          LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
+              + connectionTimeoutMs + " milliseconds");
+          synchronized (this) { // same monitor as process() and setZooKeeperRef()
+            zk.close();
+          }
+          throw KeeperException.create(Code.CONNECTIONLOSS);
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // preserve interrupt status for callers
+        throw new IOException(
+            "Interrupted when connecting to zookeeper server", e);
+      }
+    }
+
+    private synchronized void setZooKeeperRef(ZooKeeper zk) { // one-shot: checkState rejects a second call
+      Preconditions.checkState(this.zk == null,
+          "zk already set -- must be set exactly once");
+      this.zk = zk;
+      // Replay, in arrival order, the events process() queued before zk was set.
+      for (WatchedEvent e : queuedEvents) {
+        forwardEvent(e);
+      }
+      queuedEvents.clear();
+    }
 
     @Override
-    public void process(WatchedEvent event) {
+    public synchronized void process(WatchedEvent event) { // synchronized with setZooKeeperRef() so queueing and replay don't interleave
+      if (zk != null) {
+        forwardEvent(event);
+      } else {
+        queuedEvents.add(event); // zk not yet set; setZooKeeperRef() replays these in order
+      }
+    }
+    
+    private void forwardEvent(WatchedEvent event) {
+      hasReceivedEvent.countDown();
       try {
         ActiveStandbyElector.this.processWatchEvent(
             zk, event);
@@ -1024,5 +1106,4 @@ public class ActiveStandbyElector implem
       ((appData == null) ? "null" : StringUtils.byteToHexString(appData)) + 
       " cb=" + appClient;
   }
-
 }

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java Mon Dec 10 22:10:02 2012
@@ -180,7 +180,15 @@ public abstract class ZKFailoverControll
 
   private int doRun(String[] args)
       throws HadoopIllegalArgumentException, IOException, InterruptedException {
-    initZK();
+    try {
+      initZK();
+    } catch (KeeperException ke) {
+      LOG.fatal("Unable to start failover controller. Unable to connect "
+          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
+          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
+          + "ZooKeeper is running.");
+      return ERR_CODE_NO_ZK;
+    }
     if (args.length > 0) {
       if ("-formatZK".equals(args[0])) {
         boolean force = false;
@@ -199,24 +207,12 @@ public abstract class ZKFailoverControll
         badArg(args[0]);
       }
     }
-    
-    try {
-      if (!elector.parentZNodeExists()) {
-        LOG.fatal("Unable to start failover controller. " +
-            "Parent znode does not exist.\n" +
-            "Run with -formatZK flag to initialize ZooKeeper.");
-        return ERR_CODE_NO_PARENT_ZNODE;
-      }
-    } catch (IOException ioe) {
-      if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
-        LOG.fatal("Unable to start failover controller. Unable to connect " +
-            "to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
-            "configured value for " + ZK_QUORUM_KEY + " and ensure that " +
-            "ZooKeeper is running.");
-        return ERR_CODE_NO_ZK;
-      } else {
-        throw ioe;
-      }
+
+    if (!elector.parentZNodeExists()) {
+      LOG.fatal("Unable to start failover controller. "
+          + "Parent znode does not exist.\n"
+          + "Run with -formatZK flag to initialize ZooKeeper.");
+      return ERR_CODE_NO_PARENT_ZNODE;
     }
 
     try {
@@ -310,7 +306,8 @@ public abstract class ZKFailoverControll
   }
 
 
-  private void initZK() throws HadoopIllegalArgumentException, IOException {
+  private void initZK() throws HadoopIllegalArgumentException, IOException,
+      KeeperException {
     zkQuorum = conf.get(ZK_QUORUM_KEY);
     int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
         ZK_SESSION_TIMEOUT_DEFAULT);

Modified: hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java?rev=1419832&r1=1419831&r2=1419832&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java (original)
+++ hadoop/common/branches/branch-2/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java Mon Dec 10 22:10:02 2012
@@ -42,6 +42,7 @@ import org.apache.hadoop.HadoopIllegalAr
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
 import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
 import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
+import org.apache.hadoop.test.GenericTestUtils;
 
 public class TestActiveStandbyElector {
 
@@ -56,7 +57,8 @@ public class TestActiveStandbyElector {
     private int sleptFor = 0;
     
     ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
-        List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
+        List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException,
+        KeeperException { // the ActiveStandbyElector constructor now declares KeeperException
       super(hostPort, timeout, parent, acl,
           Collections.<ZKAuthInfo>emptyList(), app);
     }
@@ -83,7 +85,7 @@ public class TestActiveStandbyElector {
       ActiveStandbyElector.BREADCRUMB_FILENAME;
 
   @Before
-  public void init() throws IOException {
+  public void init() throws IOException, KeeperException {
     count = 0;
     mockZK = Mockito.mock(ZooKeeper.class);
     mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
@@ -705,4 +707,18 @@ public class TestActiveStandbyElector {
         Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
         Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
   }
+
+  /**
+   * Verify that constructing an ActiveStandbyElector with no reachable ZooKeeper server fails with a ConnectionLoss KeeperException.
+   */
+  @Test
+  public void testWithoutZKServer() throws Exception {
+    try {
+      new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME, // assumes no ZooKeeper server is reachable at 127.0.0.1 -- TODO confirm in CI
+          Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp);
+      Assert.fail("Did not throw zookeeper connection loss exceptions!");
+    } catch (KeeperException ke) {
+      GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke);
+    }
+  }
 }