You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the word "here" in the original page and is not preserved in this copy.
Posted to commits@hbase.apache.org by st...@apache.org on 2011/04/28 01:12:44 UTC
svn commit: r1097275 [6/8] - in /hbase/trunk: ./ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/avro/
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/ src/main/java/o...
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Apr 27 23:12:42 2011
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.io.StringWriter;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@@ -34,7 +35,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.ClockOutO
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
@@ -70,11 +70,11 @@ import org.apache.hadoop.hbase.MasterAdd
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
@@ -139,6 +139,7 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.DNS;
import org.apache.zookeeper.KeeperException;
+import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
@@ -169,7 +170,6 @@ public class HRegionServer implements HR
// If false, the file system has become unavailable
protected volatile boolean fsOk;
- protected HServerInfo serverInfo;
protected final Configuration conf;
private final HConnection connection;
@@ -189,7 +189,6 @@ public class HRegionServer implements HR
new ConcurrentHashMap<String, HRegion>();
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final LinkedBlockingQueue<HMsg> outboundMsgs = new LinkedBlockingQueue<HMsg>();
final int numRetries;
protected final int threadWakeFrequency;
@@ -204,13 +203,16 @@ public class HRegionServer implements HR
// Server to handle client requests. Default access so can be accessed by
// unit tests.
- RpcServer server;
+ RpcServer rpcServer;
+
+ private final InetSocketAddress isa;
// Leases
private Leases leases;
- // Request counter
- private volatile AtomicInteger requestCount = new AtomicInteger();
+ // Request counter.
+ // Do we need this? Can't we just sum region counters? St.Ack 20110412
+ private AtomicInteger requestCount = new AtomicInteger();
// Info server. Default access so can be used by unit tests. REGIONSERVER
// is name of the webapp and the attribute name used stuffing this instance
@@ -248,7 +250,8 @@ public class HRegionServer implements HR
// flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline;
- final Map<String, InternalScanner> scanners = new ConcurrentHashMap<String, InternalScanner>();
+ final Map<String, InternalScanner> scanners =
+ new ConcurrentHashMap<String, InternalScanner>();
// zookeeper connection and watcher
private ZooKeeperWatcher zooKeeper;
@@ -270,10 +273,6 @@ public class HRegionServer implements HR
private final int rpcTimeout;
- // The main region server thread.
- @SuppressWarnings("unused")
- private Thread regionServerThread;
-
// Instance of the hbase executor service.
private ExecutorService service;
@@ -283,87 +282,108 @@ public class HRegionServer implements HR
private final RegionServerAccounting regionServerAccounting;
/**
+ * The server name the Master sees us as. It's made from the hostname the
+ * master passes us, our port, and this server's startcode. Gets set after
+ * registration with the Master. The hostname can differ from the hostname in
+ * {@link #isa} but usually doesn't if both servers resolve names the same way.
+ */
+ private ServerName serverNameFromMasterPOV;
+
+ // Port we put up the webui on.
+ private int webuiport = -1;
+
+ /**
+ * This server's startcode; set to System.currentTimeMillis() at construction.
+ */
+ private final long startcode;
+
+ /**
* Starts a HRegionServer at the default location
*
* @param conf
* @throws IOException
* @throws InterruptedException
*/
- public HRegionServer(Configuration conf) throws IOException, InterruptedException {
+ public HRegionServer(Configuration conf)
+ throws IOException, InterruptedException {
this.fsOk = true;
this.conf = conf;
this.connection = HConnectionManager.getConnection(conf);
this.isOnline = false;
-
- // check to see if the codec list is available:
- String [] codecs = conf.getStrings("hbase.regionserver.codecs",
- (String[])null);
- if (codecs != null) {
- for (String codec : codecs) {
- if (!CompressionTest.testCompression(codec)) {
- throw new IOException("Compression codec " + codec +
- " not supported, aborting RS construction");
- }
- }
- }
+ checkCodecs(this.conf);
// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 10);
this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
- 10 * 1000);
+ 10 * 1000);
this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
- sleeper = new Sleeper(this.msgInterval, this);
+ this.sleeper = new Sleeper(this.msgInterval, this);
this.maxScannerResultSize = conf.getLong(
- HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
- HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+ HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+ HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.numRegionsToReport = conf.getInt(
- "hbase.regionserver.numregionstoreport", 10);
+ "hbase.regionserver.numregionstoreport", 10);
this.rpcTimeout = conf.getInt(
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.abortRequested = false;
this.stopped = false;
- // Server to handle client requests
- String machineName = DNS.getDefaultHost(conf.get(
- "hbase.regionserver.dns.interface", "default"), conf.get(
- "hbase.regionserver.dns.nameserver", "default"));
- String addressStr = machineName + ":" +
- conf.get(HConstants.REGIONSERVER_PORT,
- Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
- HServerAddress address = new HServerAddress(addressStr);
- this.server = HBaseRPC.getServer(this,
- new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
+ // Server to handle client requests.
+ String hostname = DNS.getDefaultHost(
+ conf.get("hbase.regionserver.dns.interface", "default"),
+ conf.get("hbase.regionserver.dns.nameserver", "default"));
+ int port = conf.getInt(HConstants.REGIONSERVER_PORT,
+ HConstants.DEFAULT_REGIONSERVER_PORT);
+ // Creating an InetSocketAddress forces a hostname resolve.
+ InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+ if (initialIsa.getAddress() == null) {
+ throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+ }
+ this.rpcServer = HBaseRPC.getServer(this,
+ new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
OnlineRegions.class},
- address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+ initialIsa.getHostName(), // BindAddress is IP we got for this server.
+ initialIsa.getPort(),
+ conf.getInt("hbase.regionserver.handler.count", 10),
conf.getInt("hbase.regionserver.metahandler.count", 10),
- false, conf, QOS_THRESHOLD);
- this.server.setErrorHandler(this);
- this.server.setQosFunction(new QosFunction());
-
- // HServerInfo can be amended by master. See below in reportForDuty.
- this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(
- address.getBindAddress(), this.server.getListenerAddress().getPort())),
- System.currentTimeMillis(), this.conf.getInt(
- "hbase.regionserver.info.port", 60030), machineName);
- if (this.serverInfo.getServerAddress() == null) {
- throw new NullPointerException("Server address cannot be null; "
- + "hbase-958 debugging");
- }
+ conf.getBoolean("hbase.rpc.verbose", false),
+ conf, QOS_THRESHOLD);
+ // Set our address.
+ this.isa = this.rpcServer.getListenerAddress();
+
+ this.rpcServer.setErrorHandler(this);
+ this.rpcServer.setQosFunction(new QosFunction());
+ this.startcode = System.currentTimeMillis();
// login the server principal (if using secure Hadoop)
- User.login(conf, "hbase.regionserver.keytab.file",
- "hbase.regionserver.kerberos.principal", serverInfo.getHostname());
-
+ User.login(this.conf, "hbase.regionserver.keytab.file",
+ "hbase.regionserver.kerberos.principal", this.isa.getHostName());
regionServerAccounting = new RegionServerAccounting();
}
+ /**
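+ * Test each codec listed in hbase.regionserver.codecs so we fail fast if its supporting libs are missing.
+ * @param c configuration to read the codec list from
+ * @throws IOException if a listed codec fails the compression test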
+ */
+ private static void checkCodecs(final Configuration c) throws IOException {
+ // check to see if the codec list is available:
+ String [] codecs = c.getStrings("hbase.regionserver.codecs", (String[])null);
+ if (codecs == null) return;
+ for (String codec : codecs) {
+ if (!CompressionTest.testCompression(codec)) {
+ throw new IOException("Compression codec " + codec +
+ " not supported, aborting RS construction");
+ }
+ }
+ }
+
private static final int NORMAL_QOS = 0;
private static final int QOS_THRESHOLD = 10; // the line between low and high qos
private static final int HIGH_QOS = 100;
@@ -373,6 +393,10 @@ public class HRegionServer implements HR
int priority() default 0;
}
+ /**
+ * Utility used to ensure higher quality of service for priority RPCs, e.g.
+ * RPCs to .META. and -ROOT-.
+ */
class QosFunction implements Function<Writable,Integer> {
private final Map<String, Integer> annotatedQos;
@@ -441,7 +465,7 @@ public class HRegionServer implements HR
return HIGH_QOS;
}
} else if (inv.getParameterClasses()[0] == MultiAction.class) {
- MultiAction ma = (MultiAction) inv.getParameters()[0];
+ MultiAction<?> ma = (MultiAction<?>) inv.getParameters()[0];
Set<byte[]> regions = ma.getRegions();
// ok this sucks, but if any single of the actions touches a meta, the
// whole
@@ -464,14 +488,13 @@ public class HRegionServer implements HR
}
/**
- * Creates all of the state that needs to be reconstructed in case we are
- * doing a restart. This is shared between the constructor and restart(). Both
- * call it.
+ * All initialization needed before we go register with Master.
*
* @throws IOException
* @throws InterruptedException
*/
- private void initialize() throws IOException, InterruptedException {
+ private void preRegistrationInitialization()
+ throws IOException, InterruptedException {
try {
initializeZooKeeper();
initializeThreads();
@@ -483,7 +506,7 @@ public class HRegionServer implements HR
// Call stop if error or process will stick around for ever since server
// puts up non-daemon threads.
LOG.error("Stopping HRS because failed initialize", t);
- this.server.stop();
+ this.rpcServer.stop();
}
}
@@ -497,8 +520,8 @@ public class HRegionServer implements HR
*/
private void initializeZooKeeper() throws IOException, InterruptedException {
// Open connection to zookeeper and set primary watcher
- zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
- serverInfo.getServerAddress().getPort(), this);
+ this.zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
+ this.isa.getPort(), this);
// Create the master address manager, register with zk, and start it. Then
// block until a master is available. No point in starting up if no master
@@ -520,7 +543,7 @@ public class HRegionServer implements HR
// Create the log splitting worker and start it
this.splitLogWorker = new SplitLogWorker(this.zooKeeper,
- this.getConfiguration(), this.getServerName());
+ this.getConfiguration(), this.getServerName().toString());
splitLogWorker.start();
}
@@ -548,7 +571,6 @@ public class HRegionServer implements HR
}
private void initializeThreads() throws IOException {
-
// Cache flushing thread.
this.cacheFlusher = new MemStoreFlusher(conf, this);
@@ -557,10 +579,10 @@ public class HRegionServer implements HR
// Background thread to check for major compactions; needed if region
// has not gotten updates in a while. Make it run at a lesser frequency.
- int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY
- + ".multiplier", 1000);
+ int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY +
+ ".multiplier", 1000);
this.majorCompactionChecker = new MajorCompactionChecker(this,
- this.threadWakeFrequency * multiplier, this);
+ this.threadWakeFrequency * multiplier, this);
this.leases = new Leases((int) conf.getLong(
HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
@@ -569,28 +591,30 @@ public class HRegionServer implements HR
}
/**
- * The HRegionServer sticks in this loop until closed. It repeatedly checks in
- * with the HMaster, sending heartbeats & reports, and receiving HRegion
- * load/unload instructions.
+ * The HRegionServer sticks in this loop until closed.
*/
public void run() {
-
try {
- // Initialize threads and wait for a master
- initialize();
+ // Do pre-registration initializations; zookeeper, lease threads, etc.
+ preRegistrationInitialization();
} catch (Exception e) {
abort("Fatal exception during initialization", e);
}
- this.regionServerThread = Thread.currentThread();
try {
+ // Try and register with the Master; tell it we are here.
while (!this.stopped) {
if (tryReportForDuty()) break;
+ LOG.warn("No response on reportForDuty. Sleeping and then retrying.");
+ this.sleeper.sleep();
}
+
+ // We registered with the Master. Go into run mode.
long lastMsg = 0;
- List<HMsg> outboundMessages = new ArrayList<HMsg>();
+ boolean onlyMetaRegionsRemaining = false;
+ long oldRequestCount = -1;
// The main run loop.
- for (int tries = 0; !this.stopped && isHealthy();) {
+ while (!this.stopped && isHealthy()) {
if (!isClusterUp()) {
if (isOnlineRegionsEmpty()) {
stop("Exiting; cluster shutdown set and not carrying any regions");
@@ -598,56 +622,37 @@ public class HRegionServer implements HR
this.stopping = true;
closeUserRegions(this.abortRequested);
} else if (this.stopping && LOG.isDebugEnabled()) {
+ if (!onlyMetaRegionsRemaining) {
+ onlyMetaRegionsRemaining = isOnlyMetaRegionsRemaining();
+ }
+ if (onlyMetaRegionsRemaining) {
+ // Set stopped if no requests since last time we went around the loop.
+ // The remaining meta regions will be closed on our way out.
+ if (oldRequestCount == this.requestCount.get()) {
+ stop("Stopped; only catalog regions remaining online");
+ break;
+ }
+ oldRequestCount = this.requestCount.get();
+ }
LOG.debug("Waiting on " + getOnlineRegionsAsPrintableString());
}
}
long now = System.currentTimeMillis();
- // Drop into the send loop if msgInterval has elapsed or if something
- // to send. If we fail talking to the master, then we'll sleep below
- // on poll of the outboundMsgs blockingqueue.
- if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
- try {
- doMetrics();
- tryRegionServerReport(outboundMessages);
- lastMsg = System.currentTimeMillis();
- // Reset tries count if we had a successful transaction.
- tries = 0;
- if (this.stopped) continue;
- } catch (Exception e) { // FindBugs REC_CATCH_EXCEPTION
- // Two special exceptions could be printed out here,
- // PleaseHoldException and YouAreDeadException
- if (e instanceof IOException) {
- e = RemoteExceptionHandler.checkIOException((IOException) e);
- }
- if (e instanceof YouAreDeadException) {
- // This will be caught and handled as a fatal error below
- throw e;
- }
- tries++;
- if (tries > 0 && (tries % this.numRetries) == 0) {
- // Check filesystem every so often.
- checkFileSystem();
- }
- if (this.stopped) {
- continue;
- }
- LOG.warn("Attempt=" + tries, e);
- // No point retrying immediately; this is probably connection to
- // master issue. Doing below will cause us to sleep.
- lastMsg = System.currentTimeMillis();
- }
+ if ((now - lastMsg) >= msgInterval) {
+ doMetrics();
+ tryRegionServerReport();
+ lastMsg = System.currentTimeMillis();
}
- now = System.currentTimeMillis();
- HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)), TimeUnit.MILLISECONDS);
- if (msg != null) outboundMessages.add(msg);
+ if (!this.stopped) this.sleeper.sleep();
} // for
} catch (Throwable t) {
if (!checkOOME(t)) {
abort("Unhandled exception: " + t.getMessage(), t);
}
}
+ // Run shutdown.
this.leases.closeAfterLeasesExpire();
- this.server.stop();
+ this.rpcServer.stop();
if (this.splitLogWorker != null) {
splitLogWorker.stop();
}
@@ -673,18 +678,18 @@ public class HRegionServer implements HR
if (this.majorCompactionChecker != null) this.majorCompactionChecker.interrupt();
if (this.killed) {
- // Just skip out w/o closing regions.
+ // Just skip out w/o closing regions. Used when testing.
} else if (abortRequested) {
if (this.fsOk) {
closeAllRegions(abortRequested); // Don't leave any open file handles
closeWAL(false);
}
- LOG.info("aborting server at: " + this.serverInfo.getServerName());
+ LOG.info("aborting server " + this.serverNameFromMasterPOV);
} else {
closeAllRegions(abortRequested);
closeWAL(true);
closeAllScanners();
- LOG.info("stopping server at: " + this.serverInfo.getServerName());
+ LOG.info("stopping server " + this.serverNameFromMasterPOV);
}
// Interrupt catalog tracker here in case any regions being opened out in
// handlers are stuck waiting on meta or root.
@@ -697,6 +702,11 @@ public class HRegionServer implements HR
this.hbaseMaster = null;
}
this.leases.close();
+ try {
+ deleteMyEphemeralNode();
+ } catch (KeeperException e) {
+ LOG.warn("Failed deleting my ephemeral node", e);
+ }
HConnectionManager.deleteConnection(conf, true);
this.zooKeeper.close();
if (!killed) {
@@ -705,6 +715,57 @@ public class HRegionServer implements HR
LOG.info(Thread.currentThread().getName() + " exiting");
}
+ private boolean isOnlyMetaRegionsRemaining() {
+ if (getNumberOfOnlineRegions() > 2) return false;
+ boolean onlyMetaRegionsRemaining = false;
+ for (Map.Entry<String, HRegion> e: this.onlineRegions.entrySet()) {
+ if (!e.getValue().getRegionInfo().isMetaRegion()) {
+ onlyMetaRegionsRemaining = false;
+ break;
+ }
+ onlyMetaRegionsRemaining = true;
+ }
+ return onlyMetaRegionsRemaining;
+ }
+
+ void tryRegionServerReport()
+ throws IOException {
+ HServerLoad hsl = buildServerLoad();
+ // Reset so the next report counts only requests made after this load was built. Still needed?
+ this.requestCount.set(0);
+ while (!this.stopped) {
+ try {
+ this.hbaseMaster.regionServerReport(this.serverNameFromMasterPOV.getBytes(), hsl);
+ break;
+ } catch (IOException ioe) {
+ if (ioe instanceof RemoteException) {
+ ioe = ((RemoteException)ioe).unwrapRemoteException();
+ }
+ if (ioe instanceof YouAreDeadException) {
+ // This will be caught and handled as a fatal error in run()
+ throw ioe;
+ }
+ // Couldn't connect to the master, get location from zk and reconnect
+ // Method blocks until new master is found or we are stopped
+ getMaster();
+ }
+ }
+ }
+
+ HServerLoad buildServerLoad() {
+ Collection<HRegion> regions = getOnlineRegionsLocalContext();
+ TreeMap<byte [], HServerLoad.RegionLoad> regionLoads =
+ new TreeMap<byte [], HServerLoad.RegionLoad>(Bytes.BYTES_COMPARATOR);
+ for (HRegion region: regions) {
+ regionLoads.put(region.getRegionName(), createRegionLoad(region));
+ }
+ MemoryUsage memory =
+ ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ return new HServerLoad(requestCount.get(),
+ (int)(memory.getUsed() / 1024 / 1024),
+ (int) (memory.getMax() / 1024 / 1024), regionLoads);
+ }
+
String getOnlineRegionsAsPrintableString() {
StringBuilder sb = new StringBuilder();
for (HRegion r: this.onlineRegions.values()) {
@@ -736,47 +797,6 @@ public class HRegionServer implements HR
}
}
- List<HMsg> tryRegionServerReport(final List<HMsg> outboundMessages)
- throws IOException {
- this.serverInfo.setLoad(buildServerLoad());
- this.requestCount.set(0);
- addOutboundMsgs(outboundMessages);
- HMsg [] msgs = null;
- while (!this.stopped) {
- try {
- msgs = this.hbaseMaster.regionServerReport(this.serverInfo,
- outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
- getMostLoadedRegions());
- break;
- } catch (IOException ioe) {
- if (ioe instanceof RemoteException) {
- ioe = ((RemoteException)ioe).unwrapRemoteException();
- }
- if (ioe instanceof YouAreDeadException) {
- // This will be caught and handled as a fatal error in run()
- throw ioe;
- }
- // Couldn't connect to the master, get location from zk and reconnect
- // Method blocks until new master is found or we are stopped
- getMaster();
- }
- }
- updateOutboundMsgs(outboundMessages);
- outboundMessages.clear();
- return outboundMessages;
- }
-
- private HServerLoad buildServerLoad() {
- MemoryUsage memory = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
- HServerLoad hsl = new HServerLoad(requestCount.get(),
- (int)(memory.getUsed() / 1024 / 1024),
- (int) (memory.getMax() / 1024 / 1024));
- for (HRegion r : this.onlineRegions.values()) {
- hsl.addRegionInfo(createRegionLoad(r));
- }
- return hsl;
- }
-
private void closeWAL(final boolean delete) {
try {
if (this.hlog != null) {
@@ -804,61 +824,23 @@ public class HRegionServer implements HR
}
/*
- * Add to the passed <code>msgs</code> messages to pass to the master.
- *
- * @param msgs Current outboundMsgs array; we'll add messages to this List.
- */
- private void addOutboundMsgs(final List<HMsg> msgs) {
- if (msgs.isEmpty()) {
- this.outboundMsgs.drainTo(msgs);
- return;
- }
- OUTER: for (HMsg m : this.outboundMsgs) {
- for (HMsg mm : msgs) {
- // Be careful don't add duplicates.
- if (mm.equals(m)) {
- continue OUTER;
- }
- }
- msgs.add(m);
- }
- }
-
- /*
- * Remove from this.outboundMsgs those messsages we sent the master.
- *
- * @param msgs Messages we sent the master.
- */
- private void updateOutboundMsgs(final List<HMsg> msgs) {
- if (msgs.isEmpty()) {
- return;
- }
- for (HMsg m : this.outboundMsgs) {
- for (HMsg mm : msgs) {
- if (mm.equals(m)) {
- this.outboundMsgs.remove(m);
- break;
- }
- }
- }
- }
-
- /*
* Run init. Sets up hlog and starts up all server threads.
*
* @param c Extra configuration.
*/
- protected void handleReportForDutyResponse(final MapWritable c) throws IOException {
+ protected void handleReportForDutyResponse(final MapWritable c)
+ throws IOException {
try {
- for (Map.Entry<Writable, Writable> e : c.entrySet()) {
-
+ for (Map.Entry<Writable, Writable> e :c.entrySet()) {
String key = e.getKey().toString();
- // Use the address the master passed us
- if (key.equals("hbase.regionserver.address")) {
- HServerAddress hsa = (HServerAddress) e.getValue();
- LOG.info("Master passed us address to use. Was="
- + this.serverInfo.getServerAddress() + ", Now=" + hsa.toString());
- this.serverInfo.setServerAddress(hsa);
+ // The hostname the master sees us as.
+ if (key.equals(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER)) {
+ String hostnameFromMasterPOV = e.getValue().toString();
+ this.serverNameFromMasterPOV = new ServerName(hostnameFromMasterPOV,
+ this.isa.getPort(), this.startcode);
+ LOG.info("Master passed us hostname to use. Was=" +
+ this.isa.getHostName() + ", Now=" +
+ this.serverNameFromMasterPOV.getHostname());
continue;
}
String value = e.getValue().toString();
@@ -867,14 +849,15 @@ public class HRegionServer implements HR
}
this.conf.set(key, value);
}
-
+
// hack! Maps DFSClient => RegionServer for logs. HDFS made this
// config param for task trackers, but we can piggyback off of it.
if (this.conf.get("mapred.task.id") == null) {
- this.conf.set("mapred.task.id",
- "hb_rs_" + this.serverInfo.getServerName() + "_" +
- System.currentTimeMillis());
+ this.conf.set("mapred.task.id", "hb_rs_" +
+ this.serverNameFromMasterPOV.toString());
}
+ // Create our ephemeral znode in ZooKeeper now that we have a server name.
+ createMyEphemeralNode();
// Master sent us hbase.rootdir to use. Should be fully qualified
// path with file system specification included. Set 'fs.defaultFS'
@@ -889,8 +872,8 @@ public class HRegionServer implements HR
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
startServiceThreads();
- LOG.info("Serving as " + this.serverInfo.getServerName() +
- ", RPC listening on " + this.server.getListenerAddress() +
+ LOG.info("Serving as " + this.serverNameFromMasterPOV +
+ ", RPC listening on " + this.isa +
", sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
isOnline = true;
@@ -902,10 +885,23 @@ public class HRegionServer implements HR
}
}
+ private String getMyEphemeralNodePath() {
+ return ZKUtil.joinZNode(this.zooKeeper.rsZNode, getServerName().toString());
+ }
+
+ private void createMyEphemeralNode() throws KeeperException {
+ ZKUtil.createEphemeralNodeAndWatch(this.zooKeeper, getMyEphemeralNodePath(),
+ HConstants.EMPTY_BYTE_ARRAY);
+ }
+
+ private void deleteMyEphemeralNode() throws KeeperException {
+ ZKUtil.deleteNode(this.zooKeeper, getMyEphemeralNodePath());
+ }
+
public RegionServerAccounting getRegionServerAccounting() {
return regionServerAccounting;
}
-
+
/*
* @param r Region to get RegionLoad for.
*
@@ -1090,14 +1086,12 @@ public class HRegionServer implements HR
*/
private HLog setupWALAndReplication() throws IOException {
final Path oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- Path logdir = new Path(rootDir, HLog.getHLogDirectoryName(this.serverInfo));
- if (LOG.isDebugEnabled()) {
- LOG.debug("logdir=" + logdir);
- }
+ Path logdir = new Path(rootDir,
+ HLog.getHLogDirectoryName(this.serverNameFromMasterPOV.toString()));
+ if (LOG.isDebugEnabled()) LOG.debug("logdir=" + logdir);
if (this.fs.exists(logdir)) {
- throw new RegionServerRunningException("Region server already "
- + "running at " + this.serverInfo.getServerName()
- + " because logdir " + logdir.toString() + " exists");
+ throw new RegionServerRunningException("Region server has already " +
+ "created directory at " + this.serverNameFromMasterPOV.toString());
}
// Instantiate replication manager if replication enabled. Pass it the
@@ -1120,7 +1114,7 @@ public class HRegionServer implements HR
*/
protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
return new HLog(this.fs, logdir, oldLogDir, this.conf,
- getWALActionListeners(), this.serverInfo.getServerAddress().toString());
+ getWALActionListeners(), this.serverNameFromMasterPOV.toString());
}
/**
@@ -1240,7 +1234,7 @@ public class HRegionServer implements HR
};
// Start executor services
- this.service = new ExecutorService(getServerName());
+ this.service = new ExecutorService(getServerName().toString());
this.service.startExecutorService(ExecutorType.RS_OPEN_REGION,
conf.getInt("hbase.regionserver.executor.openregion.threads", 3));
this.service.startExecutorService(ExecutorType.RS_OPEN_ROOT,
@@ -1256,46 +1250,20 @@ public class HRegionServer implements HR
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
- handler);
+ handler);
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
- handler);
- Threads.setDaemonThreadRunning(this.majorCompactionChecker, n
- + ".majorCompactionChecker", handler);
+ handler);
+ Threads.setDaemonThreadRunning(this.majorCompactionChecker, n +
+ ".majorCompactionChecker", handler);
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
this.leases.setName(n + ".leaseChecker");
this.leases.start();
- // Put up info server.
- int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
- // -1 is for disabling info server
- if (port >= 0) {
- String addr = this.conf.get("hbase.regionserver.info.bindAddress",
- "0.0.0.0");
- // check if auto port bind enabled
- boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto",
- false);
- while (true) {
- try {
- this.infoServer = new InfoServer("regionserver", addr, port, false);
- this.infoServer.setAttribute("regionserver", this);
- this.infoServer.start();
- break;
- } catch (BindException e) {
- if (!auto) {
- // auto bind disabled throw BindException
- throw e;
- }
- // auto bind enabled, try to use another port
- LOG.info("Failed binding http info server to port: " + port);
- port++;
- // update HRS server info port.
- this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
- this.serverInfo.getStartCode(), port,
- this.serverInfo.getHostname());
- }
- }
- }
+
+ // Put up the webui. It may come up on a port other than the configured one
+ // if that port is occupied and auto-bind is enabled; record the port actually used.
+ this.webuiport = putUpWebUI();
if (this.replicationHandler != null) {
this.replicationHandler.startReplicationServices();
@@ -1303,7 +1271,38 @@ public class HRegionServer implements HR
// Start Server. This service is like leases in that it internally runs
// a thread.
- this.server.start();
+ this.rpcServer.start();
+ }
+
+ /**
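+ * Puts up the webui (info server) on hbase.regionserver.info.port.
+ * @return the port actually bound -- may differ from the configured one if auto-bind is enabled; negative if the webui is disabled.
+ * @throws IOException if binding fails (e.g. the port is in use and auto-bind is disabled)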
+ */
+ private int putUpWebUI() throws IOException {
+ int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
+ // -1 is for disabling info server
+ if (port < 0) return port;
+ String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
+ // check if auto port bind enabled
+ boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto", false);
+ while (true) {
+ try {
+ this.infoServer = new InfoServer("regionserver", addr, port, false);
+ this.infoServer.setAttribute("regionserver", this);
+ this.infoServer.start();
+ break;
+ } catch (BindException e) {
+ if (!auto) {
+ // auto bind disabled throw BindException
+ throw e;
+ }
+ // auto bind enabled, try to use another port
+ LOG.info("Failed binding http info server to port: " + port);
+ port++;
+ }
+ }
+ return port;
}
/*
@@ -1361,15 +1360,18 @@ public class HRegionServer implements HR
// Update ZK, ROOT or META
if (r.getRegionInfo().isRootRegion()) {
RootLocationEditor.setRootLocation(getZooKeeper(),
- getServerInfo().getServerAddress());
+ this.serverNameFromMasterPOV);
} else if (r.getRegionInfo().isMetaRegion()) {
- MetaEditor.updateMetaLocation(ct, r.getRegionInfo(), getServerInfo());
+ MetaEditor.updateMetaLocation(ct, r.getRegionInfo(),
+ this.serverNameFromMasterPOV);
} else {
if (daughter) {
// If daughter of a split, update whole row, not just location.
- MetaEditor.addDaughter(ct, r.getRegionInfo(), getServerInfo());
+ MetaEditor.addDaughter(ct, r.getRegionInfo(),
+ this.serverNameFromMasterPOV);
} else {
- MetaEditor.updateRegionLocation(ct, r.getRegionInfo(), getServerInfo());
+ MetaEditor.updateRegionLocation(ct, r.getRegionInfo(),
+ this.serverNameFromMasterPOV);
}
}
}
@@ -1379,7 +1381,7 @@ public class HRegionServer implements HR
* @return Metrics instance.
*/
public HBaseRpcMetrics getRpcMetrics() {
- return server.getRpcMetrics();
+    // Pass-through to the RPC server's metrics object.
+    final HBaseRpcMetrics metrics = this.rpcServer.getRpcMetrics();
+    return metrics;
}
/**
@@ -1444,22 +1446,29 @@ public class HRegionServer implements HR
* Method will block until a master is available. You can break from this
* block by requesting the server stop.
*
- * @return master address, or null if server has been stopped
+ * @return master + port, or null if server has been stopped
*/
- private HServerAddress getMaster() {
- HServerAddress masterAddress = null;
+ private ServerName getMaster() {
+ ServerName masterServerName = null;
+ while ((masterServerName = this.masterAddressManager.getMasterAddress()) == null) {
+ if (stopped) {
+ return null;
+ }
+ LOG.debug("No master found, will retry");
+ sleeper.sleep();
+ }
+ InetSocketAddress isa =
+ new InetSocketAddress(masterServerName.getHostname(), masterServerName.getPort());
HMasterRegionInterface master = null;
-
while (!stopped && master == null) {
-
- masterAddress = getMasterAddress();
- LOG.info("Attempting connect to Master server at " + masterAddress);
+ LOG.info("Attempting connect to Master server at " +
+ this.masterAddressManager.getMasterAddress());
try {
// Do initial RPC setup. The final argument indicates that the RPC
// should retry indefinitely.
master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
HMasterRegionInterface.class, HMasterRegionInterface.VERSION,
- masterAddress.getInetSocketAddress(), this.conf, -1,
+ isa, this.conf, -1,
this.rpcTimeout, this.rpcTimeout);
} catch (IOException e) {
e = e instanceof RemoteException ?
@@ -1472,23 +1481,11 @@ public class HRegionServer implements HR
sleeper.sleep();
}
}
- LOG.info("Connected to master at " + masterAddress);
+ LOG.info("Connected to master at " + isa);
this.hbaseMaster = master;
- return masterAddress;
+ return masterServerName;
}
- private HServerAddress getMasterAddress() {
- HServerAddress masterAddress = null;
- while ((masterAddress = masterAddressManager.getMasterAddress()) == null) {
- if (stopped) {
- return null;
- }
- LOG.debug("No master found, will retry");
- sleeper.sleep();
- }
- return masterAddress;
- }
-
/**
* @return True if successfully invoked {@link #reportForDuty()}
* @throws IOException
@@ -1499,35 +1496,32 @@ public class HRegionServer implements HR
handleReportForDutyResponse(w);
return true;
}
- sleeper.sleep();
- LOG.warn("No response on reportForDuty. Sleeping and then retrying.");
return false;
}
/*
* Let the master know we're here Run initialization using parameters passed
* us by the master.
+ * @return A Map of key/value configurations we got from the Master else
+ * null if we failed to register.
+ * @throws IOException
*/
private MapWritable reportForDuty() throws IOException {
- HServerAddress masterAddress = null;
- while (!stopped && (masterAddress = getMaster()) == null) {
+ ServerName masterServerName = null;
+ while (!stopped && (masterServerName = getMaster()) == null) {
+ LOG.warn("Unable to get master for initialization -- sleeping");
sleeper.sleep();
- LOG.warn("Unable to get master for initialization");
}
-
MapWritable result = null;
long lastMsg = 0;
while (!stopped) {
try {
this.requestCount.set(0);
- lastMsg = System.currentTimeMillis();
- ZKUtil.setAddressAndWatch(zooKeeper,
- ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)),
- this.serverInfo.getServerAddress());
- this.serverInfo.setLoad(buildServerLoad());
- LOG.info("Telling master at " + masterAddress + " that we are up");
- result = this.hbaseMaster.regionServerStartup(this.serverInfo,
- EnvironmentEdgeManager.currentTimeMillis());
+ LOG.info("Telling master at " + masterServerName + " that we are up " +
+ "with port=" + this.isa.getPort() + ", startcode=" + this.startcode);
+ lastMsg = EnvironmentEdgeManager.currentTimeMillis();
+ int port = this.isa.getPort();
+ result = this.hbaseMaster.regionServerStartup(port, this.startcode, lastMsg);
break;
} catch (RemoteException e) {
IOException ioe = e.unwrapRemoteException();
@@ -1541,8 +1535,6 @@ public class HRegionServer implements HR
}
} catch (IOException e) {
LOG.warn("error telling master we are up", e);
- } catch (KeeperException e) {
- LOG.warn("error putting up ephemeral node in zookeeper", e);
}
sleeper.sleep(lastMsg);
}
@@ -2419,9 +2411,7 @@ public class HRegionServer implements HR
}
public int getNumberOfOnlineRegions() {
- int size = -1;
- size = this.onlineRegions.size();
- return size;
+    // Straight count of the regions currently held in onlineRegions.
+    return onlineRegions.size();
}
boolean isOnlineRegionsEmpty() {
@@ -2429,6 +2419,43 @@ public class HRegionServer implements HR
}
/**
+   * Builds a JSON summary of one online region's store statistics.
+   * @param encodedRegionName encoded name of the region to describe
+   * @return JSON object (as bytes, via Bytes.toBytes) mapping the labels
+   * {@code memstoreSizeMB}, {@code storefileIndexSizeMB},
+   * {@code storefileSizeMB}, {@code storefiles} and {@code stores} to integer
+   * values; null if no region with that encoded name is in onlineRegions
+   * @throws IOException if JSON serialization fails
+   */
+  public byte [] getRegionStats(final String encodedRegionName)
+  throws IOException {
+    HRegion r = null;
+    synchronized (this.onlineRegions) {
+      r = this.onlineRegions.get(encodedRegionName);
+    }
+    if (r == null) return null;
+    ObjectMapper mapper = new ObjectMapper();
+    int stores = 0;
+    int storefiles = 0;
+    // Accumulate sizes in bytes and convert to MB once at the end; truncating
+    // each store to whole MB before summing would under-report small stores.
+    long storefileSize = 0;
+    long storefileIndexSize = 0;
+    int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
+    synchronized (r.stores) {
+      stores = r.stores.size();
+      for (Store store : r.stores.values()) {
+        storefiles += store.getStorefilesCount();
+        storefileSize += store.getStorefilesSize();
+        storefileIndexSize += store.getStorefilesIndexSize();
+      }
+    }
+    int storefileSizeMB = (int) (storefileSize / 1024 / 1024);
+    int storefileIndexSizeMB = (int) (storefileIndexSize / 1024 / 1024);
+    // TreeMap keeps the keys sorted, so the JSON key order is stable.
+    Map<String, Integer> map = new TreeMap<String, Integer>();
+    map.put("stores", stores);
+    map.put("storefiles", storefiles);
+    // Each size is reported under its own label; the store file size must
+    // not be confused with the store file index size.
+    map.put("storefileSizeMB", storefileSizeMB);
+    map.put("storefileIndexSizeMB", storefileIndexSizeMB);
+    map.put("memstoreSizeMB", memstoreSizeMB);
+    StringWriter w = new StringWriter();
+    mapper.writeValue(w, map);
+    w.close();
+    return Bytes.toBytes(w.toString());
+  }
+
+ /**
* For tests and web ui.
* This method will only work if HRegionServer is in the same JVM as client;
* HRegion cannot be serialized to cross an rpc.
@@ -2554,18 +2581,11 @@ public class HRegionServer implements HR
@Override
@QosPriority(priority=HIGH_QOS)
public long getProtocolVersion(final String protocol, final long clientVersion)
- throws IOException {
+ throws IOException {
+    // Only HRegionInterface version queries are answered; any other protocol
+    // name is rejected with an IOException.
if (protocol.equals(HRegionInterface.class.getName())) {
return HRegionInterface.VERSION;
}
- throw new IOException("Unknown protocol to name node: " + protocol);
- }
-
- /**
- * @return Queue to which you can add outbound messages.
- */
- protected LinkedBlockingQueue<HMsg> getOutboundMsgs() {
- return this.outboundMsgs;
+ throw new IOException("Unknown protocol: " + protocol);
}
/**
@@ -2590,13 +2610,18 @@ public class HRegionServer implements HR
}
/**
- * @return Info on port this server has bound to, etc.
+   * @return This server's {@link HServerInfo} as built by
+   * {@link #getHServerInfo()}, or null if that call throws an IOException
*/
+  // TODO: Deprecate and do getServerName instead.
public HServerInfo getServerInfo() {
- return this.serverInfo;
+    try {
+      return getHServerInfo();
+    } catch (IOException e) {
+      // Report through the class logger rather than printing to stderr.
+      LOG.warn("Failed getting HServerInfo", e);
+    }
+    return null;
}
-
@Override
public Result increment(byte[] regionName, Increment increment)
throws IOException {
@@ -2661,11 +2686,14 @@ public class HRegionServer implements HR
}
}
- /** {@inheritDoc} */
+  /** {@inheritDoc}
+   * @deprecated Use {@link #getServerName()} instead.
+   */
+  @Deprecated
@Override
@QosPriority(priority=HIGH_QOS)
public HServerInfo getHServerInfo() throws IOException {
- return serverInfo;
+    // Assembled on each call from this server's isa, startcode and webuiport.
+    return new HServerInfo(new HServerAddress(this.isa),
+      this.startcode, this.webuiport);
}
@SuppressWarnings("unchecked")
@@ -2680,8 +2708,8 @@ public class HRegionServer implements HR
// actions in the list.
Collections.sort(actionsForRegion);
Row action;
- List<Action> puts = new ArrayList<Action>();
- for (Action a : actionsForRegion) {
+ List<Action<R>> puts = new ArrayList<Action<R>>();
+ for (Action<R> a : actionsForRegion) {
action = a.getAction();
int originalIndex = a.getOriginalIndex();
@@ -2722,7 +2750,7 @@ public class HRegionServer implements HR
List<Pair<Put,Integer>> putsWithLocks =
Lists.newArrayListWithCapacity(puts.size());
- for (Action a : puts) {
+ for (Action<R> a : puts) {
Put p = (Put) a.getAction();
Integer lock;
@@ -2743,7 +2771,7 @@ public class HRegionServer implements HR
for( int i = 0 ; i < codes.length ; i++) {
OperationStatusCode code = codes[i];
- Action theAction = puts.get(i);
+ Action<R> theAction = puts.get(i);
Object result = null;
if (code == OperationStatusCode.SUCCESS) {
@@ -2757,7 +2785,7 @@ public class HRegionServer implements HR
}
} catch (IOException ioe) {
// fail all the puts with the ioe in question.
- for (Action a: puts) {
+ for (Action<R> a: puts) {
response.add(regionName, a.getOriginalIndex(), ioe);
}
}
@@ -2815,7 +2843,7 @@ public class HRegionServer implements HR
}
+  /** @return the string form of {@link #getServerName()} */
+  @Override
public String toString() {
- return this.serverInfo.toString();
+    return getServerName().toString();
}
/**
@@ -2833,8 +2861,11 @@ public class HRegionServer implements HR
}
@Override
- public String getServerName() {
- return serverInfo.getServerName();
+  public ServerName getServerName() {
+    // Our servername could change after we talk to the master: prefer the
+    // name the master reported, else build one from isa's host name and
+    // port plus our startcode.
+    if (this.serverNameFromMasterPOV != null) {
+      return this.serverNameFromMasterPOV;
+    }
+    return new ServerName(this.isa.getHostName(), this.isa.getPort(),
+      this.startcode);
}
@Override
@@ -2862,8 +2893,7 @@ public class HRegionServer implements HR
*/
public static Thread startRegionServer(final HRegionServer hrs)
throws IOException {
- return startRegionServer(hrs, "regionserver"
- + hrs.getServerInfo().getServerAddress().getPort());
+    // Name handed to the two-argument overload: "regionserver" + isa port.
+    final String name = "regionserver" + hrs.isa.getPort();
+    return startRegionServer(hrs, name);
}
/**
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Wed Apr 27 23:12:42 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.ipc.HBaseRpcMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
@@ -52,12 +51,6 @@ public interface RegionServerServices ex
public FlushRequester getFlushRequester();
/**
- * Return data structure that has Server address and startcode.
- * @return The HServerInfo for this RegionServer.
- */
- public HServerInfo getServerInfo();
-
- /**
* @return the RegionServerAccounting for this Region Server
*/
public RegionServerAccounting getRegionServerAccounting();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java Wed Apr 27 23:12:42 2011
@@ -198,7 +198,7 @@ public class SplitLogWorker extends ZooK
try {
taskReadyLock.wait();
} catch (InterruptedException e) {
- LOG.warn("SplitLogWorker inteurrpted while waiting for task," +
+ LOG.warn("SplitLogWorker interurrpted while waiting for task," +
" exiting", e);
assert exitWorker == true;
return;
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java Wed Apr 27 23:12:42 2011
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
@@ -476,7 +477,7 @@ public class SplitTransaction {
}
// Look for any exception
- for (Future future : futures) {
+ for (Future<Void> future: futures) {
try {
future.get();
} catch (InterruptedException e) {
@@ -690,7 +691,7 @@ public class SplitTransaction {
* @throws IOException
*/
private static int createNodeSplitting(final ZooKeeperWatcher zkw,
- final HRegionInfo region, final String serverName)
+ final HRegionInfo region, final ServerName serverName)
throws KeeperException, IOException {
LOG.debug(zkw.prefix("Creating ephemeral node for " +
region.getEncodedName() + " in SPLITTING state"));
@@ -744,7 +745,7 @@ public class SplitTransaction {
* @throws IOException
*/
private static int transitionNodeSplit(ZooKeeperWatcher zkw,
- HRegionInfo parent, HRegionInfo a, HRegionInfo b, String serverName,
+ HRegionInfo parent, HRegionInfo a, HRegionInfo b, ServerName serverName,
final int znodeVersion)
throws KeeperException, IOException {
byte [] payload = Writables.getBytes(a, b);
@@ -755,7 +756,7 @@ public class SplitTransaction {
private static int transitionNodeSplitting(final ZooKeeperWatcher zkw,
final HRegionInfo parent,
- final String serverName, final int version)
+ final ServerName serverName, final int version)
throws KeeperException, IOException {
return ZKAssign.transitionNode(zkw, parent, serverName,
EventType.RS_ZK_REGION_SPLITTING, EventType.RS_ZK_REGION_SPLITTING, version);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java Wed Apr 27 23:12:42 2011
@@ -40,7 +40,6 @@ import java.util.concurrent.ConcurrentSk
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Matcher;
@@ -57,8 +56,8 @@ import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
@@ -146,9 +145,6 @@ public class HLog implements Syncable {
private Method getNumCurrentReplicas; // refers to DFSOutputStream.getNumCurrentReplicas
final static Object [] NO_ARGS = new Object []{};
- // used to indirectly tell syncFs to force the sync
- private boolean forceSync = false;
-
public interface Reader {
void init(FileSystem fs, Path path, Configuration c) throws IOException;
void close() throws IOException;
@@ -1279,36 +1275,10 @@ public class HLog implements Syncable {
/**
* Construct the HLog directory name
*
- * @param info HServerInfo for server
- * @return the HLog directory name
- */
- public static String getHLogDirectoryName(HServerInfo info) {
- return getHLogDirectoryName(info.getServerName());
- }
-
- /**
- * Construct the HLog directory name
- *
- * @param serverAddress
- * @param startCode
- * @return the HLog directory name
- */
- public static String getHLogDirectoryName(String serverAddress,
- long startCode) {
- if (serverAddress == null || serverAddress.length() == 0) {
- return null;
- }
- return getHLogDirectoryName(
- HServerInfo.getServerName(serverAddress, startCode));
- }
-
- /**
- * Construct the HLog directory name
- *
- * @param serverName
+ * @param serverName Server name formatted as described in {@link ServerName}
* @return the HLog directory name
*/
- public static String getHLogDirectoryName(String serverName) {
+ public static String getHLogDirectoryName(final String serverName) {
StringBuilder dirName = new StringBuilder(HConstants.HREGION_LOGDIR_NAME);
dirName.append("/");
dirName.append(serverName);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Wed Apr 27 23:12:42 2011
@@ -24,7 +24,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
/**
@@ -37,8 +37,7 @@ public class ReplicationPeer {
private final String clusterKey;
private final String id;
- private List<HServerAddress> regionServers =
- new ArrayList<HServerAddress>(0);
+ private List<ServerName> regionServers = new ArrayList<ServerName>(0);
private final AtomicBoolean peerEnabled = new AtomicBoolean();
// Cannot be final since a new object needs to be recreated when session fails
private ZooKeeperWatcher zkw;
@@ -82,7 +81,7 @@ public class ReplicationPeer {
* for this peer cluster
* @return list of addresses
*/
- public List<HServerAddress> getRegionServers() {
+ public List<ServerName> getRegionServers() {
+    // Returns the stored list itself, not a copy; callers share it.
return regionServers;
}
@@ -90,7 +89,7 @@ public class ReplicationPeer {
* Set the list of region servers for that peer
* @param regionServers list of addresses for the region servers
*/
- public void setRegionServers(List<HServerAddress> regionServers) {
+ public void setRegionServers(List<ServerName> regionServers) {
+    // Keeps a reference to the passed list; no defensive copy is made.
this.regionServers = regionServers;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Wed Apr 27 23:12:42 2011
@@ -35,14 +35,13 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
/**
@@ -137,7 +136,7 @@ public class ReplicationZookeeper {
this.peerClusters = new HashMap<String, ReplicationPeer>();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
+ this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
connectExistingPeers();
}
@@ -204,14 +203,14 @@ public class ReplicationZookeeper {
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
- public List<HServerAddress> getSlavesAddresses(String peerClusterId)
+ public List<ServerName> getSlavesAddresses(String peerClusterId)
throws KeeperException {
if (this.peerClusters.size() == 0) {
- return new ArrayList<HServerAddress>(0);
+ return new ArrayList<ServerName>(0);
}
ReplicationPeer peer = this.peerClusters.get(peerClusterId);
if (peer == null) {
- return new ArrayList<HServerAddress>(0);
+ return new ArrayList<ServerName>(0);
}
peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
return peer.getRegionServers();
@@ -222,10 +221,10 @@ public class ReplicationZookeeper {
* @param zkw zk connection to use
* @return list of region server addresses
*/
- private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
- List<HServerAddress> rss = null;
+ private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
+ List<ServerName> rss = null;
try {
- rss = ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
+ rss = listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
} catch (KeeperException e) {
LOG.warn("Cannot get peer's region server addresses", e);
}
@@ -233,6 +232,37 @@ public class ReplicationZookeeper {
}
/**
+   * Lists the children of the specified znode and parses each child's
+   * <em>name</em> (not its data) as a {@link ServerName}.
+   *
+   * Used here to list the regionservers registered under a peer cluster's
+   * regionserver znode.
+   *
+   * Sets no watches at all, this method is best effort.
+   *
+   * @param zkw zookeeper reference
+   * @param znode node whose child names are ServerName strings
+   * @return one ServerName per child, empty if the node has no children,
+   * null if the parent node does not exist
+   * @throws KeeperException if unexpected zookeeper exception
+   */
+  public static List<ServerName> listChildrenAndGetAsServerNames(
+      ZooKeeperWatcher zkw, String znode)
+  throws KeeperException {
+    List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
+    if (children == null) {
+      // Parent znode does not exist.
+      return null;
+    }
+    List<ServerName> serverNames = new ArrayList<ServerName>(children.size());
+    for (String child : children) {
+      // Child znode names are expected to be in ServerName's string format.
+      serverNames.add(new ServerName(child));
+    }
+    return serverNames;
+  }
+
+ /**
* This method connects this cluster to another one and registers it
* in this region server's replication znode
* @param peerId id of the peer cluster
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Apr 27 23:12:42 2011
@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
@@ -202,7 +203,7 @@ public class ReplicationSource extends T
*/
private void chooseSinks() throws KeeperException {
this.currentPeers.clear();
- List<HServerAddress> addresses =
+ List<ServerName> addresses =
this.zkHelper.getSlavesAddresses(peerClusterId);
Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
@@ -212,7 +213,8 @@ public class ReplicationSource extends T
HServerAddress address;
// Make sure we get one address that we don't already have
do {
- address = addresses.get(this.random.nextInt(addresses.size()));
+ ServerName sn = addresses.get(this.random.nextInt(addresses.size()));
+ address = new HServerAddress(sn.getHostname(), sn.getPort());
} while (setOfAddr.contains(address));
LOG.info("Choosing peer " + address);
setOfAddr.add(address);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/StorageClusterStatusResource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/StorageClusterStatusResource.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/StorageClusterStatusResource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/rest/StorageClusterStatusResource.java Wed Apr 27 23:12:42 2011
@@ -35,8 +35,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ClusterStatus;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.rest.model.StorageClusterStatusModel;
@@ -73,13 +73,13 @@ public class StorageClusterStatusResourc
model.setRegions(status.getRegionsCount());
model.setRequests(status.getRequestsCount());
model.setAverageLoad(status.getAverageLoad());
- for (HServerInfo info: status.getServerInfo()) {
- HServerLoad load = info.getLoad();
- StorageClusterStatusModel.Node node =
+ for (ServerName info: status.getServers()) {
+ HServerLoad load = status.getLoad(info);
+ StorageClusterStatusModel.Node node =
model.addLiveNode(
- info.getServerAddress().getHostname() + ":" +
- Integer.toString(info.getServerAddress().getPort()),
- info.getStartCode(), load.getUsedHeapMB(),
+ info.getHostname() + ":" +
+ Integer.toString(info.getPort()),
+ info.getStartcode(), load.getUsedHeapMB(),
load.getMaxHeapMB());
node.setRequests(load.getNumberOfRequests());
for (HServerLoad.RegionLoad region: load.getRegionsLoad().values()) {
@@ -88,8 +88,8 @@ public class StorageClusterStatusResourc
region.getMemStoreSizeMB(), region.getStorefileIndexSizeMB());
}
}
- for (String name: status.getDeadServerNames()) {
- model.addDeadNode(name);
+ for (ServerName name: status.getDeadServerNames()) {
+ model.addDeadNode(name.toString());
}
ResponseBuilder response = Response.ok(model);
response.cacheControl(cacheControl);
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Addressing.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Addressing.java?rev=1097275&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Addressing.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/Addressing.java Wed Apr 27 23:12:42 2011
@@ -0,0 +1,75 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Utility for network addresses, resolving and naming.
+ * Host-and-port strings handled here have the form
+ * {@code <hostname>:<port>}, e.g. {@code example.org:1234}.
+ */
+public class Addressing {
+  /** Separator placed between the hostname and the port. */
+  public static final String HOSTNAME_PORT_SEPARATOR = ":";
+
+  /**
+   * @param hostAndPort Formatted as {@code <hostname>:<port>}
+   * @return An {@link InetSocketAddress} built from the parsed hostname and port
+   * @throws IllegalArgumentException if {@code hostAndPort} contains no
+   * separator, if its port part is not an integer, or if the port is outside
+   * the range {@link InetSocketAddress} accepts
+   */
+  public static InetSocketAddress createInetSocketAddressFromHostAndPortStr(
+      final String hostAndPort) {
+    return new InetSocketAddress(parseHostname(hostAndPort), parsePort(hostAndPort));
+  }
+
+  /**
+   * @param hostname Server hostname
+   * @param port Server port
+   * @return Returns a concatenation of <code>hostname</code> and
+   * <code>port</code> in the form {@code <hostname>:<port>}. For example, if
+   * hostname is <code>example.org</code> and port is 1234, this method will
+   * return <code>example.org:1234</code>
+   */
+  public static String createHostAndPortStr(final String hostname, final int port) {
+    return hostname + HOSTNAME_PORT_SEPARATOR + port;
+  }
+
+  /**
+   * @param hostAndPort Formatted as {@code <hostname>:<port>}
+   * @return The hostname portion of <code>hostAndPort</code>: everything
+   * before the last separator
+   * @throws IllegalArgumentException if there is no separator
+   */
+  public static String parseHostname(final String hostAndPort) {
+    return hostAndPort.substring(0, separatorIndex(hostAndPort));
+  }
+
+  /**
+   * @param hostAndPort Formatted as {@code <hostname>:<port>}
+   * @return The port portion of <code>hostAndPort</code>: the integer after
+   * the last separator
+   * @throws IllegalArgumentException if there is no separator; a
+   * {@link NumberFormatException} (a subclass) if the text after the last
+   * separator is not an integer
+   */
+  public static int parsePort(final String hostAndPort) {
+    return Integer.parseInt(hostAndPort.substring(separatorIndex(hostAndPort) + 1));
+  }
+
+  /**
+   * @param hostAndPort string expected to contain a separator
+   * @return index of the last {@link #HOSTNAME_PORT_SEPARATOR} in
+   * {@code hostAndPort}
+   * @throws IllegalArgumentException if there is no separator
+   */
+  private static int separatorIndex(final String hostAndPort) {
+    int colonIndex = hostAndPort.lastIndexOf(HOSTNAME_PORT_SEPARATOR);
+    if (colonIndex < 0) {
+      throw new IllegalArgumentException("Not a host:port pair: " + hostAndPort);
+    }
+    return colonIndex;
+  }
+}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java Wed Apr 27 23:12:42 2011
@@ -39,25 +39,28 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MetaScanner;
-import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -172,21 +175,21 @@ public class HBaseFsck {
}
// From the master, get a list of all known live region servers
- Collection<HServerInfo> regionServers = status.getServerInfo();
+ Collection<ServerName> regionServers = status.getServers();
errors.print("Number of live region servers: " +
regionServers.size());
if (details) {
- for (HServerInfo rsinfo: regionServers) {
- errors.print(" " + rsinfo.getServerName());
+ for (ServerName rsinfo: regionServers) {
+ errors.print(" " + rsinfo);
}
}
// From the master, get a list of all dead region servers
- Collection<String> deadRegionServers = status.getDeadServerNames();
+ Collection<ServerName> deadRegionServers = status.getDeadServerNames();
errors.print("Number of dead region servers: " +
deadRegionServers.size());
if (details) {
- for (String name: deadRegionServers) {
+ for (ServerName name: deadRegionServers) {
errors.print(" " + name);
}
}
@@ -302,31 +305,55 @@ public class HBaseFsck {
// Check if Root region is valid and existing
if (rootLocation == null || rootLocation.getRegionInfo() == null ||
- rootLocation.getServerAddress() == null) {
+ rootLocation.getHostname() == null) {
errors.reportError("Root Region or some of its attributes is null.");
return false;
}
-
- MetaEntry m = new MetaEntry(rootLocation.getRegionInfo(),
- rootLocation.getServerAddress(), null, System.currentTimeMillis());
+    ServerName sn;
+    try {
+      sn = getRootRegionServerName();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt(); // restore the flag before translating
+      throw new IOException("Interrupted", e);
+    }
+    MetaEntry m = new MetaEntry(rootLocation.getRegionInfo(), sn, System.currentTimeMillis());
HbckInfo hbInfo = new HbckInfo(m);
regionInfo.put(rootLocation.getRegionInfo().getEncodedName(), hbInfo);
return true;
}
+  /** Reads the root region location via a short-lived RootRegionTracker, always stopping it. */
+  private ServerName getRootRegionServerName()
+  throws IOException, InterruptedException {
+    RootRegionTracker rootRegionTracker =
+      new RootRegionTracker(this.connection.getZooKeeperWatcher(), new Abortable() {
+        @Override
+        public void abort(String why, Throwable e) {
+          LOG.error(why, e);
+          // Any tracker abort terminates the whole process.
+          System.exit(1);
+        }
+      });
+    rootRegionTracker.start();
+    try {
+      return rootRegionTracker.getRootRegionLocation();
+    } finally {
+      rootRegionTracker.stop();
+    }
+  }
+
/**
* Contacts each regionserver and fetches metadata about regions.
* @param regionServerList - the list of region servers to connect to
* @throws IOException if a remote or network exception occurs
*/
- void processRegionServers(Collection<HServerInfo> regionServerList)
- throws IOException, InterruptedException {
-
+ void processRegionServers(Collection<ServerName> regionServerList)
+ throws IOException, InterruptedException {
WorkItemRegion[] work = new WorkItemRegion[regionServerList.size()];
int num = 0;
// loop to contact each region server in parallel
- for (HServerInfo rsinfo:regionServerList) {
+ for (ServerName rsinfo: regionServerList) {
work[num] = new WorkItemRegion(this, rsinfo, errors, connection);
executor.execute(work[num]);
num++;
@@ -478,7 +505,7 @@ public class HBaseFsck {
if (modTInfo == null) {
modTInfo = new TInfo(tableName);
}
- for (HServerAddress server : hbi.deployedOn) {
+ for (ServerName server : hbi.deployedOn) {
modTInfo.addServer(server);
}
modTInfo.addEdge(hbi.metaEntry.getStartKey(), hbi.metaEntry.getEndKey());
@@ -498,19 +525,19 @@ public class HBaseFsck {
private class TInfo {
String tableName;
TreeMap <byte[], byte[]> edges;
- TreeSet <HServerAddress> deployedOn;
+ TreeSet <ServerName> deployedOn;
TInfo(String name) {
this.tableName = name;
edges = new TreeMap <byte[], byte[]> (Bytes.BYTES_COMPARATOR);
- deployedOn = new TreeSet <HServerAddress>();
+ deployedOn = new TreeSet <ServerName>();
}
public void addEdge(byte[] fromNode, byte[] toNode) {
this.edges.put(fromNode, toNode);
}
- public void addServer(HServerAddress server) {
+ public void addServer(ServerName server) {
this.deployedOn.add(server);
}
@@ -647,7 +674,7 @@ public class HBaseFsck {
errors.print("Trying to fix a problem with .META...");
setShouldRerun();
// try fix it (treat is a dupe assignment)
- List <HServerAddress> deployedOn = Lists.newArrayList();
+ List <ServerName> deployedOn = Lists.newArrayList();
for (HbckInfo mRegion : metaRegions) {
deployedOn.add(mRegion.metaEntry.regionServer);
}
@@ -681,35 +708,19 @@ public class HBaseFsck {
// record the latest modification of this META record
long ts = Collections.max(result.list(), comp).getTimestamp();
-
- // record region details
- byte [] value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.REGIONINFO_QUALIFIER);
- if (value == null || value.length == 0) {
+ Pair<HRegionInfo, ServerName> pair =
+ MetaReader.metaRowToRegionPair(result);
+ if (pair == null || pair.getFirst() == null) {
emptyRegionInfoQualifiers.add(result);
return true;
}
- HRegionInfo info = Writables.getHRegionInfo(value);
- HServerAddress server = null;
- byte[] startCode = null;
-
- // record assigned region server
- value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.SERVER_QUALIFIER);
- if (value != null && value.length > 0) {
- String address = Bytes.toString(value);
- server = new HServerAddress(address);
- }
-
- // record region's start key
- value = result.getValue(HConstants.CATALOG_FAMILY,
- HConstants.STARTCODE_QUALIFIER);
- if (value != null) {
- startCode = value;
+ ServerName sn = null;
+ if (pair.getSecond() != null) {
+ sn = pair.getSecond();
}
- MetaEntry m = new MetaEntry(info, server, startCode, ts);
+ MetaEntry m = new MetaEntry(pair.getFirst(), sn, ts);
HbckInfo hbInfo = new HbckInfo(m);
- HbckInfo previous = regionInfo.put(info.getEncodedName(), hbInfo);
+ HbckInfo previous = regionInfo.put(pair.getFirst().getEncodedName(), hbInfo);
if (previous != null) {
throw new IOException("Two entries in META are same " + previous);
}
@@ -740,11 +751,10 @@ public class HBaseFsck {
* Stores the entries scanned from META
*/
private static class MetaEntry extends HRegionInfo {
- HServerAddress regionServer; // server hosting this region
+ ServerName regionServer; // server hosting this region
long modTime; // timestamp of most recent modification metadata
- public MetaEntry(HRegionInfo rinfo, HServerAddress regionServer,
- byte[] startCode, long modTime) {
+ public MetaEntry(HRegionInfo rinfo, ServerName regionServer, long modTime) {
super(rinfo);
this.regionServer = regionServer;
this.modTime = modTime;
@@ -758,13 +768,13 @@ public class HBaseFsck {
boolean onlyEdits = false;
MetaEntry metaEntry = null;
FileStatus foundRegionDir = null;
- List<HServerAddress> deployedOn = Lists.newArrayList();
+ List<ServerName> deployedOn = Lists.newArrayList();
HbckInfo(MetaEntry metaEntry) {
this.metaEntry = metaEntry;
}
- public synchronized void addServer(HServerAddress server) {
+ public synchronized void addServer(ServerName server) {
this.deployedOn.add(server);
}
@@ -792,7 +802,7 @@ public class HBaseFsck {
}
System.out.println(" Number of regions: " + tInfo.getNumRegions());
System.out.print(" Deployed on: ");
- for (HServerAddress server : tInfo.deployedOn) {
+ for (ServerName server : tInfo.deployedOn) {
System.out.print(" " + server.toString());
}
System.out.println();
@@ -865,12 +875,12 @@ public class HBaseFsck {
*/
static class WorkItemRegion implements Runnable {
private HBaseFsck hbck;
- private HServerInfo rsinfo;
+ private ServerName rsinfo;
private ErrorReporter errors;
private HConnection connection;
private boolean done;
- WorkItemRegion(HBaseFsck hbck, HServerInfo info,
+ WorkItemRegion(HBaseFsck hbck, ServerName info,
ErrorReporter errors, HConnection connection) {
this.hbck = hbck;
this.rsinfo = info;
@@ -888,8 +898,7 @@ public class HBaseFsck {
public synchronized void run() {
errors.progress();
try {
- HRegionInterface server = connection.getHRegionConnection(
- rsinfo.getServerAddress());
+ HRegionInterface server = connection.getHRegionConnection(new HServerAddress(rsinfo.getHostname(), rsinfo.getPort()));
// list all online regions from this region server
List<HRegionInfo> regions = server.getOnlineRegions();
@@ -908,7 +917,7 @@ public class HBaseFsck {
// check to see if the existance of this region matches the region in META
for (HRegionInfo r:regions) {
HbckInfo hbi = hbck.getOrCreateInfo(r.getEncodedName());
- hbi.addServer(rsinfo.getServerAddress());
+ hbi.addServer(rsinfo);
}
} catch (IOException e) { // unable to connect to the region server.
errors.reportError("RegionServer: " + rsinfo.getServerName() +
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java Wed Apr 27 23:12:42 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -47,13 +48,13 @@ public class HBaseFsckRepair {
* @throws InterruptedException
*/
public static void fixDupeAssignment(Configuration conf, HRegionInfo region,
- List<HServerAddress> servers)
+ List<ServerName> servers)
throws IOException, KeeperException, InterruptedException {
HRegionInfo actualRegion = new HRegionInfo(region);
// Close region on the servers silently
- for(HServerAddress server : servers) {
+ for(ServerName server : servers) {
closeRegionSilentlyAndWait(conf, server, actualRegion);
}
@@ -82,14 +83,14 @@ public class HBaseFsckRepair {
throws ZooKeeperConnectionException, KeeperException, IOException {
ZKAssign.createOrForceNodeOffline(
HConnectionManager.getConnection(conf).getZooKeeperWatcher(),
- region, HConstants.HBCK_CODE_NAME);
+ region, HConstants.HBCK_CODE_SERVERNAME);
}
private static void closeRegionSilentlyAndWait(Configuration conf,
- HServerAddress server, HRegionInfo region)
+ ServerName server, HRegionInfo region)
throws IOException, InterruptedException {
HRegionInterface rs =
- HConnectionManager.getConnection(conf).getHRegionConnection(server);
+ HConnectionManager.getConnection(conf).getHRegionConnection(new HServerAddress(server.getHostname(), server.getPort()));
rs.closeRegion(region, false);
long timeout = conf.getLong("hbase.hbck.close.timeout", 120000);
long expiration = timeout + System.currentTimeMillis();
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Wed Apr 27 23:12:42 2011
@@ -189,7 +189,7 @@ public class JVMClusterUtil {
while (true) {
for (JVMClusterUtil.MasterThread t : masters) {
if (t.master.isActiveMaster()) {
- return t.master.getMasterAddress().toString();
+ return t.master.getServerName().toString();
}
}
try {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerTracker.java Wed Apr 27 23:12:42 2011
@@ -19,14 +19,18 @@
*/
package org.apache.hadoop.hbase.zookeeper;
+import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.ServerManager;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.zookeeper.KeeperException;
/**
@@ -41,7 +45,7 @@ import org.apache.zookeeper.KeeperExcept
*/
public class RegionServerTracker extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(RegionServerTracker.class);
-
+  private final NavigableSet<ServerName> regionServers = new TreeSet<ServerName>(); // own lock
private ServerManager serverManager;
private Abortable abortable;
@@ -58,32 +62,56 @@ public class RegionServerTracker extends
* <p>All RSs will be tracked after this method is called.
*
* @throws KeeperException
+ * @throws IOException
*/
- public void start() throws KeeperException {
+ public void start() throws KeeperException, IOException {
watcher.registerListener(this);
- ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+ List<NodeAndData> servers =
+ ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+ add(servers);
+ }
+
+  private void add(final List<NodeAndData> servers) throws IOException {
+    // Parse first: if a ServerName ctor throws, the shared set is left untouched.
+    List<ServerName> parsed = new ArrayList<ServerName>(servers.size());
+    for (NodeAndData n: servers) {
+      parsed.add(new ServerName(ZKUtil.getNodeName(n.getNode())));
+    }
+    synchronized(this.regionServers) {
+      this.regionServers.clear();
+      this.regionServers.addAll(parsed);
+    }
+  }
+
+  private void remove(final ServerName sn) {
+    synchronized(this.regionServers) { this.regionServers.remove(sn); }
}
@Override
public void nodeDeleted(String path) {
- if(path.startsWith(watcher.rsZNode)) {
+ if (path.startsWith(watcher.rsZNode)) {
String serverName = ZKUtil.getNodeName(path);
LOG.info("RegionServer ephemeral node deleted, processing expiration [" +
- serverName + "]");
- HServerInfo hsi = serverManager.getServerInfo(serverName);
- if(hsi == null) {
- LOG.info("No HServerInfo found for " + serverName);
+ serverName + "]");
+ ServerName sn = new ServerName(serverName);
+ if (!serverManager.isServerOnline(sn)) {
+ LOG.info(serverName.toString() + " is not online");
return;
}
- serverManager.expireServer(hsi);
+ remove(sn);
+ this.serverManager.expireServer(sn);
}
}
@Override
public void nodeChildrenChanged(String path) {
- if(path.equals(watcher.rsZNode)) {
+ if (path.equals(watcher.rsZNode)) {
try {
- ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+ List<NodeAndData> servers =
+ ZKUtil.watchAndGetNewChildren(watcher, watcher.rsZNode);
+ add(servers);
+ } catch (IOException e) {
+ abortable.abort("Unexpected zk exception getting RS nodes", e);
} catch (KeeperException e) {
abortable.abort("Unexpected zk exception getting RS nodes", e);
}
@@ -92,10 +120,12 @@ public class RegionServerTracker extends
/**
* Gets the online servers.
- * @return list of online servers from zk
+ * @return list of online servers
* @throws KeeperException
*/
- public List<HServerAddress> getOnlineServers() throws KeeperException {
- return ZKUtil.listChildrenAndGetAsAddresses(watcher, watcher.rsZNode);
+ public List<ServerName> getOnlineServers() {
+ synchronized (this.regionServers) {
+ return new ArrayList<ServerName>(this.regionServers);
+ }
}
-}
+}
\ No newline at end of file