You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2009/05/11 22:21:59 UTC
svn commit: r773677 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/master/
src/java/org/apache/hadoop/hbase/regionserver/
src/test/org/apache/hadoop/hbase/
Author: stack
Date: Mon May 11 20:21:59 2009
New Revision: 773677
URL: http://svn.apache.org/viewvc?rev=773677&view=rev
Log:
HBASE-1311 ZooKeeperWrapper: Failed to set watcher on ZNode /hbase/master
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestZooKeeper.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=773677&r1=773676&r2=773677&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Mon May 11 20:21:59 2009
@@ -121,6 +121,8 @@
(Ryan Rawson via Stack)
HBASE-1399 Can't drop tables since HBASE-1398 (Ryan Rawson via Andrew
Purtell)
+ HBASE-1311 ZooKeeperWrapper: Failed to set watcher on ZNode /hbase/master
+ (Nitay Joffe via Stack)
IMPROVEMENTS
HBASE-1089 Add count of regions on filesystem to master UI; add percentage
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=773677&r1=773676&r2=773677&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java Mon May 11 20:21:59 2009
@@ -521,7 +521,19 @@
return Collections.unmodifiableMap(onlineMetaRegions);
}
}
-
+
+ public boolean metaRegionsInTransition() {
+ synchronized (onlineMetaRegions) {
+ for (MetaRegion metaRegion : onlineMetaRegions.values()) {
+ String regionName = Bytes.toString(metaRegion.getRegionName());
+ if (regionIsInTransition(regionName)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
/**
* Stop the root and meta scanners so that the region servers serving meta
* regions can shut down.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java?rev=773677&r1=773676&r2=773677&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java Mon May 11 20:21:59 2009
@@ -69,8 +69,9 @@
protected boolean metaTableAvailable() {
boolean available = true;
- if (master.regionManager.numMetaRegions() !=
- master.regionManager.numOnlineMetaRegions()) {
+ if ((master.regionManager.numMetaRegions() !=
+ master.regionManager.numOnlineMetaRegions()) ||
+ master.regionManager.metaRegionsInTransition()) {
// We can't proceed because not all of the meta regions are online.
// We can't block either because that would prevent the meta region
// online message from being processed. In order to prevent spinning
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=773677&r1=773676&r2=773677&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon May 11 20:21:59 2009
@@ -107,6 +107,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -136,7 +137,7 @@
// If false, the file system has become unavailable
protected volatile boolean fsOk;
- protected final HServerInfo serverInfo;
+ protected HServerInfo serverInfo;
protected final HBaseConfiguration conf;
private final ServerConnection connection;
@@ -167,10 +168,10 @@
// Server to handle client requests. Default access so can be accessed by
// unit tests.
- final HBaseServer server;
+ HBaseServer server;
// Leases
- private final Leases leases;
+ private Leases leases;
// Request counter
private volatile AtomicInteger requestCount = new AtomicInteger();
@@ -193,20 +194,20 @@
private RegionServerMetrics metrics;
// Compactions
- final CompactSplitThread compactSplitThread;
+ CompactSplitThread compactSplitThread;
// Cache flushing
- final MemcacheFlusher cacheFlusher;
+ MemcacheFlusher cacheFlusher;
/* Check for major compactions.
*/
- final Chore majorCompactionChecker;
+ Chore majorCompactionChecker;
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected volatile HLog log;
- final LogRoller logRoller;
- final LogFlusher logFlusher;
+ LogRoller logRoller;
+ LogFlusher logFlusher;
// limit compactions while starting up
CompactionLimitThread compactionLimitThread;
@@ -217,13 +218,23 @@
final Map<String, InternalScanner> scanners =
new ConcurrentHashMap<String, InternalScanner>();
- private final ZooKeeperWrapper zooKeeperWrapper;
+ private ZooKeeperWrapper zooKeeperWrapper;
// A sleeper that sleeps for msgInterval.
private final Sleeper sleeper;
private final long rpcTimeout;
+ // Address passed in to constructor.
+ private final HServerAddress address;
+
+ // The main region server thread.
+ private Thread regionServerThread;
+
+ // Run HDFS shutdown thread on exit if this is set. We clear this out when
+ // doing a restart() to prevent closing of HDFS.
+ private final AtomicBoolean shutdownHDFS = new AtomicBoolean(true);
+
/**
* Starts a HRegionServer at the default location
* @param conf
@@ -241,7 +252,8 @@
* @throws IOException
*/
public HRegionServer(HServerAddress address, HBaseConfiguration conf)
- throws IOException {
+ throws IOException {
+ this.address = address;
this.abortRequested = false;
this.fsOk = true;
this.conf = conf;
@@ -258,29 +270,26 @@
sleeper = new Sleeper(this.msgInterval, this.stopRequested);
- // Cache flushing thread.
- this.cacheFlusher = new MemcacheFlusher(conf, this);
-
- // Compaction thread
- this.compactSplitThread = new CompactSplitThread(this);
-
- // Log rolling thread
- this.logRoller = new LogRoller(this);
-
- // Log flushing thread
- this.logFlusher =
- new LogFlusher(this.threadWakeFrequency, this.stopRequested);
-
- // Background thread to check for major compactions; needed if region
- // has not gotten updates in a while. Make it run at a lesser frequency.
- int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
- ".multiplier", 1000);
- this.majorCompactionChecker = new MajorCompactionChecker(this,
- this.threadWakeFrequency * multiplier, this.stopRequested);
-
// Task thread to process requests from Master
this.worker = new Worker();
- this.workerThread = new Thread(worker);
+
+ this.numRegionsToReport =
+ conf.getInt("hbase.regionserver.numregionstoreport", 10);
+
+ this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
+
+ reinitialize();
+ }
+
+ /**
+ * Creates all of the state that needs to be reconstructed in case we are
+ * doing a restart. This is shared between the constructor and restart().
+ * @throws IOException
+ */
+ private void reinitialize() throws IOException {
+ abortRequested = false;
+ stopRequested.set(false);
+ shutdownHDFS.set(true);
// Server to handle client requests
this.server = HBaseRPC.getServer(this, address.getBindAddress(),
@@ -296,12 +305,23 @@
new InetSocketAddress(address.getBindAddress(),
this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
this.conf.getInt("hbase.regionserver.info.port", 60030), machineName);
-
if (this.serverInfo.getServerAddress() == null) {
throw new NullPointerException("Server address cannot be null; " +
"hbase-958 debugging");
}
- this.zooKeeperWrapper = new ZooKeeperWrapper(conf);
+
+ reinitializeThreads();
+
+ reinitializeZooKeeper();
+
+ int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
+ for(int i = 0; i < nbBlocks; i++) {
+ reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
+ }
+ }
+
+ private void reinitializeZooKeeper() throws IOException {
+ zooKeeperWrapper = new ZooKeeperWrapper(conf);
watchMasterAddress();
boolean startCodeOk = false;
@@ -312,19 +332,34 @@
LOG.debug("Start code already taken, trying another one");
}
}
+ }
+
+ private void reinitializeThreads() {
+ this.workerThread = new Thread(worker);
+
+ // Cache flushing thread.
+ this.cacheFlusher = new MemcacheFlusher(conf, this);
- this.numRegionsToReport =
- conf.getInt("hbase.regionserver.numregionstoreport", 10);
-
- this.leases = new Leases(
- conf.getInt("hbase.regionserver.lease.period", 60 * 1000),
- this.threadWakeFrequency);
+ // Compaction thread
+ this.compactSplitThread = new CompactSplitThread(this);
- int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
- for(int i = 0; i < nbBlocks; i++) {
- reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
- }
- this.rpcTimeout = conf.getLong("hbase.regionserver.lease.period", 60000);
+ // Log rolling thread
+ this.logRoller = new LogRoller(this);
+
+ // Log flushing thread
+ this.logFlusher =
+ new LogFlusher(this.threadWakeFrequency, this.stopRequested);
+
+ // Background thread to check for major compactions; needed if region
+ // has not gotten updates in a while. Make it run at a lesser frequency.
+ int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
+ ".multiplier", 1000);
+ this.majorCompactionChecker = new MajorCompactionChecker(this,
+ this.threadWakeFrequency * multiplier, this.stopRequested);
+
+ this.leases = new Leases(
+ conf.getInt("hbase.regionserver.lease.period", 60 * 1000),
+ this.threadWakeFrequency);
}
/**
@@ -336,14 +371,25 @@
*/
public void process(WatchedEvent event) {
EventType type = event.getType();
- LOG.info("Got ZooKeeper event, state: " + event.getState() + ", type: " +
+ KeeperState state = event.getState();
+ LOG.info("Got ZooKeeper event, state: " + state + ", type: " +
type + ", path: " + event.getPath());
- if (type == EventType.NodeCreated) {
- getMaster();
+
+ // Ignore events if we're shutting down.
+ if (stopRequested.get()) {
+ LOG.debug("Ignoring ZooKeeper event while shutting down");
+ return;
}
- // ZooKeeper watches are one time only, so we need to re-register our watch.
- watchMasterAddress();
+ if (state == KeeperState.Expired) {
+ LOG.error("ZooKeeper session expired");
+ restart();
+ } else if (type == EventType.NodeCreated) {
+ getMaster();
+
+ // ZooKeeper watches are one time only, so we need to re-register our watch.
+ watchMasterAddress();
+ }
}
private void watchMasterAddress() {
@@ -353,12 +399,41 @@
}
}
+ private void restart() {
+ LOG.info("Restarting Region Server");
+
+ shutdownHDFS.set(false);
+ abort();
+ Threads.shutdown(regionServerThread);
+
+ boolean done = false;
+ while (!done) {
+ try {
+ reinitialize();
+ done = true;
+ } catch (IOException e) {
+ LOG.debug("Error trying to reinitialize ZooKeeper", e);
+ }
+ }
+
+ Thread t = new Thread(this);
+ String name = regionServerThread.getName();
+ t.setName(name);
+ t.start();
+ }
+
+ /** @return ZooKeeperWrapper used by RegionServer. */
+ public ZooKeeperWrapper getZooKeeperWrapper() {
+ return zooKeeperWrapper;
+ }
+
/**
* The HRegionServer sticks in this loop until closed. It repeatedly checks
* in with the HMaster, sending heartbeats & reports, and receiving HRegion
* load/unload instructions.
*/
public void run() {
+ regionServerThread = Thread.currentThread();
boolean quiesceRequested = false;
try {
init(reportForDuty());
@@ -600,8 +675,11 @@
}
join();
- runThread(this.hdfsShutdownThread,
- this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
+ if (shutdownHDFS.get()) {
+ runThread(this.hdfsShutdownThread,
+ this.conf.getLong("hbase.dfs.shutdown.wait", 30000));
+ }
+
LOG.info(Thread.currentThread().getName() + " exiting");
}
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=773677&r1=773676&r2=773677&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Mon May 11 20:21:59 2009
@@ -110,8 +110,7 @@
* @param serverNumber Used as index into a list.
*/
public void abortRegionServer(int serverNumber) {
- HRegionServer server =
- this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
+ HRegionServer server = getRegionServer(serverNumber);
LOG.info("Aborting " + server.getServerInfo().toString());
server.abort();
}
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestZooKeeper.java?rev=773677&r1=773676&r2=773677&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/TestZooKeeper.java Mon May 11 20:21:59 2009
@@ -21,10 +21,14 @@
import java.io.IOException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -106,4 +110,43 @@
System.err.println("ZooKeeper should have timed out");
connection.relocateRegion(HConstants.ROOT_TABLE_NAME, HConstants.EMPTY_BYTE_ARRAY);
}
+
+ /**
+ *
+ */
+ public void testRegionServerSessionExpired() {
+ try {
+ new HTable(conf, HConstants.META_TABLE_NAME);
+
+ String quorumServers = zooKeeperCluster.getQuorumServers();
+ int sessionTimeout = conf.getInt("zookeeper.session.timeout", 2 * 1000);
+
+ Watcher watcher = new EmptyWatcher();
+ HRegionServer rs = cluster.getRegionServer(0);
+ ZooKeeperWrapper rsZK = rs.getZooKeeperWrapper();
+ long sessionID = rsZK.getSessionID();
+ byte[] password = rsZK.getSessionPassword();
+
+ ZooKeeper zk = new ZooKeeper(quorumServers, sessionTimeout, watcher, sessionID, password);
+ zk.close();
+
+ Thread.sleep(sessionTimeout * 3);
+
+ new HTable(conf, HConstants.META_TABLE_NAME);
+
+ HBaseAdmin admin = new HBaseAdmin(conf);
+ HTableDescriptor desc = new HTableDescriptor("test");
+ HColumnDescriptor family = new HColumnDescriptor("fam:");
+ desc.addFamily(family);
+ admin.createTable(desc);
+
+ HTable table = new HTable("test");
+ BatchUpdate batchUpdate = new BatchUpdate("testrow");
+ batchUpdate.put("fam:col", Bytes.toBytes("testdata"));
+ table.commit(batchUpdate);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
}