You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/07 21:58:57 UTC
svn commit: r535970 [2/4] - in /lucene/hadoop/trunk: ./
src/contrib/hbase/bin/ src/contrib/hbase/conf/
src/contrib/hbase/src/java/org/apache/hadoop/hbase/
src/contrib/hbase/src/test/org/apache/hadoop/hbase/
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Mon May 7 12:58:53 2007
@@ -23,34 +23,38 @@
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.util.StringUtils;
import java.io.*;
import java.util.*;
-/*******************************************************************************
+/**
* HMaster is the "master server" for a HBase.
* There is only one HMaster for a single HBase deployment.
- ******************************************************************************/
-public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface {
+ */
+public class HMaster implements HConstants, HMasterInterface,
+ HMasterRegionInterface, Runnable {
- public long getProtocolVersion(String protocol,
- long clientVersion) throws IOException {
+ public long getProtocolVersion(String protocol, long clientVersion)
+ throws IOException {
if (protocol.equals(HMasterInterface.class.getName())) {
return HMasterInterface.versionID;
- } else if (protocol.equals(HMasterRegionInterface.class.getName())){
+ } else if (protocol.equals(HMasterRegionInterface.class.getName())) {
return HMasterRegionInterface.versionID;
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
- private final Log LOG = LogFactory.getLog(this.getClass().getName());
+ private static final Log LOG =
+ LogFactory.getLog(org.apache.hadoop.hbase.HMaster.class.getName());
- private boolean closed;
+ private volatile boolean closed;
private Path dir;
private Configuration conf;
private FileSystem fs;
private Random rand;
+ private long threadWakeFrequency;
private long maxRegionOpenTime;
// The 'msgQueue' is used to assign work to the client processor thread
@@ -67,170 +71,227 @@
private HServerAddress rootRegionLocation;
- //////////////////////////////////////////////////////////////////////////////
- // The ROOT/META scans, performed each time a meta region comes on-line
- // Since both have identical structure, we only need one class to get the work
- // done, however we have to create separate objects for each.
- //////////////////////////////////////////////////////////////////////////////
+ /**
+ * Columns in the 'meta' ROOT and META tables.
+ */
+ private static final Text METACOLUMNS[] = {
+ COLUMN_FAMILY
+ };
+
+ static final String MASTER_NOT_RUNNING = "Master not running";
private boolean rootScanned;
private int numMetaRegions;
-
+
/**
- * How do we know if all regions are assigned?
+ * Base HRegion scanner class. Holds utility common to <code>ROOT</code> and
+ * <code>META</code> HRegion scanners.
*
- * 1. After the initial scan of the root and meta regions, all regions known
- * at that time will have been or are in the process of being assigned.
+ * <p>How do we know if all regions are assigned? After the initial scan of
+ * the <code>ROOT</code> and <code>META</code> regions, all regions known at
+ * that time will have been or are in the process of being assigned.</p>
*
- * 2. When a region is split the region server notifies the master of the split
- * and the new regions are assigned. But suppose the master loses the split
- * message? We need to periodically rescan the root and meta regions.
- *
- * - If we rescan, any regions that are new but not assigned will have no
- * server info. Any regions that are not being served by the same server
- * will get re-assigned.
+ * <p>When a region is split the region server notifies the master of the
+ * split and the new regions are assigned. But suppose the master loses the
+ * split message? We need to periodically rescan the <code>ROOT</code> and
+ * <code>META</code> regions.
+ * <ul>
+ * <li>If we rescan, any regions that are new but not assigned will have
+ * no server info. Any regions that are not being served by the same
+ * server will get re-assigned.</li>
*
- * - Thus a periodic rescan of the root region will find any new meta
- * regions where we missed the meta split message or we failed to detect
- * a server death and consequently need to assign the region to a new
- * server.
- *
- * - if we keep track of all the known meta regions, then we can rescan
- * them periodically. If we do this then we can detect an regions for
- * which we missed a region split message.
+ * <li>Thus a periodic rescan of the root region will find any new
+ * <code>META</code> regions where we missed the <code>META</code> split
+ * message or we failed to detect a server death and consequently need to
+ * assign the region to a new server.</li>
*
- * Thus just keeping track of all the meta regions permits periodic rescanning
- * which will detect unassigned regions (new or otherwise) without the need to
- * keep track of every region.
+ * <li>if we keep track of all the known <code>META</code> regions, then
+ * we can rescan them periodically. If we do this then we can detect any
+ * regions for which we missed a region split message.</li>
+ * </ul>
+ *
+ * Thus just keeping track of all the <code>META</code> regions permits
+ * periodic rescanning which will detect unassigned regions (new or
+ * otherwise) without the need to keep track of every region.</p>
*
- * So the root region scanner needs to wake up
- * 1. when the master receives notification that the root region has been opened.
- * 2. periodically after the first scan
+ * <p>So the <code>ROOT</code> region scanner needs to wake up:
+ * <ol>
+ * <li>when the master receives notification that the <code>ROOT</code>
+ * region has been opened.</li>
+ * <li>periodically after the first scan</li>
+ * </ol>
*
- * The meta scanner needs to wake up:
- * 1. when a meta region comes on line
- * 2. periodically to rescan the known meta regions
+ * The <code>META</code> scanner needs to wake up:
+ * <ol>
+ * <li>when a <code>META</code> region comes on line</li>
+ * <li>periodically to rescan the known <code>META</code> regions</li>
+ * </ol>
*
- * A meta region is not 'known' until it has been scanned once.
- *
+ * <p>A <code>META</code> region is not 'known' until it has been scanned
+ * once.
*/
- private class RootScanner implements Runnable {
- private final Text cols[] = {
- ROOT_COLUMN_FAMILY
- };
- private final Text firstRow = new Text();
- private HRegionInterface rootServer;
+ private abstract class BaseScanner implements Runnable {
+ private final Text FIRST_ROW = new Text();
- public RootScanner() {
- rootServer = null;
- }
-
- public void run() {
- while((!closed)) {
- rootScanned = false;
- waitForRootRegion();
+ protected boolean scanRegion(final MetaRegion region)
+ throws IOException {
+ boolean scannedRegion = false;
+ HRegionInterface server = null;
+ long scannerId = -1L;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("scanning meta region " + region.regionName);
+ }
- rootServer = null;
- long scannerId = -1L;
-
- try {
- rootServer = client.getHRegionConnection(rootRegionLocation);
- scannerId = rootServer.openScanner(HGlobals.rootRegionInfo.regionName, cols, firstRow);
-
- } catch(IOException iex) {
- try {
- iex.printStackTrace();
- if(scannerId != -1L) {
- rootServer.close(scannerId);
- }
-
- } catch(IOException iex2) {
- }
- closed = true;
- break;
- }
- try {
- LOG.debug("starting root region scan");
+ try {
+ server = client.getHRegionConnection(region.server);
+ scannerId = server.openScanner(region.regionName, METACOLUMNS, FIRST_ROW);
- DataInputBuffer inbuf = new DataInputBuffer();
- while(true) {
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
- HStoreKey key = new HStoreKey();
- LabelledData[] values = rootServer.next(scannerId, key);
- if(values.length == 0) {
- break;
- }
- for(int i = 0; i < values.length; i++) {
- results.put(values[i].getLabel(), values[i].getData().get());
- }
- byte[] bytes = results.get(ROOT_COL_REGIONINFO);
- if(bytes == null || bytes.length == 0) {
- LOG.fatal("no value for " + ROOT_COL_REGIONINFO);
- stop();
- }
- inbuf.reset(bytes, bytes.length);
- HRegionInfo info = new HRegionInfo();
- info.readFields(inbuf);
+ DataInputBuffer inbuf = new DataInputBuffer();
+ while (true) {
+ TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ HStoreKey key = new HStoreKey();
- String serverName = null;
- bytes = results.get(ROOT_COL_SERVER);
- if(bytes != null && bytes.length != 0) {
- serverName = new String(bytes, UTF8_ENCODING);
- }
-
- long startCode = -1L;
- bytes = results.get(ROOT_COL_STARTCODE);
- if(bytes != null && bytes.length != 0) {
- startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
- }
-
- // Note META region to load.
-
- HServerInfo storedInfo = null;
- if(serverName != null) {
- serversToServerInfo.get(serverName);
- }
- if(storedInfo == null
- || storedInfo.getStartCode() != startCode) {
+ LabelledData[] values = server.next(scannerId, key);
- // The current assignment is no good; load the region.
+ if (values.length == 0) {
+ break;
+ }
- unassignedRegions.put(info.regionName, info);
- assignAttempts.put(info.regionName, 0L);
+ for (int i = 0; i < values.length; i++) {
+ results.put(values[i].getLabel(), values[i].getData().get());
+ }
- LOG.debug("region unassigned: " + info.regionName);
- }
+ HRegionInfo info = getRegionInfo(COL_REGIONINFO, results, inbuf);
+ String serverName = getServerName(COL_SERVER, results);
+ long startCode = getStartCode(COL_STARTCODE, results);
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("row: " + info.toString() + ", server: " + serverName
+ + ", startCode: " + startCode);
+ }
+ // Note Region has been assigned.
+ checkAssigned(info, serverName, startCode);
+
+ scannedRegion = true;
+ }
+ } finally {
+ try {
+ if (scannerId != -1L) {
+ server.close(scannerId);
+ }
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ scannerId = -1L;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("scan of meta region " + region.regionName + " complete");
+ }
+ return scannedRegion;
+ }
+
+ protected HRegionInfo getRegionInfo(final Text key,
+ final TreeMap<Text, byte[]> data, final DataInputBuffer in)
+ throws IOException {
+ byte[] bytes = data.get(key);
+ if (bytes == null || bytes.length == 0) {
+ throw new IOException("no value for " + key);
+ }
+ in.reset(bytes, bytes.length);
+ HRegionInfo info = new HRegionInfo();
+ info.readFields(in);
+ return info;
+ }
+
+ protected String getServerName(final Text key,
+ final TreeMap<Text, byte[]> data)
+ throws UnsupportedEncodingException {
+ byte [] bytes = data.get(key);
+ String name = (bytes != null && bytes.length != 0)?
+ new String(bytes, UTF8_ENCODING): null;
+ return (name != null)? name.trim(): name;
+ }
+
+ protected long getStartCode(final Text key,
+ final TreeMap<Text, byte[]> data)
+ throws NumberFormatException, UnsupportedEncodingException {
+ long startCode = -1L;
+ byte [] bytes = data.get(key);
+ if(bytes != null && bytes.length != 0) {
+ startCode = Long.valueOf(new String(bytes, UTF8_ENCODING).trim());
+ }
+ return startCode;
+ }
+
+ protected void checkAssigned(final HRegionInfo info,
+ final String serverName, final long startCode) {
+ HServerInfo storedInfo = null;
+ if(serverName != null) {
+ storedInfo = serversToServerInfo.get(serverName);
+ }
+ if(storedInfo == null || storedInfo.getStartCode() != startCode) {
+ // The current assignment is no good; load the region.
+ unassignedRegions.put(info.regionName, info);
+ assignAttempts.put(info.regionName, 0L);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("region unassigned: " + info.regionName);
+ }
+ }
+ }
+ }
+
+ /**
+ * Scanner for the <code>ROOT</code> HRegion.
+ */
+ private class RootScanner extends BaseScanner {
+ public void run() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running ROOT scanner");
+ }
+ try {
+ while(!closed) {
+ // rootRegionLocation will be filled in when we get an 'open region'
+ // regionServerReport message from the HRegionServer that has been
+ // allocated the ROOT region below. If we get back true, the location
+ // is still unset, which means HMaster has closed.
+ if (waitForRootRegionOrClose()) {
+ continue;
+ }
+ rootScanned = false;
+ // Make a MetaRegion instance for ROOT region to pass scanRegion.
+ MetaRegion mr = new MetaRegion();
+ mr.regionName = HGlobals.rootRegionInfo.regionName;
+ mr.server = HMaster.this.rootRegionLocation;
+ mr.startKey = null;
+ if (scanRegion(mr)) {
numMetaRegions += 1;
}
-
- } catch(Exception iex) {
- iex.printStackTrace();
-
- } finally {
+ rootScanned = true;
try {
- if(scannerId != -1L) {
- rootServer.close(scannerId);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RootScanner going to sleep");
}
-
- } catch(IOException iex2) {
+ Thread.sleep(metaRescanInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("RootScanner woke up");
+ }
+ } catch(InterruptedException e) {
+ // Catch and go around again. If interrupted, it's spurious or we're
+ // being shut down. Go back up to the while test.
}
- scannerId = -1L;
}
+ } catch(IOException e) {
+ e.printStackTrace();
+ closed = true;
}
- rootScanned = true;
- try {
- Thread.sleep(metaRescanInterval);
-
- } catch(InterruptedException e) {
- }
+ LOG.debug("ROOT scanner exiting");
}
}
private RootScanner rootScanner;
private Thread rootScannerThread;
- /** Contains information the meta scanner needs to process a "new" meta region */
private class MetaRegion {
public HServerAddress server;
public Text regionName;
@@ -245,165 +306,89 @@
private boolean allMetaRegionsScanned;
/**
- * MetaScanner scans a region either in the META table.
+ * MetaScanner scans the regions of the <code>META</code> table.
*
- * When a meta server comes on line, a MetaRegion object is queued up by
- * regionServerReport() and this thread wakes up.
+ * When a <code>META</code> server comes on line, a MetaRegion object is
+ * queued up by regionServerReport() and this thread wakes up.
*
* It's important to do this work in a separate thread, or else the blocking
* action would prevent other work from getting done.
*/
- private class MetaScanner implements Runnable {
- private final Text cols[] = {
- META_COLUMN_FAMILY
- };
- private final Text firstRow = new Text();
-
- public MetaScanner() {
- }
-
- private void scanRegion(MetaRegion region) {
- HRegionInterface server = null;
- long scannerId = -1L;
-
- LOG.debug("scanning meta region: " + region.regionName);
-
- try {
- server = client.getHRegionConnection(region.server);
- scannerId = server.openScanner(region.regionName, cols, firstRow);
-
- } catch(IOException iex) {
- try {
- if(scannerId != -1L) {
- server.close(scannerId);
- scannerId = -1L;
- }
- stop();
-
- } catch(IOException iex2) {
- }
- return;
- }
-
- DataInputBuffer inbuf = new DataInputBuffer();
- try {
- while(true) {
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
- HStoreKey key = new HStoreKey();
-
- LabelledData[] values = server.next(scannerId, key);
-
- if(values.length == 0) {
- break;
- }
-
- for(int i = 0; i < values.length; i++) {
- results.put(values[i].getLabel(), values[i].getData().get());
- }
-
- byte bytes[] = results.get(META_COL_REGIONINFO);
- if(bytes == null || bytes.length == 0) {
- LOG.fatal("no value for " + META_COL_REGIONINFO);
- stop();
- }
- inbuf.reset(bytes, bytes.length);
- HRegionInfo info = new HRegionInfo();
- info.readFields(inbuf);
-
- String serverName = null;
- bytes = results.get(META_COL_SERVER);
- if(bytes != null && bytes.length != 0) {
- serverName = new String(bytes, UTF8_ENCODING);
- }
-
- long startCode = -1L;
- bytes = results.get(META_COL_STARTCODE);
- if(bytes != null && bytes.length != 0) {
- startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
- }
-
- // Note HRegion to load.
-
- HServerInfo storedInfo = null;
- if(serverName != null) {
- serversToServerInfo.get(serverName);
- }
- if(storedInfo == null
- || storedInfo.getStartCode() != startCode) {
-
- // The current assignment is no good; load the region.
-
- unassignedRegions.put(info.regionName, info);
- assignAttempts.put(info.regionName, 0L);
- LOG.debug("region unassigned: " + info.regionName);
- }
- }
-
- } catch(Exception iex) {
- iex.printStackTrace();
-
- } finally {
- try {
- if(scannerId != -1L) {
- server.close(scannerId);
- }
-
- } catch(IOException iex2) {
- iex2.printStackTrace();
- }
- scannerId = -1L;
- }
- }
-
+ private class MetaScanner extends BaseScanner {
public void run() {
- while((!closed)) {
+ while (!closed) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Running META scanner");
+ }
MetaRegion region = null;
-
- while(region == null) {
- synchronized(metaRegionsToScan) {
- if(metaRegionsToScan.size() != 0) {
+ while (region == null && !closed) {
+ synchronized (metaRegionsToScan) {
+ if (metaRegionsToScan.size() != 0) {
region = metaRegionsToScan.remove(0);
}
- if(region == null) {
+ if (region == null) {
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MetaScanner going into wait");
+ }
metaRegionsToScan.wait();
-
- } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MetaScanner woke up");
+ }
+ } catch (InterruptedException e) {
+ // Catch and go around again. We've been woken because there
+ // are new meta regions available or because we are being
+ // shut down.
}
}
}
}
-
- scanRegion(region);
- if(closed) {
- break;
- }
- knownMetaRegions.put(region.startKey, region);
- if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
- LOG.debug("all meta regions scanned");
- allMetaRegionsScanned = true;
- metaRegionsScanned();
+ if (closed) {
+ continue;
}
-
- do {
- try {
- Thread.sleep(metaRescanInterval);
-
- } catch(InterruptedException ex) {
- }
- if(!allMetaRegionsScanned) {
- break; // A meta region must have split
- }
-
- // Rescan the known meta regions every so often
-
- Vector<MetaRegion> v = new Vector<MetaRegion>();
- v.addAll(knownMetaRegions.values());
-
- for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
- scanRegion(i.next());
+ try {
+ scanRegion(region);
+ knownMetaRegions.put(region.startKey, region);
+ if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("all meta regions scanned");
+ }
+ allMetaRegionsScanned = true;
+ metaRegionsScanned();
}
- } while(true);
+
+ do {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleep for meta rescan interval");
+ }
+ Thread.sleep(metaRescanInterval);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sleep for meta rescan interval");
+ }
+ } catch(InterruptedException ex) {
+ // Catch and go around again.
+ }
+ if(!allMetaRegionsScanned // A meta region must have split
+ || closed) { // We're shutting down
+ break;
+ }
+
+ // Rescan the known meta regions every so often
+ Vector<MetaRegion> v = new Vector<MetaRegion>();
+ v.addAll(knownMetaRegions.values());
+ for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
+ scanRegion(i.next());
+ }
+ } while(true);
+
+ } catch(IOException e) {
+ e.printStackTrace();
+ closed = true;
+ }
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("META scanner exiting");
}
}
@@ -412,11 +397,19 @@
}
public synchronized void waitForMetaScan() {
- while(!allMetaRegionsScanned) {
+ while(!closed && !allMetaRegionsScanned) {
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wait for all meta regions scanned");
+ }
wait();
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake from wait for all meta regions scanned");
+ }
} catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake from wait for all meta regions scanned (IE)");
+ }
}
}
}
@@ -425,11 +418,6 @@
private MetaScanner metaScanner;
private Thread metaScannerThread;
- // Client processing
-
- private ClientProcessor clientProcessor;
- private Thread clientProcessorThread;
-
// The 'unassignedRegions' table maps from a region name to a HRegionInfo record,
// which includes the region's table, its id, and its start/end keys.
//
@@ -449,9 +437,8 @@
private SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
- // 'serversToServerInfo' maps from the String to its HServerInfo
-
- private SortedMap<String, HServerInfo> serversToServerInfo;
+ private SortedMap<String, HServerInfo> serversToServerInfo =
+ Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
/** Build the HMaster out of a raw configuration item. */
public HMaster(Configuration conf) throws IOException {
@@ -468,7 +455,8 @@
*
* @throws IOException
*/
- public HMaster(Path dir, HServerAddress address, Configuration conf) throws IOException {
+ public HMaster(Path dir, HServerAddress address, Configuration conf)
+ throws IOException {
this.closed = true;
this.dir = dir;
this.conf = conf;
@@ -481,12 +469,10 @@
fs.mkdirs(dir);
}
- Path rootRegionDir = HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
+ Path rootRegionDir =
+ HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
if(! fs.exists(rootRegionDir)) {
- LOG.debug("bootstrap: creating root and meta regions");
-
- // Bootstrap! Need to create the root region and the first meta region.
-
+ LOG.info("bootstrap: creating ROOT and first META regions");
try {
HRegion root = createNewHRegion(HGlobals.rootTableDesc, 0L);
HRegion meta = createNewHRegion(HGlobals.metaTableDesc, 1L);
@@ -501,13 +487,16 @@
}
}
+ this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
this.msgQueue = new Vector<PendingOperation>();
- this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 30 * 1000),
+ this.serverLeases = new Leases(
+ conf.getLong("hbase.master.lease.period", 30 * 1000),
conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
this.server = RPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+ address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+ false, conf);
// The rpc-server port can be ephemeral... ensure we have the correct info
@@ -539,11 +528,6 @@
this.metaScanner = new MetaScanner();
this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
- // Process updates to meta asychronously
-
- this.clientProcessor = new ClientProcessor();
- this.clientProcessorThread = new Thread(clientProcessor, "HMaster.clientProcessor");
-
this.unassignedRegions =
Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
@@ -558,75 +542,118 @@
Collections.synchronizedSortedMap(
new TreeMap<String, TreeMap<Text, HRegionInfo>>());
- this.serversToServerInfo =
- Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
-
// We're almost open for business
this.closed = false;
+ LOG.info("HMaster initialized on " + address.toString());
+ }
+
+ public boolean isMasterRunning() {
+ return !closed;
+ }
+
+ public void run() {
+ Thread.currentThread().setName("HMaster");
try {
// Start things up
-
this.rootScannerThread.start();
this.metaScannerThread.start();
- this.clientProcessorThread.start();
// Start the server last so everything else is running before we start
// receiving requests
-
this.server.start();
-
} catch(IOException e) {
- // Something happend during startup. Shut things down.
-
+ // Something happened during startup. Shut things down.
this.closed = true;
- throw e;
+ e.printStackTrace();
}
- LOG.info("HMaster started");
- }
-
- /** Turn off the HMaster. Turn off all the threads, close files, etc. */
- public void stop() throws IOException {
- closed = true;
- try {
- client.close();
-
- } catch(IOException iex) {
+ // Main processing loop
+ for(PendingOperation op = null; !closed; ) {
+ synchronized(msgQueue) {
+ while(msgQueue.size() == 0 && serversToServerInfo.size() != 0) {
+ try {
+ msgQueue.wait(threadWakeFrequency);
+ } catch(InterruptedException iex) {
+ }
+ }
+ if(msgQueue.size() == 0 || closed) {
+ continue;
+ }
+ op = msgQueue.remove(msgQueue.size()-1);
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + op.toString());
+ }
+ op.process();
+ } catch(Exception ex) {
+ msgQueue.insertElementAt(op, 0);
+ }
}
- server.stop();
- LOG.info("shutting down HMaster");
- }
-
- /** returns the HMaster server address */
- public HServerAddress getMasterAddress() {
- return address;
- }
- /** Call this to wait for everything to finish */
- public void join() {
+ /*
+ * Clean up and close up shop
+ */
+
+ // Wake other threads so they notice the close
+
+ rootScannerThread.interrupt();
+ metaScannerThread.interrupt();
+ server.stop(); // Stop server
+ serverLeases.close(); // Turn off the lease monitor
try {
- server.join();
-
- } catch(InterruptedException iex) {
+ fs.close();
+ client.close(); // Shut down the client
+ } catch(IOException iex) {
+ // Print if ever there is an IOException while closing (Just for kicks.
+ // Remove if it ever happens).
+ iex.printStackTrace();
}
+
+ // Join up with all threads
+
try {
- clientProcessorThread.join();
-
+ // Wait for the root scanner to finish.
+ rootScannerThread.join();
} catch(Exception iex) {
+ // Print if ever there is an interrupt (Just for kicks. Remove if it
+ // ever happens).
+ iex.printStackTrace();
}
try {
+ // Join the thread till it finishes.
metaScannerThread.join();
-
} catch(Exception iex) {
+ // Print if ever there is an interrupt (Just for kicks. Remove if it
+ // ever happens).
+ iex.printStackTrace();
}
try {
- rootScannerThread.join();
-
- } catch(Exception iex) {
+ // Join until it's finished. TODO: Maybe do in parallel in its own thread
+ // as is done in TaskTracker if it's taking a long time to go down.
+ server.join();
+ } catch(InterruptedException iex) {
+ // Print if ever there is an interrupt (Just for kicks. Remove if it
+ // ever happens).
+ iex.printStackTrace();
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("HMaster main thread exiting");
+ }
+ }
+
+ /**
+ * Turn off the HMaster. Sets a flag so that the main thread knows to shut
+ * things down in an orderly fashion.
+ */
+ public void shutdown() throws IOException {
+ closed = true;
+ synchronized(msgQueue) {
+ msgQueue.clear(); // Empty the queue
+ msgQueue.notifyAll(); // Wake main thread
}
- LOG.info("HMaster stopped");
}
//////////////////////////////////////////////////////////////////////////////
@@ -635,20 +662,19 @@
/** HRegionServers call this method upon startup. */
public void regionServerStartup(HServerInfo serverInfo) throws IOException {
- String server = serverInfo.getServerAddress().toString();
+ String server = serverInfo.getServerAddress().toString().trim();
HServerInfo storedInfo = null;
- LOG.debug("received start message from: " + server);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("received start message from: " + server);
+ }
// If we get the startup message but there's an old server by that
// name, then we can timeout the old one right away and register
// the new one.
-
- storedInfo = serversToServerInfo.get(server);
-
- if(storedInfo != null) {
- serversToServerInfo.remove(server);
+ storedInfo = serversToServerInfo.remove(server);
+ if(storedInfo != null && !closed) {
synchronized(msgQueue) {
msgQueue.add(new PendingServerShutdown(storedInfo));
msgQueue.notifyAll();
@@ -659,19 +685,33 @@
serversToServerInfo.put(server, serverInfo);
- Text serverLabel = new Text(server);
- serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
+ if(!closed) {
+ Text serverLabel = new Text(server);
+ serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
+ }
}
/** HRegionServers call this method repeatedly. */
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException {
- String server = serverInfo.getServerAddress().toString();
+ String server = serverInfo.getServerAddress().toString().trim();
+
+ if(closed) {
+ // We're shutting down. Tell the server to go away.
+ serversToServerInfo.remove(server);
+
+ HMsg returnMsgs[] = {
+ new HMsg(HMsg.MSG_REGIONSERVER_STOP)
+ };
+ return returnMsgs;
+ }
HServerInfo storedInfo = serversToServerInfo.get(server);
if(storedInfo == null) {
- LOG.debug("received server report from unknown server: " + server);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("received server report from unknown server: " + server);
+ }
// The HBaseMaster may have been restarted.
// Tell the RegionServer to start over and call regionServerStartup()
@@ -691,10 +731,13 @@
//
// The answer is to ask A to shut down for good.
- LOG.debug("region server race condition detected: " + server);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("region server race condition detected: " + server);
+ }
- HMsg returnMsgs[] = new HMsg[1];
- returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
+ HMsg returnMsgs[] = {
+ new HMsg(HMsg.MSG_REGIONSERVER_STOP)
+ };
return returnMsgs;
} else {
@@ -720,7 +763,9 @@
// Process the kill list
- TreeMap<Text, HRegionInfo> regionsToKill = killList.get(info.toString());
+ TreeMap<Text, HRegionInfo> regionsToKill =
+ killList.remove(info.getServerAddress().toString());
+
if(regionsToKill != null) {
for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
i.hasNext(); ) {
@@ -741,8 +786,10 @@
if(regionInfo == null) {
- LOG.debug("region server " + info.getServerAddress().toString()
- + "should not have opened region " + region.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("region server " + info.getServerAddress().toString()
+ + "should not have opened region " + region.regionName);
+ }
// This Region should not have been opened.
// Ask the server to shut it down, but don't report it as closed.
@@ -753,8 +800,10 @@
} else {
- LOG.debug(info.getServerAddress().toString() + " serving "
- + region.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(info.getServerAddress().toString() + " serving "
+ + region.regionName);
+ }
// Remove from unassigned list so we don't assign it to someone else
@@ -797,8 +846,10 @@
break;
case HMsg.MSG_REPORT_CLOSE:
- LOG.debug(info.getServerAddress().toString() + " no longer serving "
- + region.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(info.getServerAddress().toString() + " no longer serving "
+ + region.regionName);
+ }
if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region
rootRegionLocation = null;
@@ -835,7 +886,9 @@
break;
case HMsg.MSG_NEW_REGION:
- LOG.debug("new region " + region.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("new region " + region.regionName);
+ }
if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
// A meta region has split.
@@ -865,15 +918,17 @@
long now = System.currentTimeMillis();
for(Iterator<Text> it = unassignedRegions.keySet().iterator();
- it.hasNext(); ) {
+ it.hasNext(); ) {
Text curRegionName = it.next();
HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
long assignedTime = assignAttempts.get(curRegionName);
if(now - assignedTime > maxRegionOpenTime) {
- LOG.debug("assigning region " + regionInfo.regionName + " to server "
- + info.getServerAddress().toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("assigning region " + regionInfo.regionName + " to server "
+ + info.getServerAddress().toString());
+ }
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
@@ -893,51 +948,55 @@
notifyAll();
}
- private synchronized void waitForRootRegion() {
- while(rootRegionLocation == null) {
+ /**
+ * Wait until <code>rootRegionLocation</code> has been set or until the
+ * <code>closed</code> flag has been set.
+ * @return True if <code>rootRegionLocation</code> is still null when the
+ * wait ends (that is, the wait ended because <code>closed</code> was set).
+ */
+ private synchronized boolean waitForRootRegionOrClose() {
+ while (!closed && rootRegionLocation == null) {
try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wait for root region (or close)");
+ }
wait();
-
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake from wait for root region (or close)");
+ }
} catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake from wait for root region (or close) (IE)");
+ }
}
}
+ return this.rootRegionLocation == null;
}
-
- //////////////////////////////////////////////////////////////////////////////
- // Some internal classes to manage msg-passing and client operations
- //////////////////////////////////////////////////////////////////////////////
- private class ClientProcessor implements Runnable {
- public ClientProcessor() {
- }
-
- public void run() {
- while(!closed) {
- PendingOperation op = null;
-
- synchronized(msgQueue) {
- while(msgQueue.size() == 0) {
- try {
- msgQueue.wait();
-
- } catch(InterruptedException iex) {
- }
- }
- op = msgQueue.remove(msgQueue.size()-1);
+ private synchronized void waitForRootRegion() {
+ while (rootRegionLocation == null) {
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wait for root region");
}
- try {
- op.process();
-
- } catch(Exception ex) {
- msgQueue.insertElementAt(op, 0);
+ wait();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake from wait for root region");
+ }
+ } catch(InterruptedException e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Wake from wait for root region (IE)");
}
}
}
}
+ //////////////////////////////////////////////////////////////////////////////
+ // Some internal classes to manage msg-passing and client operations
+ //////////////////////////////////////////////////////////////////////////////
+
private abstract class PendingOperation {
protected final Text[] columns = {
- META_COLUMN_FAMILY
+ COLUMN_FAMILY
};
protected final Text startRow = new Text();
protected long clientId;
@@ -981,16 +1040,25 @@
results.put(values[i].getLabel(), values[i].getData().get());
}
- String serverName =
- new String(results.get(META_COL_SERVER), UTF8_ENCODING);
+ byte[] bytes = results.get(COL_SERVER);
+ String serverName = null;
+ if(bytes == null || bytes.length == 0) {
+ // No server
+ continue;
+ }
+ serverName = new String(bytes, UTF8_ENCODING);
if(deadServer.compareTo(serverName) != 0) {
// This isn't the server you're looking for - move along
continue;
}
- long startCode =
- Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING));
+ bytes = results.get(COL_STARTCODE);
+ if(bytes == null || bytes.length == 0) {
+ // No start code
+ continue;
+ }
+ long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
if(oldStartCode != startCode) {
// Close but no cigar
@@ -999,12 +1067,17 @@
// Bingo! Found it.
- byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
- inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+ bytes = results.get(COL_REGIONINFO);
+ if(bytes == null || bytes.length == 0) {
+ throw new IOException("no value for " + COL_REGIONINFO);
+ }
+ inbuf.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
- LOG.debug(serverName + " was serving " + info.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(serverName + " was serving " + info.regionName);
+ }
// Add to our to do lists
@@ -1029,8 +1102,8 @@
for(int i = 0; i < toDoList.size(); i++) {
long lockid = server.startUpdate(regionName, clientId, toDoList.get(i).getRow());
- server.delete(regionName, clientId, lockid, META_COL_SERVER);
- server.delete(regionName, clientId, lockid, META_COL_STARTCODE);
+ server.delete(regionName, clientId, lockid, COL_SERVER);
+ server.delete(regionName, clientId, lockid, COL_STARTCODE);
server.commit(regionName, clientId, lockid);
}
@@ -1049,7 +1122,9 @@
}
public void process() throws IOException {
- LOG.debug("server shutdown: " + deadServer);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("server shutdown: " + deadServer);
+ }
// Scan the ROOT region
@@ -1107,30 +1182,40 @@
metaScanner.waitForMetaScan();
- LOG.debug("region closed: " + regionInfo.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("region closed: " + regionInfo.regionName);
+ }
// Mark the Region as unavailable in the appropriate meta table
Text metaRegionName;
HRegionInterface server;
- if(rootRegion) {
+ if (rootRegion) {
metaRegionName = HGlobals.rootRegionInfo.regionName;
waitForRootRegion(); // Make sure root region available
server = client.getHRegionConnection(rootRegionLocation);
} else {
- Text metaStartRow = knownMetaRegions.headMap(regionInfo.regionName).lastKey();
- MetaRegion r = knownMetaRegions.get(metaStartRow);
+ MetaRegion r = null;
+ if(knownMetaRegions.containsKey(regionInfo.regionName)) {
+ r = knownMetaRegions.get(regionInfo.regionName);
+
+ } else {
+ r = knownMetaRegions.get(
+ knownMetaRegions.headMap(regionInfo.regionName).lastKey());
+ }
metaRegionName = r.regionName;
server = client.getHRegionConnection(r.server);
}
long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName);
- server.delete(metaRegionName, clientId, lockid, META_COL_SERVER);
- server.delete(metaRegionName, clientId, lockid, META_COL_STARTCODE);
+ server.delete(metaRegionName, clientId, lockid, COL_SERVER);
+ server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
server.commit(metaRegionName, clientId, lockid);
if(reassignRegion) {
- LOG.debug("reassign region: " + regionInfo.regionName);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("reassign region: " + regionInfo.regionName);
+ }
unassignedRegions.put(regionInfo.regionName, regionInfo);
assignAttempts.put(regionInfo.regionName, 0L);
@@ -1169,6 +1254,7 @@
String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
} catch(UnsupportedEncodingException e) {
+ e.printStackTrace();
}
}
@@ -1180,7 +1266,10 @@
metaScanner.waitForMetaScan();
- LOG.debug(regionName + " open on " + serverAddress.toString());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(regionName + " open on "
+ + new String(serverAddress.get(), UTF8_ENCODING));
+ }
// Register the newly-available Region's location.
@@ -1192,14 +1281,23 @@
server = client.getHRegionConnection(rootRegionLocation);
} else {
- Text metaStartRow = knownMetaRegions.headMap(regionName).lastKey();
- MetaRegion r = knownMetaRegions.get(metaStartRow);
+ MetaRegion r = null;
+ if(knownMetaRegions.containsKey(regionName)) {
+ r = knownMetaRegions.get(regionName);
+
+ } else {
+ r = knownMetaRegions.get(
+ knownMetaRegions.headMap(regionName).lastKey());
+ }
metaRegionName = r.regionName;
server = client.getHRegionConnection(r.server);
}
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("updating row " + regionName + " in table " + metaRegionName);
+ }
long lockid = server.startUpdate(metaRegionName, clientId, regionName);
- server.put(metaRegionName, clientId, lockid, META_COL_SERVER, serverAddress);
- server.put(metaRegionName, clientId, lockid, META_COL_STARTCODE, startCode);
+ server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
+ server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
server.commit(metaRegionName, clientId, lockid);
}
}
@@ -1208,7 +1306,15 @@
// HMasterInterface
//////////////////////////////////////////////////////////////////////////////
+ /** returns the HMaster server address */
+ public HServerAddress getMasterAddress() {
+ return address;
+ }
+
public void createTable(HTableDescriptor desc) throws IOException {
+ if (!isMasterRunning()) {
+ throw new IllegalStateException(MASTER_NOT_RUNNING);
+ }
HRegionInfo newRegion = new HRegionInfo(rand.nextLong(), desc, null, null);
// We can not access any meta region if they have not already been assigned
@@ -1218,13 +1324,19 @@
// 1. Check to see if table already exists
- Text metaStartRow = knownMetaRegions.headMap(newRegion.regionName).lastKey();
- MetaRegion m = knownMetaRegions.get(metaStartRow);
+ MetaRegion m = null;
+ if(knownMetaRegions.containsKey(newRegion.regionName)) {
+ m = knownMetaRegions.get(newRegion.regionName);
+
+ } else {
+ m = knownMetaRegions.get(
+ knownMetaRegions.headMap(newRegion.regionName).lastKey());
+ }
Text metaRegionName = m.regionName;
HRegionInterface server = client.getHRegionConnection(m.server);
- BytesWritable bytes = server.get(metaRegionName, desc.getName(), META_COL_REGIONINFO);
+ BytesWritable bytes = server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
if(bytes != null && bytes.getSize() != 0) {
byte[] infoBytes = bytes.get();
DataInputBuffer inbuf = new DataInputBuffer();
@@ -1250,7 +1362,7 @@
long clientId = rand.nextLong();
long lockid = server.startUpdate(metaRegionName, clientId, regionName);
- server.put(metaRegionName, clientId, lockid, META_COL_REGIONINFO,
+ server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
new BytesWritable(byteValue.toByteArray()));
server.commit(metaRegionName, clientId, lockid);
@@ -1263,7 +1375,9 @@
unassignedRegions.put(regionName, info);
assignAttempts.put(regionName, 0L);
- LOG.debug("created table " + desc.getName());
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("created table " + desc.getName());
+ }
}
/**
@@ -1283,8 +1397,9 @@
Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
fs.mkdirs(regionDir);
- return new HRegion(dir, new HLog(fs, new Path(regionDir, "log"), conf), fs,
- conf, info, null, null);
+ return new HRegion(dir,
+ new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
+ fs, conf, info, null, null);
}
/**
@@ -1307,48 +1422,65 @@
table.getRegionInfo().write(s);
- meta.put(writeid, META_COL_REGIONINFO, bytes.toByteArray());
+ meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
meta.commit(writeid);
}
public void deleteTable(Text tableName) throws IOException {
- Text[] columns = {
- META_COLUMN_FAMILY
- };
+ if (!isMasterRunning()) {
+ throw new IllegalStateException(MASTER_NOT_RUNNING);
+ }
// We can not access any meta region if they have not already been assigned
// and scanned.
metaScanner.waitForMetaScan();
- for(Iterator<MetaRegion> it = knownMetaRegions.tailMap(tableName).values().iterator();
+ Text firstMetaRegion = null;
+ if(knownMetaRegions.size() == 1) {
+ firstMetaRegion = knownMetaRegions.firstKey();
+
+ } else if(knownMetaRegions.containsKey(tableName)) {
+ firstMetaRegion = tableName;
+
+ } else {
+ firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
+ }
+ for(Iterator<MetaRegion> it =
+ knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
it.hasNext(); ) {
// Find all the regions that make up this table
- long clientId = rand.nextLong();
MetaRegion m = it.next();
HRegionInterface server = client.getHRegionConnection(m.server);
+ Vector<Text> rowsToDelete = new Vector<Text>();
+
long scannerId = -1L;
try {
- scannerId = server.openScanner(m.regionName, columns, tableName);
+ scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
- Vector<Text> rowsToDelete = new Vector<Text>();
DataInputBuffer inbuf = new DataInputBuffer();
+ byte[] bytes;
while(true) {
LabelledData[] values = null;
HStoreKey key = new HStoreKey();
values = server.next(scannerId, key);
- if(values.length == 0) {
+ if(values == null || values.length == 0) {
break;
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
for(int i = 0; i < values.length; i++) {
- results.put(values[i].getLabel(), values[i].getData().get());
+ bytes = new byte[values[i].getData().getSize()];
+ System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
+ results.put(values[i].getLabel(), bytes);
+ }
+ bytes = results.get(COL_REGIONINFO);
+ if(bytes == null || bytes.length == 0) {
+ break;
}
- byte bytes[] = results.get(META_COL_REGIONINFO);
inbuf.reset(bytes, bytes.length);
HRegionInfo info = new HRegionInfo();
info.readFields(inbuf);
@@ -1356,33 +1488,37 @@
if(info.tableDesc.getName().compareTo(tableName) > 0) {
break; // Beyond any more entries for this table
}
+
+ rowsToDelete.add(info.regionName);
// Is it being served?
- String serverName =
- new String(results.get(META_COL_SERVER), UTF8_ENCODING);
-
- long startCode =
- Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING));
+ bytes = results.get(COL_SERVER);
+ if(bytes != null && bytes.length != 0) {
+ String serverName = new String(bytes, UTF8_ENCODING);
+
+ bytes = results.get(COL_STARTCODE);
+ if(bytes != null && bytes.length != 0) {
+ long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
- HServerInfo s = serversToServerInfo.get(serverName);
- if(s != null && s.getStartCode() == startCode) {
+ HServerInfo s = serversToServerInfo.get(serverName);
+ if(s != null && s.getStartCode() == startCode) {
- // It is being served. Tell the server to stop it and not report back
+ // It is being served.
+ // Tell the server to stop it and not report back.
- TreeMap<Text, HRegionInfo> regionsToKill = killList.get(serverName);
- if(regionsToKill == null) {
- regionsToKill = new TreeMap<Text, HRegionInfo>();
+ TreeMap<Text, HRegionInfo> regionsToKill =
+ killList.get(serverName);
+
+ if(regionsToKill == null) {
+ regionsToKill = new TreeMap<Text, HRegionInfo>();
+ }
+ regionsToKill.put(info.regionName, info);
+ killList.put(serverName, regionsToKill);
+ }
}
- regionsToKill.put(info.regionName, info);
- killList.put(serverName, regionsToKill);
}
}
- for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
- long lockid = server.startUpdate(m.regionName, clientId, row.next());
- server.delete(m.regionName, clientId, lockid, columns[0]);
- server.commit(m.regionName, clientId, lockid);
- }
} catch(IOException e) {
e.printStackTrace();
@@ -1398,8 +1534,27 @@
}
scannerId = -1L;
}
+ for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
+ Text rowName = row.next();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("deleting columns in row: " + rowName);
+ }
+ try {
+ long clientId = rand.nextLong();
+ long lockid = server.startUpdate(m.regionName, clientId, rowName);
+ server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
+ server.delete(m.regionName, clientId, lockid, COL_SERVER);
+ server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
+ server.commit(m.regionName, clientId, lockid);
+
+ } catch(Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("deleted table: " + tableName);
}
- LOG.debug("deleted table: " + tableName);
}
public HServerAddress findRootRegion() {
@@ -1418,7 +1573,9 @@
}
public void leaseExpired() {
- LOG.debug(server + " lease expired");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug(server + " lease expired");
+ }
HServerInfo storedInfo = serversToServerInfo.remove(server);
synchronized(msgQueue) {
@@ -1428,30 +1585,53 @@
}
}
- private static void printUsage() {
+ private static void printUsageAndExit() {
System.err.println("Usage: java org.apache.hbase.HMaster " +
- "[--bind=hostname:port]");
+ "[--bind=hostname:port] start|stop");
+ System.exit(0);
}
public static void main(String [] args) throws IOException {
+ if (args.length < 1) {
+ printUsageAndExit();
+ }
+
Configuration conf = new HBaseConfiguration();
// Process command-line args. TODO: Better cmd-line processing
// (but hopefully something not as painful as cli options).
+ final String addressArgKey = "--bind=";
for (String cmd: args) {
- if (cmd.equals("-h") || cmd.startsWith("--h")) {
- printUsage();
- return;
+ if (cmd.startsWith(addressArgKey)) {
+ conf.set(MASTER_ADDRESS, cmd.substring(addressArgKey.length()));
+ continue;
}
- final String addressArgKey = "--bind=";
- if (cmd.startsWith(addressArgKey)) {
- conf.set(MASTER_ADDRESS,
- cmd.substring(addressArgKey.length()));
+ if (cmd.equals("start")) {
+ try {
+ (new Thread(new HMaster(conf))).start();
+ } catch (Throwable t) {
+ LOG.error( "Can not start master because "+
+ StringUtils.stringifyException(t) );
+ System.exit(-1);
+ }
+ break;
}
+
+ if (cmd.equals("stop")) {
+ try {
+ HClient client = new HClient(conf);
+ client.shutdown();
+ } catch (Throwable t) {
+ LOG.error( "Can not stop master because " +
+ StringUtils.stringifyException(t) );
+ System.exit(-1);
+ }
+ break;
+ }
+
+ // Print out usage if we get to here.
+ printUsageAndExit();
}
-
- new HMaster(conf);
}
-}
-
+}
\ No newline at end of file
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java Mon May 7 12:58:53 2007
@@ -20,10 +20,11 @@
import java.io.IOException;
-/*******************************************************************************
- * Clients interact with the HMasterInterface to gain access to meta-level HBase
- * functionality, like finding an HRegionServer and creating/destroying tables.
- ******************************************************************************/
+/**
+ * Clients interact with the HMasterInterface to gain access to meta-level
+ * HBase functionality, like finding an HRegionServer and creating/destroying
+ * tables.
+ */
public interface HMasterInterface extends VersionedProtocol {
public static final long versionID = 1L; // initial version
@@ -33,6 +34,11 @@
public void createTable(HTableDescriptor desc) throws IOException;
public void deleteTable(Text tableName) throws IOException;
+
+ /**
+ * Shutdown an HBase cluster.
+ */
+ public void shutdown() throws IOException;
//////////////////////////////////////////////////////////////////////////////
// These are the method calls of last resort when trying to find an HRegion
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Mon May 7 12:58:53 2007
@@ -75,11 +75,15 @@
throw new IOException("Snapshot in progress!");
}
if(memcache.size() == 0) {
- LOG.debug("memcache empty. Skipping snapshot");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("memcache empty. Skipping snapshot");
+ }
return retval;
}
- LOG.debug("starting memcache snapshot");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("starting memcache snapshot");
+ }
retval.memcacheSnapshot = memcache;
this.snapshot = memcache;
@@ -87,7 +91,9 @@
memcache = new TreeMap<HStoreKey, BytesWritable>();
retval.sequenceId = log.startCacheFlush();
- LOG.debug("memcache snapshot complete");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("memcache snapshot complete");
+ }
return retval;
@@ -108,7 +114,9 @@
if(snapshot == null) {
throw new IOException("Snapshot not present!");
}
- LOG.debug("deleting snapshot");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("deleting snapshot");
+ }
for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
it.hasNext(); ) {
@@ -121,7 +129,9 @@
}
this.snapshot = null;
- LOG.debug("snapshot deleted");
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("snapshot deleted");
+ }
} finally {
this.locker.writeLock().unlock();
@@ -133,15 +143,15 @@
*
* Operation uses a write lock.
*/
- public void add(Text row, TreeMap<Text, byte[]> columns, long timestamp) {
+ public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
this.locker.writeLock().lock();
try {
for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext(); ) {
Text column = it.next();
- byte[] val = columns.get(column);
+ BytesWritable val = columns.get(column);
HStoreKey key = new HStoreKey(row, column, timestamp);
- memcache.put(key, new BytesWritable(val));
+ memcache.put(key, val);
}
} finally {
@@ -154,11 +164,11 @@
*
* We only need a readlock here.
*/
- public byte[][] get(HStoreKey key, int numVersions) {
- Vector<byte[]> results = new Vector<byte[]>();
+ public BytesWritable[] get(HStoreKey key, int numVersions) {
+ Vector<BytesWritable> results = new Vector<BytesWritable>();
this.locker.readLock().lock();
try {
- Vector<byte[]> result = get(memcache, key, numVersions-results.size());
+ Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
results.addAll(0, result);
for(int i = history.size()-1; i >= 0; i--) {
@@ -174,22 +184,22 @@
return null;
} else {
- return (byte[][]) results.toArray(new byte[results.size()][]);
+ return results.toArray(new BytesWritable[results.size()]);
}
} finally {
this.locker.readLock().unlock();
}
}
-
+
/**
* Return all the available columns for the given key. The key indicates a
* row and timestamp, but not a column name.
*
* The returned object should map column names to BytesWritable values.
*/
- public TreeMap<Text, byte[]> getFull(HStoreKey key) throws IOException {
- TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+ public TreeMap<Text, BytesWritable> getFull(HStoreKey key) throws IOException {
+ TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
this.locker.readLock().lock();
try {
internalGetFull(memcache, key, results);
@@ -205,7 +215,7 @@
}
void internalGetFull(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key,
- TreeMap<Text, byte[]> results) {
+ TreeMap<Text, BytesWritable> results) {
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(key);
@@ -216,7 +226,7 @@
if(results.get(itCol) == null
&& key.matchesWithoutColumn(itKey)) {
BytesWritable val = tailMap.get(itKey);
- results.put(itCol, val.get());
+ results.put(itCol, val);
} else if(key.getRow().compareTo(itKey.getRow()) > 0) {
break;
@@ -233,8 +243,8 @@
* TODO - This is kinda slow. We need a data structure that allows for
* proximity-searches, not just precise-matches.
*/
- Vector<byte[]> get(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, int numVersions) {
- Vector<byte[]> result = new Vector<byte[]>();
+ Vector<BytesWritable> get(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, int numVersions) {
+ Vector<BytesWritable> result = new Vector<BytesWritable>();
HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
@@ -242,7 +252,7 @@
HStoreKey itKey = it.next();
if(itKey.matchesRowCol(curKey)) {
- result.add(tailMap.get(itKey).get());
+ result.add(tailMap.get(itKey));
curKey.setVersion(itKey.getTimestamp() - 1);
}
@@ -256,7 +266,7 @@
/**
* Return a scanner over the keys in the HMemcache
*/
- public HScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
+ public HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
throws IOException {
return new HMemcacheScanner(timestamp, targetCols, firstRow);
@@ -280,13 +290,14 @@
locker.readLock().lock();
try {
this.backingMaps = new TreeMap[history.size() + 1];
- int i = 0;
- for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
- it.hasNext(); ) {
-
- backingMaps[i++] = it.next();
+
+ //NOTE: Since we iterate through the backing maps from 0 to n, we need
+ // to put the memcache first, the newest history second, ..., etc.
+
+ backingMaps[0] = memcache;
+ for(int i = history.size() - 1; i > 0; i--) {
+ backingMaps[i] = history.elementAt(i);
}
- backingMaps[backingMaps.length - 1] = memcache;
this.keyIterators = new Iterator[backingMaps.length];
this.keys = new HStoreKey[backingMaps.length];
@@ -295,7 +306,7 @@
// Generate list of iterators
HStoreKey firstKey = new HStoreKey(firstRow);
- for(i = 0; i < backingMaps.length; i++) {
+ for(int i = 0; i < backingMaps.length; i++) {
if(firstRow.getLength() != 0) {
keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();
@@ -327,7 +338,8 @@
* @return - true if this is the first row
*/
boolean findFirstRow(int i, Text firstRow) {
- return ((firstRow.getLength() == 0) || (keys[i].getRow().equals(firstRow)));
+ return ((firstRow.getLength() == 0)
+ || (keys[i].getRow().toString().startsWith(firstRow.toString())));
}
/**
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Mon May 7 12:58:53 2007
@@ -28,7 +28,7 @@
public static final byte MSG_REGION_CLOSE = 2;
public static final byte MSG_REGION_MERGE = 3;
public static final byte MSG_CALL_SERVER_STARTUP = 4;
- public static final byte MSG_REGIONSERVER_ALREADY_RUNNING = 5;
+ public static final byte MSG_REGIONSERVER_STOP = 5;
public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
public static final byte MSG_REGION_CLOSE_AND_DELETE = 7;