You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/04/03 00:12:37 UTC

svn commit: r1308596 - in /hbase/branches/0.94/src: main/java/org/apache/hadoop/hbase/client/HConnectionManager.java test/java/org/apache/hadoop/hbase/TestZooKeeper.java

Author: larsh
Date: Mon Apr  2 22:12:36 2012
New Revision: 1308596

URL: http://svn.apache.org/viewvc?rev=1308596&view=rev
Log:
HBASE-5682 Allow HConnectionImplementation to recover from ZK connection loss

Modified:
    hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java

Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1308596&r1=1308595&r2=1308596&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Apr  2 22:12:36 2012
@@ -32,9 +32,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -498,11 +496,11 @@ public class HConnectionManager {
     private volatile HMasterInterface master;
     private volatile boolean masterChecked;
     // ZooKeeper reference
-    private ZooKeeperWatcher zooKeeper;
+    private volatile ZooKeeperWatcher zooKeeper;
     // ZooKeeper-based master address tracker
-    private MasterAddressTracker masterAddressTracker;
-    private RootRegionTracker rootRegionTracker;
-    private ClusterId clusterId;
+    private volatile MasterAddressTracker masterAddressTracker;
+    private volatile RootRegionTracker rootRegionTracker;
+    private volatile ClusterId clusterId;
 
     private final Object metaRegionLock = new Object();
 
@@ -574,35 +572,43 @@ public class HConnectionManager {
           HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
           HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
 
-      setupZookeeperTrackers();
-
       this.master = null;
       this.masterChecked = false;
     }
 
-    private synchronized void setupZookeeperTrackers()
-        throws ZooKeeperConnectionException{
+    private synchronized void ensureZookeeperTrackers()
+        throws ZooKeeperConnectionException {
       // initialize zookeeper and master address manager
-      this.zooKeeper = getZooKeeperWatcher();
-      masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
-      masterAddressTracker.start();
-
-      this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
-      this.rootRegionTracker.start();
-
-      this.clusterId = new ClusterId(this.zooKeeper, this);
+      if (zooKeeper == null) {
+        zooKeeper = getZooKeeperWatcher();
+      }
+      if (clusterId == null) {
+        clusterId = new ClusterId(zooKeeper, this);
+      }
+      if (masterAddressTracker == null) {
+        masterAddressTracker = new MasterAddressTracker(zooKeeper, this);
+        masterAddressTracker.start();
+      }
+      if (rootRegionTracker == null) {
+        rootRegionTracker = new RootRegionTracker(zooKeeper, this);
+        rootRegionTracker.start();
+      }
     }
 
-    private synchronized void resetZooKeeperTrackers()
-        throws ZooKeeperConnectionException {
-      LOG.info("Trying to reconnect to zookeeper");
-      masterAddressTracker.stop();
-      masterAddressTracker = null;
-      rootRegionTracker.stop();
-      rootRegionTracker = null;
+    private synchronized void resetZooKeeperTrackers() {
+      if (masterAddressTracker != null) {
+        masterAddressTracker.stop();
+        masterAddressTracker = null;
+      }
+      if (rootRegionTracker != null) {
+        rootRegionTracker.stop();
+        rootRegionTracker = null;
+      }
       clusterId = null;
-      this.zooKeeper = null;
-      setupZookeeperTrackers();
+      if (zooKeeper != null) {
+        zooKeeper.close();
+        zooKeeper = null;
+      }
     }
 
     public Configuration getConfiguration() {
@@ -623,6 +629,7 @@ public class HConnectionManager {
         LOG.info("Exception contacting master. Retrying...", ute.getCause());
       }
 
+      ensureZookeeperTrackers();
       checkIfBaseNodeAvailable();
       ServerName sn = null;
       synchronized (this.masterLock) {
@@ -762,6 +769,7 @@ public class HConnectionManager {
         // The root region is always enabled
         return online;
       }
+      getZooKeeperWatcher();
       String tableNameStr = Bytes.toString(tableName);
       try {
         if (online) {
@@ -807,11 +815,10 @@ public class HConnectionManager {
         throw new IllegalArgumentException(
             "table name cannot be null or zero length");
       }
-
+      ensureZookeeperTrackers();
       if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
         try {
-          ServerName servername =
-            this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);
+          ServerName servername = this.rootRegionTracker.getRootRegionLocation();
           LOG.debug("Looked up root region location, connection=" + this +
             "; serverName=" + ((servername == null)? "": servername.toString()));
           if (servername == null) return null;
@@ -1255,6 +1262,7 @@ public class HConnectionManager {
       } else {
         rsName = Addressing.createHostAndPortStr(hostname, port);
       }
+      ensureZookeeperTrackers();
       // See if we already have a connection (common case)
       server = this.servers.get(rsName);
       if (server == null) {
@@ -1642,25 +1650,21 @@ public class HConnectionManager {
 
     @Override
     public void abort(final String msg, Throwable t) {
-      if (t instanceof KeeperException.SessionExpiredException) {
-        try {
-          LOG.info("This client just lost it's session with ZooKeeper, trying" +
-              " to reconnect.");
-          resetZooKeeperTrackers();
-          LOG.info("Reconnected successfully. This disconnect could have been" +
+      if (t instanceof KeeperException) {
+        LOG.info("This client just lost it's session with ZooKeeper, will"
+            + " automatically reconnect when needed.");
+        if (t instanceof KeeperException.SessionExpiredException) {
+          LOG.info("ZK session expired. This disconnect could have been" +
               " caused by a network partition or a long-running GC pause," +
               " either way it's recommended that you verify your environment.");
-          return;
-        } catch (ZooKeeperConnectionException e) {
-          LOG.error("Could not reconnect to ZooKeeper after session" +
-              " expiration, aborting");
-          t = e;
+          resetZooKeeperTrackers();
         }
+        return;
       }
       if (t != null) LOG.fatal(msg, t);
       else LOG.fatal(msg);
       this.aborted = true;
-      this.closed = true;
+      close();
     }
 
     @Override
@@ -1675,6 +1679,7 @@ public class HConnectionManager {
 
     public int getCurrentNrHRS() throws IOException {
       try {
+        getZooKeeperWatcher();
         // We go to zk rather than to master to get count of regions to avoid
         // HTable having a Master dependency.  See HBase-2828
         return ZKUtil.getNumberOfChildren(this.zooKeeper,

Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1308596&r1=1308595&r2=1308596&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Mon Apr  2 22:12:36 2012
@@ -26,6 +26,10 @@ import static org.junit.Assert.assertNull
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -58,12 +62,17 @@ public class TestZooKeeper {
 
   private final static HBaseTestingUtility
       TEST_UTIL = new HBaseTestingUtility();
+  private static HConnection persistentConnection;
 
   /**
    * @throws java.lang.Exception
    */
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
+    // create a connection *before* the cluster is started, to validate that the
+    // connection's ZK trackers are initialized on demand
+    persistentConnection = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
+
     // Test we can first start the ZK cluster by itself
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
@@ -75,6 +84,7 @@ public class TestZooKeeper {
    */
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    persistentConnection.close();
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -93,40 +103,28 @@ public class TestZooKeeper {
    */
   @Test
   public void testClientSessionExpired()
-  throws IOException, InterruptedException {
+  throws Exception {
     LOG.info("testClientSessionExpired");
     Configuration c = new Configuration(TEST_UTIL.getConfiguration());
     new HTable(c, HConstants.META_TABLE_NAME).close();
-    String quorumServers = ZKConfig.getZKQuorumServersString(c);
-    int sessionTimeout = 5 * 1000; // 5 seconds
     HConnection connection = HConnectionManager.getConnection(c);
     ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
-    long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
-    byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
-    ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
-        EmptyWatcher.instance, sessionID, password);
-    LOG.info("Session timeout=" + zk.getSessionTimeout() +
-      ", original=" + sessionTimeout +
-      ", id=" + zk.getSessionId());
-    zk.close();
-
-    Thread.sleep(sessionTimeout * 3L);
+    TEST_UTIL.expireSession(connectionZK, null);
 
     // provoke session expiration by doing something with ZK
     ZKUtil.dump(connectionZK);
 
     // Check that the old ZK connection is closed, means we did expire
     System.err.println("ZooKeeper should have timed out");
-    String state = connectionZK.getRecoverableZooKeeper().getState().toString();
     LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
     Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().
       equals(States.CLOSED));
 
     // Check that the client recovered
     ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
-    LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
-    Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
-      States.CONNECTED));
+    States state = newConnectionZK.getRecoverableZooKeeper().getState();
+    LOG.info("state=" + state);
+    Assert.assertTrue(state.equals(States.CONNECTED) || state.equals(States.CONNECTING));
   }
   
   @Test
@@ -148,21 +146,33 @@ public class TestZooKeeper {
    * Make sure we can use the cluster
    * @throws Exception
    */
-  public void testSanity() throws Exception{
-    HBaseAdmin admin =
-      new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration()));
+  private void testSanity() throws Exception {
+    String tableName = "test"+System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration()));
+    testAdminSanity(admin, tableName);
+    HTable table = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+    testTableSanity(table, tableName);
+  }
+
+  private void testSanity(HConnection conn, ExecutorService pool) throws Exception {
     String tableName = "test"+System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(persistentConnection);
+    testAdminSanity(admin, tableName);
+    HTable table = new HTable(Bytes.toBytes(tableName), persistentConnection, pool);
+    testTableSanity(table, tableName);
+
+  }
+  private void testAdminSanity(HBaseAdmin admin, String tableName) throws Exception {
     HTableDescriptor desc = new HTableDescriptor(tableName);
     HColumnDescriptor family = new HColumnDescriptor("fam");
     desc.addFamily(family);
     LOG.info("Creating table " + tableName);
     admin.createTable(desc);
+  }
 
-    HTable table =
-      new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+  private void testTableSanity(HTable table, String tableName) throws Exception {
     Put put = new Put(Bytes.toBytes("testrow"));
-    put.add(Bytes.toBytes("fam"),
-        Bytes.toBytes("col"), Bytes.toBytes("testdata"));
+    put.add(Bytes.toBytes("fam"), Bytes.toBytes("col"), Bytes.toBytes("testdata"));
     LOG.info("Putting table " + tableName);
     table.put(put);
     table.close();
@@ -229,6 +239,16 @@ public class TestZooKeeper {
     }
   }
 
+  /**
+   * Test with a connection that existed before the cluster was started
+   */
+  @Test
+  public void testPersistentConnection() throws Exception {
+    ExecutorService pool = new ThreadPoolExecutor(1, 10, 10, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>());
+    testSanity(persistentConnection, pool);
+  }
+
   private void testKey(String ensemble, String port, String znode)
       throws IOException {
     Configuration conf = new Configuration();