You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/07/06 23:56:28 UTC
svn commit: r961028 [1/2] - in /hbase/branches/0.90_master_rewrite: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/regionserver...
Author: jgray
Date: Tue Jul 6 21:56:27 2010
New Revision: 961028
URL: http://svn.apache.org/viewvc?rev=961028&view=rev
Log:
HBASE-2696 [part1-v5-NewClasses_RS_Tested] ZooKeeper cleanup and refactor
Added:
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/regionserver/TestMasterAddressManager.java
Modified:
hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
hbase/branches/0.90_master_rewrite/src/main/resources/hbase-webapps/regionserver/regionserver.jsp
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/zookeeper/TestHQuorumPeer.java
Modified: hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_CHANGES.txt Tue Jul 6 21:56:27 2010
@@ -9,6 +9,8 @@ Branch 0.90.0 - Master Rewrite Branch
(Karthik R via jgray)
HBASE-2695 [MasterStatus-part2.1] HMaster clean and refactor
(Karthik R via jgray)
+ HBASE-2696 [part1-v5-NewClasses_RS_Tested] ZooKeeper cleanup
+ and refactor
NEW FEATURES
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/MiniZooKeeperCluster.java Tue Jul 6 21:56:27 2010
@@ -104,7 +104,7 @@ public class MiniZooKeeperCluster {
standaloneServerFactory =
new NIOServerCnxn.Factory(new InetSocketAddress(clientPort));
} catch (BindException e) {
- LOG.info("Faild binding ZK Server to client port: " + clientPort);
+ LOG.info("Failed binding ZK Server to client port: " + clientPort);
//this port is already in use. try to use another
clientPort++;
continue;
@@ -118,7 +118,7 @@ public class MiniZooKeeperCluster {
}
started = true;
-
+ LOG.info("Started MiniZK Server on client port: " + clientPort);
return clientPort;
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ServerStatus.java Tue Jul 6 21:56:27 2010
@@ -19,9 +19,8 @@
*/
package org.apache.hadoop.hbase;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* Set of functions that are exposed by any HBase server (implemented by the
@@ -37,4 +36,14 @@ public interface ServerStatus {
* Get the configuration object for this server.
*/
public Configuration getConfiguration();
+
+ /**
+ * Get the ZooKeeper instance for this server.
+ */
+ public ZooKeeperWatcher getZooKeeper();
+
+ /**
+ * Stub method added to ServerStatus so the ZK cleanup can move forward.
+ */
+ public void abortServer();
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Jul 6 21:56:27 2010
@@ -19,6 +19,24 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,30 +60,13 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.MetaUtils;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
-import java.io.IOException;
-import java.lang.reflect.UndeclaredThrowableException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.CopyOnWriteArraySet;
-
/**
* A non-instantiable class that manages connections to multiple tables in
* multiple HBase instances.
@@ -147,6 +148,7 @@ public class HConnectionManager {
/**
* Delete information for all connections.
* @param stopProxy stop the proxy as well
+ * @throws IOException
*/
public static void deleteAllConnections(boolean stopProxy) {
synchronized (HBASE_INSTANCES) {
@@ -228,8 +230,8 @@ public class HConnectionManager {
*/
public synchronized ZooKeeperWrapper getZooKeeperWrapper() throws IOException {
if(zooKeeperWrapper == null) {
- zooKeeperWrapper =
- ZooKeeperWrapper.createInstance(conf, HConnectionManager.class.getName());
+ zooKeeperWrapper = new ZooKeeperWatcher(conf,
+ HConnectionManager.class.getName(), null);
zooKeeperWrapper.registerListener(this);
}
return zooKeeperWrapper;
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Tue Jul 6 21:56:27 2010
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.util.Pair
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -133,8 +134,8 @@ public class HMaster extends Thread impl
// Metrics is set when we call run.
private final MasterMetrics metrics;
- // Our zk client.
- private ZooKeeperWrapper zooKeeperWrapper;
+ // Our zk client. TODO: rename variable once we settle on naming
+ private ZooKeeperWatcher zooKeeperWrapper;
// Watcher for master address and for cluster shutdown.
private final ZKMasterAddressWatcher zkMasterAddressWatcher;
// A Sleeper that sleeps for threadWakeFrequency; sleep if nothing todo.
@@ -182,7 +183,8 @@ public class HMaster extends Thread impl
// number of RS ephemeral nodes. RS ephemeral nodes are created only after
// the primary master has written the address to ZK. So this has to be done
// before we race to write our address to zookeeper.
- zooKeeperWrapper = ZooKeeperWrapper.createInstance(conf, getHServerAddress().toString());
+ zooKeeperWrapper =
+ new ZooKeeperWatcher(conf, getHServerAddress().toString(), this);
isClusterStartup = (zooKeeperWrapper.scanRSDirectory().size() == 0);
// Create the filesystem manager, which in turn does the following:
@@ -919,8 +921,9 @@ public class HMaster extends Thread impl
zooKeeperWrapper.close();
try {
+ // TODO: this is broken, we should just shutdown now not restart
zooKeeperWrapper =
- ZooKeeperWrapper.createInstance(conf, HMaster.class.getName());
+ new ZooKeeperWatcher(conf, HMaster.class.getName(), this);
zooKeeperWrapper.registerListener(this);
this.zkMasterAddressWatcher.setZookeeper(zooKeeperWrapper);
if(!this.zkMasterAddressWatcher.
@@ -1102,4 +1105,14 @@ public class HMaster extends Thread impl
public static void main(String [] args) {
doMain(args, HMaster.class);
}
+
+ @Override
+ public void abortServer() {
+ this.startShutdown();
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return zooKeeperWrapper;
+ }
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Tue Jul 6 21:56:27 2010
@@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
@@ -26,9 +28,7 @@ import org.apache.hadoop.hbase.HServerIn
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-
-import java.io.IOException;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
* ProcessRegionOpen is instantiated when a region server reports that it is
@@ -115,10 +115,8 @@ public class ProcessRegionOpen extends P
} else {
masterStatus.getRegionManager().removeRegion(regionInfo);
}
- ZooKeeperWrapper zkWrapper =
- ZooKeeperWrapper.getInstance(masterStatus.getConfiguration(),
- masterStatus.getHServerAddress().toString());
- zkWrapper.deleteUnassignedRegion(regionInfo.getEncodedName());
+ masterStatus.getZooKeeper().deleteUnassignedRegion(
+ regionInfo.getEncodedName());
return true;
}
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Jul 6 21:56:27 2010
@@ -140,8 +140,7 @@ public class RegionManager {
masterStatus.getConfiguration().getInt(
HConstants.THREAD_WAKE_FREQUENCY,
HConstants.DEFAULT_THREAD_WAKE_FREQUENCY);
- this.zkWrapper =
- ZooKeeperWrapper.getInstance(conf, masterStatus.getHServerAddress().toString());
+ this.zkWrapper = masterStatus.getZooKeeper();
this.maxAssignInOneGo = conf.getInt("hbase.regions.percheckin", 10);
this.loadBalancer = new LoadBalancer(conf);
@@ -640,11 +639,12 @@ public class RegionManager {
} catch(Exception iex) {
LOG.warn("meta scanner", iex);
}
- ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
- masterStatus.getConfiguration(),
- masterStatus.getHServerAddress().toString());
- zkw.clearRSDirectory();
- zkw.close();
+ // TODO: Why did we getInstance again? We should have it local?
+// ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
+// masterStatus.getConfiguration(),
+// masterStatus.getHServerAddress().toString());
+ zkWrapper.clearRSDirectory();
+ zkWrapper.close();
}
/**
@@ -1233,10 +1233,7 @@ public class RegionManager {
private void writeRootRegionLocationToZooKeeper(HServerAddress address) {
for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) {
- ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
- masterStatus.getConfiguration(),
- masterStatus.getHServerAddress().toString());
- if (zkw.writeRootRegionLocation(address)) {
+ if (zkWrapper.writeRootRegionLocation(address)) {
return;
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Jul 6 21:56:27 2010
@@ -246,10 +246,7 @@ public class ServerManager {
// We must set this watcher here because it can be set on a fresh start
// or on a failover
Watcher watcher = new ServerExpirer(new HServerInfo(info));
- ZooKeeperWrapper zkw = ZooKeeperWrapper.getInstance(
- masterStatus.getConfiguration(),
- masterStatus.getHServerAddress().toString());
- zkw.updateRSLocationGetWatch(info, watcher);
+ masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
this.serversToServerInfo.put(serverName, info);
this.serversToLoad.put(serverName, load);
synchronized (this.loadToServers) {
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java Tue Jul 6 21:56:27 2010
@@ -56,7 +56,7 @@ public class ZKUnassignedWatcher impleme
throws IOException {
this.serverName = masterStatus.getHServerAddress().toString();
this.serverManager = masterStatus.getServerManager();
- zkWrapper = ZooKeeperWrapper.getInstance(conf, masterStatus.getHServerAddress().toString());
+ zkWrapper = masterStatus.getZooKeeper();
String unassignedZNode = zkWrapper.getRegionInTransitionZNode();
// If the UNASSIGNED ZNode exists and this is a fresh cluster start, then
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul 6 21:56:27 2010
@@ -71,6 +71,7 @@ import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerStatus;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
@@ -101,23 +102,20 @@ import org.apache.hadoop.hbase.util.Info
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
*/
public class HRegionServer implements HRegionInterface,
- HBaseRPCErrorHandler, Runnable, Watcher, Stoppable {
+ HBaseRPCErrorHandler, Runnable, Stoppable, ServerStatus {
public static final Log LOG = LogFactory.getLog(HRegionServer.class);
private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
@@ -219,7 +217,11 @@ public class HRegionServer implements HR
final Map<String, InternalScanner> scanners =
new ConcurrentHashMap<String, InternalScanner>();
- private ZooKeeperWrapper zooKeeperWrapper;
+ // zookeeper connection and watcher
+ private ZooKeeperWatcher zooKeeper;
+
+ // master address manager and watcher
+ private MasterAddressManager masterAddressManager;
// A sleeper that sleeps for msgInterval.
private final Sleeper sleeper;
@@ -284,7 +286,7 @@ public class HRegionServer implements HR
conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
- reinitialize();
+ initialize();
}
/**
@@ -293,7 +295,7 @@ public class HRegionServer implements HR
* Both call it.
* @throws IOException
*/
- private void reinitialize() throws IOException {
+ private void initialize() throws IOException {
this.abortRequested = false;
this.stopRequested.set(false);
@@ -312,22 +314,25 @@ public class HRegionServer implements HR
throw new NullPointerException("Server address cannot be null; " +
"hbase-958 debugging");
}
- reinitializeThreads();
- reinitializeZooKeeper();
+ initializeThreads();
+ initializeZooKeeper();
int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
for(int i = 0; i < nbBlocks; i++) {
reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
}
}
- private void reinitializeZooKeeper() throws IOException {
- zooKeeperWrapper =
- ZooKeeperWrapper.createInstance(conf, serverInfo.getServerName());
- zooKeeperWrapper.registerListener(this);
- watchMasterAddress();
+ private void initializeZooKeeper() throws IOException {
+ // open connection to zookeeper and set primary watcher
+ zooKeeper = new ZooKeeperWatcher(conf, serverInfo.getServerName(), this);
+
+ // create the master address manager, register with zk, and start it
+ masterAddressManager = new MasterAddressManager(zooKeeper);
+ zooKeeper.registerListener(masterAddressManager);
+ masterAddressManager.monitorMaster();
}
- private void reinitializeThreads() {
+ private void initializeThreads() {
this.workerThread = new Thread(worker);
// Cache flushing thread.
@@ -353,74 +358,6 @@ public class HRegionServer implements HR
}
/**
- * We register ourselves as a watcher on the master address ZNode. This is
- * called by ZooKeeper when we get an event on that ZNode. When this method
- * is called it means either our master has died, or a new one has come up.
- * Either way we need to update our knowledge of the master.
- * @param event WatchedEvent from ZooKeeper.
- */
- public void process(WatchedEvent event) {
- EventType type = event.getType();
- KeeperState state = event.getState();
- LOG.info("Got ZooKeeper event, state: " + state + ", type: " +
- type + ", path: " + event.getPath());
-
- // Ignore events if we're shutting down.
- if (this.stopRequested.get()) {
- LOG.debug("Ignoring ZooKeeper event while shutting down");
- return;
- }
-
- if (state == KeeperState.Expired) {
- LOG.error("ZooKeeper session expired");
- boolean restart =
- this.conf.getBoolean("hbase.regionserver.restart.on.zk.expire", false);
- if (restart) {
- restart();
- } else {
- abort("ZooKeeper session expired");
- }
- } else if (type == EventType.NodeDeleted) {
- watchMasterAddress();
- } else if (type == EventType.NodeCreated) {
- getMaster();
-
- // ZooKeeper watches are one time only, so we need to re-register our watch.
- watchMasterAddress();
- }
- }
-
- private void watchMasterAddress() {
- while (!stopRequested.get() && !zooKeeperWrapper.watchMasterAddress(this)) {
- LOG.warn("Unable to set watcher on ZooKeeper master address. Retrying.");
- sleeper.sleep();
- }
- }
-
- private void restart() {
- abort("Restarting region server");
- Threads.shutdown(regionServerThread);
- boolean done = false;
- while (!done) {
- try {
- reinitialize();
- done = true;
- } catch (IOException e) {
- LOG.debug("Error trying to reinitialize ZooKeeper", e);
- }
- }
- Thread t = new Thread(this);
- String name = regionServerThread.getName();
- t.setName(name);
- t.start();
- }
-
- /** @return ZooKeeperWrapper used by RegionServer. */
- public ZooKeeperWrapper getZooKeeperWrapper() {
- return zooKeeperWrapper;
- }
-
- /**
* The HRegionServer sticks in this loop until closed. It repeatedly checks
* in with the HMaster, sending heartbeats & reports, and receiving HRegion
* load/unload instructions.
@@ -446,7 +383,8 @@ public class HRegionServer implements HR
for (int tries = 0; !stopRequested.get() && isHealthy();) {
// Try to get the root region location from the master.
if (!haveRootRegion.get()) {
- HServerAddress rootServer = zooKeeperWrapper.readRootRegionLocation();
+ HServerAddress rootServer =
+ ZKUtil.getDataAsAddress(zooKeeper, zooKeeper.rootServerZNode);
if (rootServer != null) {
// By setting the root region location, we bypass the wait imposed on
// HTable for all regions being assigned.
@@ -646,8 +584,9 @@ public class HRegionServer implements HR
this.hbaseMaster = null;
}
+ this.zooKeeper.close();
+
if (!killed) {
- this.zooKeeperWrapper.close();
join();
}
LOG.info(Thread.currentThread().getName() + " exiting");
@@ -1174,21 +1113,23 @@ public class HRegionServer implements HR
Threads.shutdown(this.hlogRoller);
}
+ /**
+ * Get the current master from ZooKeeper and open the RPC connection to it.
+ *
+ * Method will block until a master is available. You can break from this
+ * block by requesting the server stop.
+ *
+ * @return
+ */
private boolean getMaster() {
HServerAddress masterAddress = null;
- while (masterAddress == null) {
- if (stopRequested.get()) {
+ while((masterAddress = masterAddressManager.getMasterAddress()) == null) {
+ if(stopRequested.get()) {
return false;
}
- try {
- masterAddress = zooKeeperWrapper.readMasterAddressOrThrow();
- } catch (IOException e) {
- LOG.warn("Unable to read master address from ZooKeeper. Retrying." +
- " Error was:", e);
- sleeper.sleep();
- }
+ LOG.debug("No master found, will retry");
+ sleeper.sleep();
}
-
LOG.info("Telling master at " + masterAddress + " that we are up");
HMasterRegionInterface master = null;
while (!stopRequested.get() && master == null) {
@@ -1230,7 +1171,7 @@ public class HRegionServer implements HR
if (LOG.isDebugEnabled())
LOG.debug("sending initial server load: " + hsl);
lastMsg = System.currentTimeMillis();
- zooKeeperWrapper.writeRSLocation(this.serverInfo);
+ zooKeeper.writeRSLocation(this.serverInfo);
result = this.hbaseMaster.regionServerStartup(this.serverInfo);
break;
} catch (IOException e) {
@@ -1406,7 +1347,7 @@ public class HRegionServer implements HR
Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
HRegion region = this.onlineRegions.get(mapKey);
RSZookeeperUpdater zkUpdater =
- new RSZookeeperUpdater(conf, serverInfo.getServerName(),
+ new RSZookeeperUpdater(zooKeeper, serverInfo.getServerName(),
regionInfo.getEncodedName());
if (region == null) {
try {
@@ -1491,7 +1432,7 @@ public class HRegionServer implements HR
throws IOException {
RSZookeeperUpdater zkUpdater = null;
if(reportWhenCompleted) {
- zkUpdater = new RSZookeeperUpdater(conf,
+ zkUpdater = new RSZookeeperUpdater(zooKeeper,
serverInfo.getServerName(), hri.getEncodedName());
zkUpdater.startRegionCloseEvent(null, false);
}
@@ -2380,6 +2321,23 @@ public class HRegionServer implements HR
return threadWakeFrequency;
}
+ // ServerStatus
+
+ @Override
+ public void abortServer() {
+ this.abort("Received abortServer call");
+ }
+
+ @Override
+ public HServerAddress getHServerAddress() {
+ return this.address;
+ }
+
+ @Override
+ public ZooKeeperWatcher getZooKeeper() {
+ return zooKeeper;
+ }
+
//
// Main program and support routines
//
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/MasterAddressManager.java Tue Jul 6 21:56:27 2010
@@ -0,0 +1,185 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * Manages the location of the current active Master for this RegionServer.
+ *
+ * Listens for ZooKeeper events related to the master address. The node /master
+ * will contain the address of the current master. This listener is interested
+ * in NodeDeleted and NodeCreated events on /master.
+ *
+ * This class is thread-safe and takes care of re-setting all watchers to
+ * ensure it always knows the up-to-date master. To kick it off, instantiate
+ * the class and run the {@link #monitorMaster()} method.
+ *
+ * You can get the current master via {@link #getMasterAddress()} or the
+ * blocking method {@link #waitForMaster()}.
+ */
+public class MasterAddressManager extends ZooKeeperListener {
+ private static final Log LOG = LogFactory.getLog(MasterAddressManager.class);
+
+ // Address of the current primary master, null if no primary master
+ private HServerAddress masterAddress;
+
+ /**
+ * Construct a master address listener with the specified zookeeper reference.
+ *
+ * This constructor does not trigger any actions, you must call methods
+ * explicitly. Normally you will just want to execute {@link #monitorMaster()},
+ * which sets the current master if one exists and watches for changes.
+ *
+ * @param watcher zk reference and watcher
+ */
+ public MasterAddressManager(ZooKeeperWatcher watcher) {
+ super(watcher);
+ masterAddress = null;
+ }
+
+ /**
+ * Get the address of the current master if one is available. Returns null
+ * if no current master.
+ *
+ * Use {@link #waitForMaster()} if you want to block until the master is
+ * available.
+ * @return server address of current active master, or null if none available
+ */
+ public synchronized HServerAddress getMasterAddress() {
+ return masterAddress;
+ }
+
+ /**
+ * Check if there is a master available.
+ * @return true if there is a master set, false if not.
+ */
+ public synchronized boolean hasMaster() {
+ return masterAddress != null;
+ }
+
+ /**
+ * Get the address of the current master. If no master is available, method
+ * will block until one is available, the thread is interrupted, or timeout
+ * has passed.
+ *
+ * TODO: Make this work. Currently unused and does not block: it returns the
+ * cached address immediately. Kept with existing retry semantics.
+ * @return server address of current active master, null if timed out
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public synchronized HServerAddress waitForMaster()
+ throws InterruptedException {
+ return masterAddress;
+ }
+
+ /**
+ * Setup to watch for the primary master of the cluster.
+ *
+ * If the master is already available in ZooKeeper, this method will ensure
+ * it gets set and that any further changes are also watched for.
+ *
+ * If no master is available, this method ensures we become aware of it and
+ * will take care of setting it.
+ */
+ public void monitorMaster() {
+ if(ZKUtil.watchAndCheckExists(watcher, watcher.masterAddressZNode)) {
+ handleNewMaster();
+ }
+ }
+
+ @Override
+ public void nodeCreated(String path) {
+ LOG.info("nodeCreated(" + path + ")");
+ if(path.equals(watcher.masterAddressZNode)) {
+ handleNewMaster();
+ }
+ monitorMaster();
+ }
+
+ @Override
+ public void nodeDeleted(String path) {
+ if(path.equals(watcher.masterAddressZNode)) {
+ handleDeadMaster();
+ }
+ monitorMaster();
+ }
+
+ /**
+ * Set the master address to the specified address. This operation is
+ * idempotent, a master will only be set if there is currently no master set.
+ */
+ private synchronized void setMasterAddress(HServerAddress address) {
+ if(masterAddress == null) {
+ LOG.info("Found and set master address: " + address);
+ masterAddress = address;
+ }
+ }
+
+ /**
+ * Unsets the master address. Used when the master goes offline so none is
+ * available.
+ */
+ private synchronized void unsetMasterAddress() {
+ if(masterAddress != null) {
+ LOG.info("Master has been unset. There is no current master available");
+ masterAddress = null;
+ }
+ }
+
+ /**
+ * Handle a new master being set.
+ *
+ * This method should be called to check if there is a new master. If there
+ * is already a master set, this method returns immediately. If none is set,
+ * this will attempt to grab the master location from ZooKeeper and will set
+ * it.
+ *
+ * This method uses an atomic operation to ensure a new master is only set
+ * once.
+ */
+ private void handleNewMaster() {
+ if(hasMaster()) {
+ return;
+ }
+ HServerAddress address =
+ ZKUtil.getDataAsAddress(watcher, watcher.masterAddressZNode);
+ if(address != null) {
+ setMasterAddress(address);
+ }
+ }
+
+ /**
+ * Handle a master failure.
+ *
+ * Triggered when a master node is deleted.
+ *
+ * TODO: Other ways we figure master is "dead"? What do we do if set in ZK
+ * but we can't communicate with TCP?
+ */
+ private void handleDeadMaster() {
+ unsetMasterAddress();
+ }
+}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/RSZookeeperUpdater.java Tue Jul 6 21:56:27 2010
@@ -1,19 +1,14 @@
package org.apache.hadoop.hbase.regionserver;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
@@ -33,22 +28,22 @@ public class RSZookeeperUpdater {
private final String regionServerName;
private String regionName = null;
private String regionZNode = null;
- private ZooKeeperWrapper zkWrapper = null;
+ private ZooKeeperWatcher zooKeeper = null;
private int zkVersion = 0;
HBaseEventType lastUpdatedState;
- public RSZookeeperUpdater(Configuration conf,
- String regionServerName, String regionName) {
- this(conf, regionServerName, regionName, 0);
+ public RSZookeeperUpdater(ZooKeeperWatcher zooKeeper, String regionServerName,
+ String regionName) {
+ this(zooKeeper, regionServerName, regionName, 0);
}
- public RSZookeeperUpdater(Configuration conf, String regionServerName,
- String regionName, int zkVersion) {
- this.zkWrapper = ZooKeeperWrapper.getInstance(conf, regionServerName);
+ public RSZookeeperUpdater(ZooKeeperWatcher zooKeeper, String regionServerName,
+ String regionName, int zkVersion) {
+ this.zooKeeper = zooKeeper;
this.regionServerName = regionServerName;
this.regionName = regionName;
// get the region ZNode we have to create
- this.regionZNode = zkWrapper.getZNode(zkWrapper.getRegionInTransitionZNode(), regionName);
+ this.regionZNode = zooKeeper.getZNode(zooKeeper.assignmentZNode, regionName);
this.zkVersion = zkVersion;
}
@@ -59,14 +54,14 @@ public class RSZookeeperUpdater {
*/
public void startRegionCloseEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
// if this ZNode already exists, something is wrong
- if(zkWrapper.exists(regionZNode, true)) {
+ if(zooKeeper.exists(regionZNode, true)) {
String msg = "ZNode " + regionZNode + " already exists in ZooKeeper, will NOT close region.";
LOG.error(msg);
throw new IOException(msg);
}
// create the region node in the unassigned directory first
- zkWrapper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
+ zooKeeper.createZNodeIfNotExists(regionZNode, null, CreateMode.PERSISTENT, true);
// update the data for "regionName" ZNode in unassigned to CLOSING
updateZKWithEventData(HBaseEventType.RS2ZK_REGION_CLOSING, hmsg);
@@ -93,7 +88,7 @@ public class RSZookeeperUpdater {
*/
public void startRegionOpenEvent(HMsg hmsg, boolean updatePeriodically) throws IOException {
Stat stat = new Stat();
- byte[] data = zkWrapper.readZNode(regionZNode, stat);
+ byte[] data = zooKeeper.readZNode(regionZNode, stat);
// if there is no ZNode for this region, something is wrong
if(data == null) {
String msg = "ZNode " + regionZNode + " does not exist in ZooKeeper, will NOT open region.";
@@ -158,7 +153,7 @@ public class RSZookeeperUpdater {
" with [" + hbEventType + "]" +
" expected version = " + zkVersion);
lastUpdatedState = hbEventType;
- zkWrapper.writeZNode(regionZNode, data, zkVersion, true);
+ zooKeeper.writeZNode(regionZNode, data, zkVersion, true);
zkVersion++;
}
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/HQuorumPeer.java Tue Jul 6 21:56:27 2010
@@ -19,21 +19,8 @@
*/
package org.apache.hadoop.hbase.zookeeper;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.net.DNS;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.zookeeper.server.ServerConfig;
-import org.apache.zookeeper.server.ZooKeeperServerMain;
-import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-import org.apache.zookeeper.server.quorum.QuorumPeerMain;
-
import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.NetworkInterface;
@@ -41,29 +28,27 @@ import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
-import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.net.DNS;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.zookeeper.server.ServerConfig;
+import org.apache.zookeeper.server.ZooKeeperServerMain;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerMain;
/**
* HBase's version of ZooKeeper's QuorumPeer. When HBase is set to manage
* ZooKeeper, this class is used to start up QuorumPeer instances. By doing
* things in here rather than directly calling to ZooKeeper, we have more
- * control over the process. Currently, this class allows us to parse the
+ * control over the process. This class uses {@link ZKConfig} to parse the
* zoo.cfg and inject variables from HBase's site.xml configuration in.
*/
public class HQuorumPeer {
- private static final Log LOG = LogFactory.getLog(HQuorumPeer.class);
-
- private static final String VARIABLE_START = "${";
- private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
- private static final String VARIABLE_END = "}";
- private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
-
- private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property.";
- private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length();
- private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY
- + "clientPort";
-
+
/**
* Parse ZooKeeper configuration from HBase XML config and run a QuorumPeer.
* @param args String[] of command line arguments. Not used.
@@ -71,7 +56,7 @@ public class HQuorumPeer {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
try {
- Properties zkProperties = makeZKProps(conf);
+ Properties zkProperties = ZKConfig.makeZKProps(conf);
writeMyID(zkProperties);
QuorumPeerConfig zkConfig = new QuorumPeerConfig();
zkConfig.parseProperties(zkProperties);
@@ -158,195 +143,4 @@ public class HQuorumPeer {
w.println(myId);
w.close();
}
-
- /**
- * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg.
- * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse
- * the corresponding config options from the HBase XML configs and generate
- * the appropriate ZooKeeper properties.
- * @param conf Configuration to read from.
- * @return Properties holding mappings representing ZooKeeper zoo.cfg file.
- */
- public static Properties makeZKProps(Configuration conf) {
- // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
- // it and grab its configuration properties.
- ClassLoader cl = HQuorumPeer.class.getClassLoader();
- final InputStream inputStream =
- cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
- if (inputStream != null) {
- try {
- return parseZooCfg(conf, inputStream);
- } catch (IOException e) {
- LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
- ", loading from XML files", e);
- }
- }
-
- // Otherwise, use the configuration options from HBase's XML files.
- Properties zkProperties = new Properties();
-
- // Directly map all of the hbase.zookeeper.property.KEY properties.
- for (Entry<String, String> entry : conf) {
- String key = entry.getKey();
- if (key.startsWith(ZK_CFG_PROPERTY)) {
- String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE);
- String value = entry.getValue();
- // If the value has variables substitutions, need to do a get.
- if (value.contains(VARIABLE_START)) {
- value = conf.get(key);
- }
- zkProperties.put(zkKey, value);
- }
- }
-
- // If clientPort is not set, assign the default
- if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) {
- zkProperties.put(ZK_CLIENT_PORT_KEY,
- HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
- }
-
- // Create the server.X properties.
- int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
- int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
-
- final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
- "localhost");
- for (int i = 0; i < serverHosts.length; ++i) {
- String serverHost = serverHosts[i];
- String address = serverHost + ":" + peerPort + ":" + leaderPort;
- String key = "server." + i;
- zkProperties.put(key, address);
- }
-
- return zkProperties;
- }
-
- /**
- * Return the ZK Quorum servers string given zk properties returned by
- * makeZKProps
- * @param properties
- * @return
- */
- public static String getZKQuorumServersString(Properties properties) {
- String clientPort = null;
- List<String> servers = new ArrayList<String>();
-
- // The clientPort option may come after the server.X hosts, so we need to
- // grab everything and then create the final host:port comma separated list.
- boolean anyValid = false;
- for (Entry<Object,Object> property : properties.entrySet()) {
- String key = property.getKey().toString().trim();
- String value = property.getValue().toString().trim();
- if (key.equals("clientPort")) {
- clientPort = value;
- }
- else if (key.startsWith("server.")) {
- String host = value.substring(0, value.indexOf(':'));
- servers.add(host);
- try {
- //noinspection ResultOfMethodCallIgnored
- InetAddress.getByName(host);
- anyValid = true;
- } catch (UnknownHostException e) {
- LOG.warn(StringUtils.stringifyException(e));
- }
- }
- }
-
- if (!anyValid) {
- LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
- return null;
- }
-
- if (clientPort == null) {
- LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
- return null;
- }
-
- if (servers.isEmpty()) {
- LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
- "ZooKeeper cluster configured for its operation.");
- return null;
- }
-
- StringBuilder hostPortBuilder = new StringBuilder();
- for (int i = 0; i < servers.size(); ++i) {
- String host = servers.get(i);
- if (i > 0) {
- hostPortBuilder.append(',');
- }
- hostPortBuilder.append(host);
- hostPortBuilder.append(':');
- hostPortBuilder.append(clientPort);
- }
-
- return hostPortBuilder.toString();
- }
-
- /**
- * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
- * This method is used for testing so we can pass our own InputStream.
- * @param conf HBaseConfiguration to use for injecting variables.
- * @param inputStream InputStream to read from.
- * @return Properties parsed from config stream with variables substituted.
- * @throws IOException if anything goes wrong parsing config
- */
- public static Properties parseZooCfg(Configuration conf,
- InputStream inputStream) throws IOException {
- Properties properties = new Properties();
- try {
- properties.load(inputStream);
- } catch (IOException e) {
- final String msg = "fail to read properties from "
- + HConstants.ZOOKEEPER_CONFIG_NAME;
- LOG.fatal(msg);
- throw new IOException(msg, e);
- }
- for (Entry<Object, Object> entry : properties.entrySet()) {
- String value = entry.getValue().toString().trim();
- String key = entry.getKey().toString().trim();
- StringBuilder newValue = new StringBuilder();
- int varStart = value.indexOf(VARIABLE_START);
- int varEnd = 0;
- while (varStart != -1) {
- varEnd = value.indexOf(VARIABLE_END, varStart);
- if (varEnd == -1) {
- String msg = "variable at " + varStart + " has no end marker";
- LOG.fatal(msg);
- throw new IOException(msg);
- }
- String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
-
- String substituteValue = System.getProperty(variable);
- if (substituteValue == null) {
- substituteValue = conf.get(variable);
- }
- if (substituteValue == null) {
- String msg = "variable " + variable + " not set in system property "
- + "or hbase configs";
- LOG.fatal(msg);
- throw new IOException(msg);
- }
-
- newValue.append(substituteValue);
-
- varEnd += VARIABLE_END_LENGTH;
- varStart = value.indexOf(VARIABLE_START, varEnd);
- }
- // Special case for 'hbase.cluster.distributed' property being 'true'
- if (key.startsWith("server.")) {
- if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED)
- && value.startsWith("localhost")) {
- String msg = "The server in zoo.cfg cannot be set to localhost " +
- "in a fully-distributed setup because it won't be reachable. " +
- "See \"Getting Started\" for more information.";
- LOG.fatal(msg);
- throw new IOException(msg);
- }
- }
- newValue.append(value.substring(varEnd));
- properties.setProperty(key, newValue.toString());
- }
- return properties;
- }
}
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKConfig.java Tue Jul 6 21:56:27 2010
@@ -0,0 +1,252 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Utility methods for reading, parsing, and building zookeeper configuration.
+ */
+public class ZKConfig {
+ private static final Log LOG = LogFactory.getLog(ZKConfig.class);
+
+ private static final String VARIABLE_START = "${";
+ private static final int VARIABLE_START_LENGTH = VARIABLE_START.length();
+ private static final String VARIABLE_END = "}";
+ private static final int VARIABLE_END_LENGTH = VARIABLE_END.length();
+
+ private static final String ZK_CFG_PROPERTY = "hbase.zookeeper.property.";
+ private static final int ZK_CFG_PROPERTY_SIZE = ZK_CFG_PROPERTY.length();
+ private static final String ZK_CLIENT_PORT_KEY = ZK_CFG_PROPERTY
+ + "clientPort";
+
+ /**
+ * Make a Properties object holding ZooKeeper config equivalent to zoo.cfg.
+ * If there is a zoo.cfg in the classpath, simply read it in. Otherwise parse
+ * the corresponding config options from the HBase XML configs and generate
+ * the appropriate ZooKeeper properties.
+ * @param conf Configuration to read from.
+ * @return Properties holding mappings representing ZooKeeper zoo.cfg file.
+ */
+ public static Properties makeZKProps(Configuration conf) {
+ // First check if there is a zoo.cfg in the CLASSPATH. If so, simply read
+ // it and grab its configuration properties.
+ ClassLoader cl = HQuorumPeer.class.getClassLoader();
+ final InputStream inputStream =
+ cl.getResourceAsStream(HConstants.ZOOKEEPER_CONFIG_NAME);
+ if (inputStream != null) {
+ try {
+ return parseZooCfg(conf, inputStream);
+ } catch (IOException e) {
+ LOG.warn("Cannot read " + HConstants.ZOOKEEPER_CONFIG_NAME +
+ ", loading from XML files", e);
+ }
+ }
+
+ // Otherwise, use the configuration options from HBase's XML files.
+ Properties zkProperties = new Properties();
+
+ // Directly map all of the hbase.zookeeper.property.KEY properties.
+ for (Entry<String, String> entry : conf) {
+ String key = entry.getKey();
+ if (key.startsWith(ZK_CFG_PROPERTY)) {
+ String zkKey = key.substring(ZK_CFG_PROPERTY_SIZE);
+ String value = entry.getValue();
+ // If the value has variables substitutions, need to do a get.
+ if (value.contains(VARIABLE_START)) {
+ value = conf.get(key);
+ }
+ zkProperties.put(zkKey, value);
+ }
+ }
+
+ // If clientPort is not set, assign the default
+ if (zkProperties.getProperty(ZK_CLIENT_PORT_KEY) == null) {
+ zkProperties.put(ZK_CLIENT_PORT_KEY,
+ HConstants.DEFAULT_ZOOKEPER_CLIENT_PORT);
+ }
+
+ // Create the server.X properties.
+ int peerPort = conf.getInt("hbase.zookeeper.peerport", 2888);
+ int leaderPort = conf.getInt("hbase.zookeeper.leaderport", 3888);
+
+ final String[] serverHosts = conf.getStrings(HConstants.ZOOKEEPER_QUORUM,
+ "localhost");
+ for (int i = 0; i < serverHosts.length; ++i) {
+ String serverHost = serverHosts[i];
+ String address = serverHost + ":" + peerPort + ":" + leaderPort;
+ String key = "server." + i;
+ zkProperties.put(key, address);
+ }
+
+ return zkProperties;
+ }
+
+ /**
+ * Parse ZooKeeper's zoo.cfg, injecting HBase Configuration variables in.
+ * This method is used for testing so we can pass our own InputStream.
+ * @param conf HBaseConfiguration to use for injecting variables.
+ * @param inputStream InputStream to read from.
+ * @return Properties parsed from config stream with variables substituted.
+ * @throws IOException if anything goes wrong parsing config
+ */
+ public static Properties parseZooCfg(Configuration conf,
+ InputStream inputStream) throws IOException {
+ Properties properties = new Properties();
+ try {
+ properties.load(inputStream);
+ } catch (IOException e) {
+ final String msg = "fail to read properties from "
+ + HConstants.ZOOKEEPER_CONFIG_NAME;
+ LOG.fatal(msg);
+ throw new IOException(msg, e);
+ }
+ for (Entry<Object, Object> entry : properties.entrySet()) {
+ String value = entry.getValue().toString().trim();
+ String key = entry.getKey().toString().trim();
+ StringBuilder newValue = new StringBuilder();
+ int varStart = value.indexOf(VARIABLE_START);
+ int varEnd = 0;
+ while (varStart != -1) {
+ varEnd = value.indexOf(VARIABLE_END, varStart);
+ if (varEnd == -1) {
+ String msg = "variable at " + varStart + " has no end marker";
+ LOG.fatal(msg);
+ throw new IOException(msg);
+ }
+ String variable = value.substring(varStart + VARIABLE_START_LENGTH, varEnd);
+
+ String substituteValue = System.getProperty(variable);
+ if (substituteValue == null) {
+ substituteValue = conf.get(variable);
+ }
+ if (substituteValue == null) {
+ String msg = "variable " + variable + " not set in system property "
+ + "or hbase configs";
+ LOG.fatal(msg);
+ throw new IOException(msg);
+ }
+
+ newValue.append(substituteValue);
+
+ varEnd += VARIABLE_END_LENGTH;
+ varStart = value.indexOf(VARIABLE_START, varEnd);
+ }
+ // Special case for 'hbase.cluster.distributed' property being 'true'
+ if (key.startsWith("server.")) {
+ if (conf.get(HConstants.CLUSTER_DISTRIBUTED).equals(HConstants.CLUSTER_IS_DISTRIBUTED)
+ && value.startsWith("localhost")) {
+ String msg = "The server in zoo.cfg cannot be set to localhost " +
+ "in a fully-distributed setup because it won't be reachable. " +
+ "See \"Getting Started\" for more information.";
+ LOG.fatal(msg);
+ throw new IOException(msg);
+ }
+ }
+ newValue.append(value.substring(varEnd));
+ properties.setProperty(key, newValue.toString());
+ }
+ return properties;
+ }
+
+ /**
+ * Return the ZK Quorum servers string given zk properties returned by
+ * makeZKProps
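+ * @param properties ZooKeeper properties, as returned by makeZKProps
+ * @return comma-separated host:port list, or null if it cannot be built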
+ */
+ public static String getZKQuorumServersString(Properties properties) {
+ String clientPort = null;
+ List<String> servers = new ArrayList<String>();
+
+ // The clientPort option may come after the server.X hosts, so we need to
+ // grab everything and then create the final host:port comma separated list.
+ boolean anyValid = false;
+ for (Entry<Object,Object> property : properties.entrySet()) {
+ String key = property.getKey().toString().trim();
+ String value = property.getValue().toString().trim();
+ if (key.equals("clientPort")) {
+ clientPort = value;
+ }
+ else if (key.startsWith("server.")) {
+ String host = value.substring(0, value.indexOf(':'));
+ servers.add(host);
+ try {
+ //noinspection ResultOfMethodCallIgnored
+ InetAddress.getByName(host);
+ anyValid = true;
+ } catch (UnknownHostException e) {
+ LOG.warn(StringUtils.stringifyException(e));
+ }
+ }
+ }
+
+ if (!anyValid) {
+ LOG.error("no valid quorum servers found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+ return null;
+ }
+
+ if (clientPort == null) {
+ LOG.error("no clientPort found in " + HConstants.ZOOKEEPER_CONFIG_NAME);
+ return null;
+ }
+
+ if (servers.isEmpty()) {
+ LOG.fatal("No server.X lines found in conf/zoo.cfg. HBase must have a " +
+ "ZooKeeper cluster configured for its operation.");
+ return null;
+ }
+
+ StringBuilder hostPortBuilder = new StringBuilder();
+ for (int i = 0; i < servers.size(); ++i) {
+ String host = servers.get(i);
+ if (i > 0) {
+ hostPortBuilder.append(',');
+ }
+ hostPortBuilder.append(host);
+ hostPortBuilder.append(':');
+ hostPortBuilder.append(clientPort);
+ }
+
+ return hostPortBuilder.toString();
+ }
+
+ /**
+ * Return the ZK Quorum servers string given the specified configuration.
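+ * @param conf configuration used to build the ZooKeeper properties
+ * @return comma-separated host:port list, or null if it cannot be built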
+ */
+ public static String getZKQuorumServersString(Configuration conf) {
+ return getZKQuorumServersString(makeZKProps(conf));
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java?rev=961028&r1=961027&r2=961028&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKServerTool.java Tue Jul 6 21:56:27 2010
@@ -20,12 +20,11 @@
package org.apache.hadoop.hbase.zookeeper;
+import java.util.Properties;
+import java.util.Map.Entry;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-
-import java.util.Map.Entry;
-import java.util.Properties;
/**
* Tool for reading ZooKeeper servers from HBase XML configuation and producing
@@ -41,7 +40,7 @@ public class ZKServerTool {
// Note that we do not simply grab the property
// HConstants.ZOOKEEPER_QUORUM from the HBaseConfiguration because the
// user may be using a zoo.cfg file.
- Properties zkProps = HQuorumPeer.makeZKProps(conf);
+ Properties zkProps = ZKConfig.makeZKProps(conf);
for (Entry<Object, Object> entry : zkProps.entrySet()) {
String key = entry.getKey().toString().trim();
String value = entry.getValue().toString().trim();
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Tue Jul 6 21:56:27 2010
@@ -0,0 +1,161 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Internal HBase utility class for ZooKeeper.
+ *
+ * Contains only static methods and constants.
+ */
+public class ZKUtil {
+ private static final Log LOG = LogFactory.getLog(ZKUtil.class);
+
+ // TODO: Replace this with ZooKeeper constant when ZOOKEEPER-277 is resolved.
+ private static final char ZNODE_PATH_SEPARATOR = '/';
+
+  /**
+   * Opens a new ZooKeeper connection described by the given configuration.
+   *
+   * The quorum string is derived through {@link ZKConfig}, and the session
+   * timeout is read from <code>zookeeper.session.timeout</code> (60 seconds
+   * when unset). The supplied watcher is handed to the new ZooKeeper client.
+   *
+   * @param conf configuration to pull quorum and other settings from
+   * @param watcher watcher passed to the ZooKeeper client
+   * @return connection to zookeeper
+   * @throws IOException if the quorum cannot be determined or the client
+   *                     cannot be created
+   */
+  public static ZooKeeper connect(Configuration conf, Watcher watcher)
+  throws IOException {
+    final String quorum =
+      ZKConfig.getZKQuorumServersString(ZKConfig.makeZKProps(conf));
+    if (null == quorum) {
+      throw new IOException("Unable to determine ZooKeeper quorum");
+    }
+    final int sessionTimeout =
+      conf.getInt("zookeeper.session.timeout", 60 * 1000);
+    LOG.debug("Opening connection to ZooKeeper with quorum (" + quorum + ")");
+    return new ZooKeeper(quorum, sessionTimeout, watcher);
+  }
+
+  /**
+   * Builds a full znode name by placing the znode path separator between the
+   * given prefix and suffix.
+   *
+   * No normalization is performed: the prefix is expected not to end with a
+   * slash and the suffix is expected not to begin with one.
+   *
+   * @param prefix beginning of znode name
+   * @param suffix ending of znode name
+   * @return prefix, separator and suffix concatenated in that order
+   */
+  public static String joinZNode(String prefix, String suffix) {
+    final StringBuilder joined = new StringBuilder();
+    joined.append(prefix);
+    joined.append(ZNODE_PATH_SEPARATOR);
+    joined.append(suffix);
+    return joined.toString();
+  }
+
+  /**
+   * Watch the specified znode for delete/create/change events. The watcher is
+   * set whether or not the node exists. If the node already exists, the method
+   * returns true. If the node does not exist, the method returns false.
+   *
+   * A KeeperException or InterruptedException is logged, reported to the
+   * watcher's corresponding handler, and results in a return value of false.
+   *
+   * @param zkw zk reference
+   * @param znode path of node to watch
+   * @return true if znode exists, false if does not exist or error
+   */
+  public static boolean watchAndCheckExists(ZooKeeperWatcher zkw, String znode) {
+    try {
+      Stat s = zkw.getZooKeeper().exists(znode, zkw);
+      boolean exists = s != null;
+      // Log what actually happened: exists() succeeds for both present and
+      // absent znodes, so the message must not claim the node is there.
+      if (exists) {
+        zkw.debug("Set watcher on existing znode (" + znode + ")");
+      } else {
+        zkw.debug("Set watcher on znode that does not yet exist (" + znode + ")");
+      }
+      return exists;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to set watcher on znode (" + znode + ")", e);
+      zkw.keeperException(e);
+      return false;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to set watcher on znode (" + znode + ")", e);
+      zkw.interruptedException(e);
+      return false;
+    }
+  }
+
+  /**
+   * Get the data at the specified znode and set a watch.
+   *
+   * Returns the data and sets a watch if the node exists. Returns null and no
+   * watch is set if the node does not exist or there is an exception. The
+   * result is also null when the ZooKeeper client reports no data for an
+   * existing znode.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @return data of the specified znode, or null
+   */
+  public static byte [] getDataAndWatch(ZooKeeperWatcher zkw, String znode) {
+    try {
+      byte [] data = zkw.getZooKeeper().getData(znode, zkw, null);
+      // Guard the length lookup: a znode may exist without any data, in which
+      // case dereferencing the array would throw a NullPointerException.
+      int length = (data == null) ? 0 : data.length;
+      zkw.debug("Retrieved " + length + " bytes of data from znode (" +
+        znode + ") and set a watcher");
+      return data;
+    } catch (KeeperException.NoNodeException e) {
+      zkw.debug("Unable to get data of znode (" + znode + ") " +
+        "because node does not exist (not an error)");
+      return null;
+    } catch (KeeperException e) {
+      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.keeperException(e);
+      return null;
+    } catch (InterruptedException e) {
+      zkw.warn("Unable to get data of znode (" + znode + ")", e);
+      zkw.interruptedException(e);
+      return null;
+    }
+  }
+
+  /**
+   * Reads the specified znode through {@link #getDataAndWatch}, interprets the
+   * bytes as a string and wraps that string in an HServerAddress.
+   *
+   * When {@link #getDataAndWatch} yields null, null is returned and no address
+   * is constructed.
+   *
+   * @param zkw zk reference
+   * @param znode path of node
+   * @return data of the specified node as a server address, or null
+   */
+  public static HServerAddress getDataAsAddress(ZooKeeperWatcher zkw,
+      String znode) {
+    final byte [] raw = getDataAndWatch(zkw, znode);
+    if (raw != null) {
+      final String address = Bytes.toString(raw);
+      zkw.debug("Read server address from znode (" + znode + "): " + address);
+      return new HServerAddress(address);
+    }
+    return null;
+  }
+}
\ No newline at end of file
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperListener.java Tue Jul 6 21:56:27 2010
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+
+/**
+ * Base class for internal listeners of ZooKeeper events.
+ *
+ * A process's {@link ZooKeeperWatcher} invokes the callbacks below on every
+ * listener that was registered through
+ * {@link ZooKeeperWatcher#registerListener}; unregistered listeners receive
+ * nothing.
+ *
+ * Every callback defaults to doing nothing, so subclasses override only the
+ * events they care about.
+ *
+ * Callbacks run while the watcher is blocked, so implementations must return
+ * quickly.
+ */
+public class ZooKeeperListener {
+
+  // Watcher this listener belongs to; also carries configuration and constants
+  protected ZooKeeperWatcher watcher;
+
+  /**
+   * Construct a ZooKeeper event listener bound to the given watcher.
+   * TODO: This should take ServerStatus which will contain ZKWatcher ref?
+   * @param watcher watcher stored for use by subclasses
+   */
+  public ZooKeeperListener(ZooKeeperWatcher watcher) {
+    this.watcher = watcher;
+  }
+
+  /**
+   * Callback for the creation of a node.
+   * @param path full path of the new node
+   */
+  public void nodeCreated(String path) {
+    // intentionally empty; override to react
+  }
+
+  /**
+   * Callback for the deletion of a node.
+   * @param path full path of the deleted node
+   */
+  public void nodeDeleted(String path) {
+    // intentionally empty; override to react
+  }
+
+  /**
+   * Callback for a data change on an existing node.
+   * @param path full path of the updated node
+   */
+  public void nodeDataChanged(String path) {
+    // intentionally empty; override to react
+  }
+
+  /**
+   * Callback for a child being added to or removed from an existing node.
+   * @param path full path of the node whose children have changed
+   */
+  public void nodeChildrenChanged(String path) {
+    // intentionally empty; override to react
+  }
+}
\ No newline at end of file
Added: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java?rev=961028&view=auto
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java (added)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java Tue Jul 6 21:56:27 2010
@@ -0,0 +1,313 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.zookeeper;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerStatus;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * Acts as the single ZooKeeper Watcher. One instance of this is instantiated
+ * for each Master, RegionServer, and client process.
+ *
+ * This is the only class that implements {@link Watcher}. Other internal
+ * classes which need to be notified of ZooKeeper events must register with
+ * the local instance of this watcher via {@link #registerListener}.
+ *
+ * This class also holds and manages the connection to ZooKeeper. Code to deal
+ * with connection related events and exceptions are handled here.
+ */
+public class ZooKeeperWatcher extends ZooKeeperWrapper implements Watcher {
+ private static final Log LOG = LogFactory.getLog(ZooKeeperWatcher.class);
+
+ // name of this watcher (for logging only)
+ private String name;
+
+ // zookeeper connection
+ private ZooKeeper zooKeeper;
+
+ // server controller
+ private ServerStatus server;
+
+ // listeners to be notified
+ private final Set<ZooKeeperListener> listeners =
+ new CopyOnWriteArraySet<ZooKeeperListener>();
+
+ // node names
+
+ // base znode for this cluster
+ public String baseZNode;
+ // znode containing location of server hosting root region
+ public String rootServerZNode;
+ // znode containing ephemeral nodes of the regionservers
+ public String rsZNode;
+ // znode of currently active master
+ public String masterAddressZNode;
+ // znode containing the current cluster state
+ public String clusterStateZNode;
+ // znode used for region transitioning and assignment
+ public String assignmentZNode;
+
+  /**
+   * Instantiate a ZooKeeper connection and watcher.
+   * @param conf configuration used for the connection and the znode names
+   * @param name name of this watcher, for logging/debug purposes only
+   * @param server server to abort on fatal ZooKeeper problems, may be null
+   * @throws IOException if the ZooKeeper connection cannot be created
+   */
+  public ZooKeeperWatcher(Configuration conf, String name, ServerStatus server)
+  throws IOException {
+    super(conf, name);
+    this.name = name;
+    // Populate everything the watcher callbacks read before handing 'this' to
+    // the ZooKeeper client. Otherwise an event delivered right after the
+    // client is created could observe a null server and skip the abort.
+    this.server = server;
+    setNodeNames(conf);
+    this.zooKeeper = ZKUtil.connect(conf, this);
+    info("Connected to ZooKeeper");
+  }
+
+  /**
+   * Computes the znode paths used by this watcher from the configuration.
+   *
+   * The base znode comes from {@link HConstants#ZOOKEEPER_ZNODE_PARENT}; every
+   * other path is the base znode joined with a configurable child name.
+   */
+  private void setNodeNames(Configuration conf) {
+    final String parent = conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT,
+      HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
+    this.baseZNode = parent;
+
+    final String rootServerName =
+      conf.get("zookeeper.znode.rootserver", "root-region-server");
+    this.rootServerZNode = ZKUtil.joinZNode(parent, rootServerName);
+
+    final String rsName = conf.get("zookeeper.znode.rs", "rs");
+    this.rsZNode = ZKUtil.joinZNode(parent, rsName);
+
+    final String masterName = conf.get("zookeeper.znode.master", "master");
+    this.masterAddressZNode = ZKUtil.joinZNode(parent, masterName);
+
+    final String stateName = conf.get("zookeeper.znode.state", "shutdown");
+    this.clusterStateZNode = ZKUtil.joinZNode(parent, stateName);
+
+    final String assignmentName =
+      conf.get("zookeeper.znode.regionInTransition", "unassigned");
+    this.assignmentZNode = ZKUtil.joinZNode(parent, assignmentName);
+  }
+
+  /**
+   * Adds the specified listener to the set of listeners that are handed
+   * ZooKeeper node events by this watcher.
+   * @param listener listener to add
+   */
+  public void registerListener(ZooKeeperListener listener) {
+    this.listeners.add(listener);
+  }
+
+  /**
+   * Exposes the ZooKeeper connection held by this watcher.
+   * @return connection reference to zookeeper
+   */
+  public ZooKeeper getZooKeeper() {
+    return this.zooKeeper;
+  }
+
+  /**
+   * Callback invoked by ZooKeeper for node events and connection status.
+   *
+   * Events of type None are connection status changes and are handled
+   * locally; node events are forwarded to every registered listener.
+   */
+  @Override
+  public void process(WatchedEvent event) {
+    final String path = event.getPath();
+    LOG.debug("<" + name + "> Received ZooKeeper Event, " +
+      "type: " + event.getType() + ", " +
+      "state:" + event.getState() + ", " +
+      "path: " + path);
+
+    // While we are still using both ZKWs, need to call parent process()
+    super.process(event);
+
+    switch(event.getType()) {
+
+      // No node is involved, this is a connection status change
+      case None:
+        connectionEvent(event);
+        break;
+
+      // Node events are fanned out to the listeners
+
+      case NodeCreated:
+        for(ZooKeeperListener registered : listeners) {
+          registered.nodeCreated(path);
+        }
+        break;
+
+      case NodeDeleted:
+        for(ZooKeeperListener registered : listeners) {
+          registered.nodeDeleted(path);
+        }
+        break;
+
+      case NodeDataChanged:
+        for(ZooKeeperListener registered : listeners) {
+          registered.nodeDataChanged(path);
+        }
+        break;
+
+      case NodeChildrenChanged:
+        for(ZooKeeperListener registered : listeners) {
+          registered.nodeChildrenChanged(path);
+        }
+        break;
+    }
+  }
+
+ // Connection management
+
+  /**
+   * Called when there is a connection-related event via the Watcher callback.
+   *
+   * SyncConnected is ignored. On Disconnected or Expired an error is logged
+   * and, if a server was supplied to this watcher, that server is aborted.
+   *
+   * @param event connection event received from ZooKeeper
+   */
+  private void connectionEvent(WatchedEvent event) {
+    switch(event.getState()) {
+      // SyncConnected is normal, ignore
+      case SyncConnected:
+        break;
+
+      // Abort the server if Disconnected or Expired
+      // TODO: Any reason to handle these two differently?
+      case Disconnected:
+      case Expired:
+        error("Received Disconnected/Expired [" + event.getState() + "] " +
+          "from ZooKeeper, aborting server");
+        if(server != null) {
+          server.abortServer();
+        }
+        break;
+    }
+  }
+
+  /**
+   * Handles KeeperExceptions in client calls.
+   *
+   * This may be temporary but for now this gives one place to deal with these.
+   *
+   * TODO: Currently this method logs the exception and aborts the server, if
+   * one was supplied to this watcher.
+   *
+   * @param ke exception raised by a ZooKeeper client call
+   */
+  public void keeperException(KeeperException ke) {
+    error("Received unexpected KeeperException, aborting server", ke);
+    // The server reference is optional (connectionEvent treats it the same
+    // way), so guard against a NullPointerException here as well.
+    if(server != null) {
+      server.abortServer();
+    }
+  }
+
+  /**
+   * Handles InterruptedExceptions in client calls.
+   *
+   * This may be temporary but for now this gives one place to deal with these.
+   *
+   * The exception is logged and the calling thread's interrupt status is
+   * restored so that code further up the stack can still observe the
+   * interruption.
+   *
+   * TODO: Is this ever expected to happen? Do we abort or can we let it run?
+   *
+   * @param ie exception raised by an interrupted ZooKeeper client call
+   */
+  public void interruptedException(InterruptedException ie) {
+    debug("Received InterruptedException, restoring interrupt status", ie);
+    // Catching InterruptedException clears the flag; set it again rather than
+    // silently swallowing the interruption.
+    Thread.currentThread().interrupt();
+  }
+
+ // Logging methods
+
+  /**
+   * Prepends this watcher's name, in angle brackets, to a log line.
+   * @param line log line
+   * @return the log line prefixed with the watcher name
+   */
+  private String withWatcherName(String line) {
+    return "<" + name + "> " + line;
+  }
+
+  /**
+   * Logs at info level, tagging the line with this watcher's name.
+   * @param string log line
+   */
+  public void info(String string) {
+    LOG.info(withWatcherName(string));
+  }
+
+  /**
+   * Logs at debug level, tagging the line with this watcher's name.
+   * @param string log line
+   */
+  public void debug(String string) {
+    LOG.debug(withWatcherName(string));
+  }
+
+  /**
+   * Logs at debug level with a throwable, tagging the line with this
+   * watcher's name.
+   * @param string log line
+   * @param t exception
+   */
+  public void debug(String string, Throwable t) {
+    LOG.debug(withWatcherName(string), t);
+  }
+
+  /**
+   * Logs at warn level, tagging the line with this watcher's name.
+   * @param string log line
+   */
+  public void warn(String string) {
+    LOG.warn(withWatcherName(string));
+  }
+
+  /**
+   * Logs at warn level with a throwable, tagging the line with this
+   * watcher's name.
+   * @param string log line
+   * @param t exception
+   */
+  public void warn(String string, Throwable t) {
+    LOG.warn(withWatcherName(string), t);
+  }
+
+  /**
+   * Logs at error level, tagging the line with this watcher's name.
+   * @param string log line
+   */
+  public void error(String string) {
+    LOG.error(withWatcherName(string));
+  }
+
+  /**
+   * Logs at error level with a throwable, tagging the line with this
+   * watcher's name.
+   * @param string log line
+   * @param t exception
+   */
+  public void error(String string, Throwable t) {
+    LOG.error(withWatcherName(string), t);
+  }
+
+  /**
+   * Close the connection to ZooKeeper.
+   *
+   * Does nothing when no connection is held. If the calling thread is
+   * interrupted while closing, the interruption is logged and the thread's
+   * interrupt status is restored; no exception is propagated to the caller.
+   */
+  @Override
+  public void close() {
+    try {
+      if(zooKeeper != null) {
+        zooKeeper.close();
+        super.close();
+      }
+    } catch (InterruptedException e) {
+      // Do not swallow the interruption silently: record it and re-set the
+      // flag that was cleared when the exception was thrown.
+      warn("Interrupted while closing ZooKeeper connection", e);
+      Thread.currentThread().interrupt();
+    }
+  }
\ No newline at end of file