You are viewing a plain-text version of this content. (The canonical link to the original message was not preserved in this copy.)
Posted to commits@hbase.apache.org by la...@apache.org on 2012/04/03 00:12:37 UTC
svn commit: r1308596 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
test/java/org/apache/hadoop/hbase/TestZooKeeper.java
Author: larsh
Date: Mon Apr 2 22:12:36 2012
New Revision: 1308596
URL: http://svn.apache.org/viewvc?rev=1308596&view=rev
Log:
HBASE-5682 Allow HConnectionImplementation to recover from ZK connection loss
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1308596&r1=1308595&r2=1308596&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Mon Apr 2 22:12:36 2012
@@ -32,9 +32,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.NoSuchElementException;
import java.util.Set;
-import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -498,11 +496,11 @@ public class HConnectionManager {
private volatile HMasterInterface master;
private volatile boolean masterChecked;
// ZooKeeper reference
- private ZooKeeperWatcher zooKeeper;
+ private volatile ZooKeeperWatcher zooKeeper;
// ZooKeeper-based master address tracker
- private MasterAddressTracker masterAddressTracker;
- private RootRegionTracker rootRegionTracker;
- private ClusterId clusterId;
+ private volatile MasterAddressTracker masterAddressTracker;
+ private volatile RootRegionTracker rootRegionTracker;
+ private volatile ClusterId clusterId;
private final Object metaRegionLock = new Object();
@@ -574,35 +572,43 @@ public class HConnectionManager {
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
- setupZookeeperTrackers();
-
this.master = null;
this.masterChecked = false;
}
- private synchronized void setupZookeeperTrackers()
- throws ZooKeeperConnectionException{
+ private synchronized void ensureZookeeperTrackers()
+ throws ZooKeeperConnectionException {
// initialize zookeeper and master address manager
- this.zooKeeper = getZooKeeperWatcher();
- masterAddressTracker = new MasterAddressTracker(this.zooKeeper, this);
- masterAddressTracker.start();
-
- this.rootRegionTracker = new RootRegionTracker(this.zooKeeper, this);
- this.rootRegionTracker.start();
-
- this.clusterId = new ClusterId(this.zooKeeper, this);
+ if (zooKeeper == null) {
+ zooKeeper = getZooKeeperWatcher();
+ }
+ if (clusterId == null) {
+ clusterId = new ClusterId(zooKeeper, this);
+ }
+ if (masterAddressTracker == null) {
+ masterAddressTracker = new MasterAddressTracker(zooKeeper, this);
+ masterAddressTracker.start();
+ }
+ if (rootRegionTracker == null) {
+ rootRegionTracker = new RootRegionTracker(zooKeeper, this);
+ rootRegionTracker.start();
+ }
}
- private synchronized void resetZooKeeperTrackers()
- throws ZooKeeperConnectionException {
- LOG.info("Trying to reconnect to zookeeper");
- masterAddressTracker.stop();
- masterAddressTracker = null;
- rootRegionTracker.stop();
- rootRegionTracker = null;
+ private synchronized void resetZooKeeperTrackers() {
+ if (masterAddressTracker != null) {
+ masterAddressTracker.stop();
+ masterAddressTracker = null;
+ }
+ if (rootRegionTracker != null) {
+ rootRegionTracker.stop();
+ rootRegionTracker = null;
+ }
clusterId = null;
- this.zooKeeper = null;
- setupZookeeperTrackers();
+ if (zooKeeper != null) {
+ zooKeeper.close();
+ zooKeeper = null;
+ }
}
public Configuration getConfiguration() {
@@ -623,6 +629,7 @@ public class HConnectionManager {
LOG.info("Exception contacting master. Retrying...", ute.getCause());
}
+ ensureZookeeperTrackers();
checkIfBaseNodeAvailable();
ServerName sn = null;
synchronized (this.masterLock) {
@@ -762,6 +769,7 @@ public class HConnectionManager {
// The root region is always enabled
return online;
}
+ getZooKeeperWatcher();
String tableNameStr = Bytes.toString(tableName);
try {
if (online) {
@@ -807,11 +815,10 @@ public class HConnectionManager {
throw new IllegalArgumentException(
"table name cannot be null or zero length");
}
-
+ ensureZookeeperTrackers();
if (Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME)) {
try {
- ServerName servername =
- this.rootRegionTracker.waitRootRegionLocation(this.rpcTimeout);
+ ServerName servername = this.rootRegionTracker.getRootRegionLocation();
LOG.debug("Looked up root region location, connection=" + this +
"; serverName=" + ((servername == null)? "": servername.toString()));
if (servername == null) return null;
@@ -1255,6 +1262,7 @@ public class HConnectionManager {
} else {
rsName = Addressing.createHostAndPortStr(hostname, port);
}
+ ensureZookeeperTrackers();
// See if we already have a connection (common case)
server = this.servers.get(rsName);
if (server == null) {
@@ -1642,25 +1650,21 @@ public class HConnectionManager {
@Override
public void abort(final String msg, Throwable t) {
- if (t instanceof KeeperException.SessionExpiredException) {
- try {
- LOG.info("This client just lost it's session with ZooKeeper, trying" +
- " to reconnect.");
- resetZooKeeperTrackers();
- LOG.info("Reconnected successfully. This disconnect could have been" +
+ if (t instanceof KeeperException) {
+ LOG.info("This client just lost it's session with ZooKeeper, will"
+ + " automatically reconnect when needed.");
+ if (t instanceof KeeperException.SessionExpiredException) {
+ LOG.info("ZK session expired. This disconnect could have been" +
" caused by a network partition or a long-running GC pause," +
" either way it's recommended that you verify your environment.");
- return;
- } catch (ZooKeeperConnectionException e) {
- LOG.error("Could not reconnect to ZooKeeper after session" +
- " expiration, aborting");
- t = e;
+ resetZooKeeperTrackers();
}
+ return;
}
if (t != null) LOG.fatal(msg, t);
else LOG.fatal(msg);
this.aborted = true;
- this.closed = true;
+ close();
}
@Override
@@ -1675,6 +1679,7 @@ public class HConnectionManager {
public int getCurrentNrHRS() throws IOException {
try {
+ getZooKeeperWatcher();
// We go to zk rather than to master to get count of regions to avoid
// HTable having a Master dependency. See HBase-2828
return ZKUtil.getNumberOfChildren(this.zooKeeper,
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1308596&r1=1308595&r2=1308596&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Mon Apr 2 22:12:36 2012
@@ -26,6 +26,10 @@ import static org.junit.Assert.assertNul
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -58,12 +62,17 @@ public class TestZooKeeper {
private final static HBaseTestingUtility
TEST_UTIL = new HBaseTestingUtility();
+ private static HConnection persistentConnection;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
+ // create a connection *before* the cluster is started, to validate that the
+ // connection's ZK trackers are initialized on demand
+ persistentConnection = HConnectionManager.createConnection(TEST_UTIL.getConfiguration());
+
// Test we can first start the ZK cluster by itself
TEST_UTIL.startMiniZKCluster();
TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
@@ -75,6 +84,7 @@ public class TestZooKeeper {
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ persistentConnection.close();
TEST_UTIL.shutdownMiniCluster();
}
@@ -93,40 +103,28 @@ public class TestZooKeeper {
*/
@Test
public void testClientSessionExpired()
- throws IOException, InterruptedException {
+ throws Exception {
LOG.info("testClientSessionExpired");
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
new HTable(c, HConstants.META_TABLE_NAME).close();
- String quorumServers = ZKConfig.getZKQuorumServersString(c);
- int sessionTimeout = 5 * 1000; // 5 seconds
HConnection connection = HConnectionManager.getConnection(c);
ZooKeeperWatcher connectionZK = connection.getZooKeeperWatcher();
- long sessionID = connectionZK.getRecoverableZooKeeper().getSessionId();
- byte[] password = connectionZK.getRecoverableZooKeeper().getSessionPasswd();
- ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout,
- EmptyWatcher.instance, sessionID, password);
- LOG.info("Session timeout=" + zk.getSessionTimeout() +
- ", original=" + sessionTimeout +
- ", id=" + zk.getSessionId());
- zk.close();
-
- Thread.sleep(sessionTimeout * 3L);
+ TEST_UTIL.expireSession(connectionZK, null);
// provoke session expiration by doing something with ZK
ZKUtil.dump(connectionZK);
// Check that the old ZK connection is closed, means we did expire
System.err.println("ZooKeeper should have timed out");
- String state = connectionZK.getRecoverableZooKeeper().getState().toString();
LOG.info("state=" + connectionZK.getRecoverableZooKeeper().getState());
Assert.assertTrue(connectionZK.getRecoverableZooKeeper().getState().
equals(States.CLOSED));
// Check that the client recovered
ZooKeeperWatcher newConnectionZK = connection.getZooKeeperWatcher();
- LOG.info("state=" + newConnectionZK.getRecoverableZooKeeper().getState());
- Assert.assertTrue(newConnectionZK.getRecoverableZooKeeper().getState().equals(
- States.CONNECTED));
+ States state = newConnectionZK.getRecoverableZooKeeper().getState();
+ LOG.info("state=" + state);
+ Assert.assertTrue(state.equals(States.CONNECTED) || state.equals(States.CONNECTING));
}
@Test
@@ -148,21 +146,33 @@ public class TestZooKeeper {
* Make sure we can use the cluster
* @throws Exception
*/
- public void testSanity() throws Exception{
- HBaseAdmin admin =
- new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration()));
+ private void testSanity() throws Exception {
+ String tableName = "test"+System.currentTimeMillis();
+ HBaseAdmin admin = new HBaseAdmin(new Configuration(TEST_UTIL.getConfiguration()));
+ testAdminSanity(admin, tableName);
+ HTable table = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+ testTableSanity(table, tableName);
+ }
+
+ private void testSanity(HConnection conn, ExecutorService pool) throws Exception {
String tableName = "test"+System.currentTimeMillis();
+ HBaseAdmin admin = new HBaseAdmin(persistentConnection);
+ testAdminSanity(admin, tableName);
+ HTable table = new HTable(Bytes.toBytes(tableName), persistentConnection, pool);
+ testTableSanity(table, tableName);
+
+ }
+ private void testAdminSanity(HBaseAdmin admin, String tableName) throws Exception {
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor family = new HColumnDescriptor("fam");
desc.addFamily(family);
LOG.info("Creating table " + tableName);
admin.createTable(desc);
+ }
- HTable table =
- new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
+ private void testTableSanity(HTable table, String tableName) throws Exception {
Put put = new Put(Bytes.toBytes("testrow"));
- put.add(Bytes.toBytes("fam"),
- Bytes.toBytes("col"), Bytes.toBytes("testdata"));
+ put.add(Bytes.toBytes("fam"), Bytes.toBytes("col"), Bytes.toBytes("testdata"));
LOG.info("Putting table " + tableName);
table.put(put);
table.close();
@@ -229,6 +239,16 @@ public class TestZooKeeper {
}
}
+ /**
+ * Test with a connection that existed before the cluster was started
+ */
+ @Test
+ public void testPersistentConnection() throws Exception {
+ ExecutorService pool = new ThreadPoolExecutor(1, 10, 10, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
+ testSanity(persistentConnection, pool);
+ }
+
private void testKey(String ensemble, String port, String znode)
throws IOException {
Configuration conf = new Configuration();