You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/08/11 00:11:06 UTC
svn commit: r564780 [2/3] - in /lucene/hadoop/trunk/src/contrib/hbase: ./
src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=564780&r1=564779&r2=564780
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri Aug 10 15:11:05 2007
@@ -31,7 +31,6 @@
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
-import java.util.SortedSet;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
@@ -40,6 +39,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -61,25 +61,25 @@
* There is only one HMaster for a single HBase deployment.
*/
public class HMaster implements HConstants, HMasterInterface,
- HMasterRegionInterface, Runnable {
+HMasterRegionInterface, Runnable {
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public long getProtocolVersion(String protocol,
- @SuppressWarnings("unused") long clientVersion)
- throws IOException {
+ @SuppressWarnings("unused") long clientVersion) throws IOException {
+
if (protocol.equals(HMasterInterface.class.getName())) {
return HMasterInterface.versionID;
+
} else if (protocol.equals(HMasterRegionInterface.class.getName())) {
return HMasterRegionInterface.versionID;
+
} else {
throw new IOException("Unknown protocol to name node: " + protocol);
}
}
static final Log LOG = LogFactory.getLog(HMaster.class.getName());
-
+
volatile boolean closed;
Path dir;
Configuration conf;
@@ -88,25 +88,18 @@
long threadWakeFrequency;
int numRetries;
long maxRegionOpenTime;
-
+
BlockingQueue<PendingOperation> msgQueue;
-
+
private Leases serverLeases;
private Server server;
private HServerAddress address;
-
+
HConnection connection;
-
+
long metaRescanInterval;
-
- volatile HServerAddress rootRegionLocation;
-
- /**
- * Columns in the 'meta' ROOT and META tables.
- */
- static final Text METACOLUMNS[] = {
- COLUMN_FAMILY
- };
+
+ final AtomicReference<HServerAddress> rootRegionLocation;
/**
* Base HRegion scanner class. Holds utilty common to <code>ROOT</code> and
@@ -156,22 +149,19 @@
* once.
*/
abstract class BaseScanner implements Runnable {
- private final Text FIRST_ROW = new Text();
protected boolean rootRegion;
protected final Text tableName;
-
+
protected abstract void initialScan();
protected abstract void maintenanceScan();
-
+
BaseScanner(final Text tableName) {
super();
this.tableName = tableName;
this.rootRegion = tableName.equals(ROOT_TABLE_NAME);
}
-
- /**
- * {@inheritDoc}
- */
+
+ /** {@inheritDoc} */
public void run() {
initialScan();
while (!closed) {
@@ -194,14 +184,18 @@
long scannerId = -1L;
LOG.info(Thread.currentThread().getName() + " scanning meta region " +
region.regionName + " on " + region.server.toString());
+
// Array to hold list of split parents found. Scan adds to list. After
// scan we go check if parents can be removed.
+
Map<HRegionInfo, TreeMap<Text, byte[]>> splitParents =
new HashMap<HRegionInfo, TreeMap<Text, byte[]>>();
try {
regionServer = connection.getHRegionConnection(region.server);
- scannerId = regionServer.openScanner(region.regionName, METACOLUMNS,
- FIRST_ROW, System.currentTimeMillis(), null);
+ scannerId =
+ regionServer.openScanner(region.regionName, COLUMN_FAMILY_ARRAY,
+ EMPTY_START_ROW, System.currentTimeMillis(), null);
+
int numberOfRegionsFound = 0;
while (true) {
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
@@ -209,13 +203,16 @@
if (values.length == 0) {
break;
}
+
for (int i = 0; i < values.length; i++) {
results.put(values[i].getKey().getColumn(), values[i].getData());
}
+
HRegionInfo info = HRegion.getRegionInfo(results);
String serverName = HRegion.getServerName(results);
long startCode = HRegion.getStartCode(results);
- if(LOG.isDebugEnabled()) {
+
+ if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + " scanner: " +
Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
"}, server: " + serverName + ", startCode: " + startCode);
@@ -223,17 +220,20 @@
// Note Region has been assigned.
checkAssigned(info, serverName, startCode);
+
if (isSplitParent(info)) {
splitParents.put(info, results);
}
numberOfRegionsFound += 1;
}
- if(rootRegion) {
+ if (rootRegion) {
numberOfMetaRegions.set(numberOfRegionsFound);
}
+
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+
if (e instanceof UnknownScannerException) {
// Reset scannerId so we do not try closing a scanner the other side
// has lost account of: prevents duplicated stack trace out of the
@@ -242,10 +242,11 @@
}
}
throw e;
+
} finally {
try {
if (scannerId != -1L && regionServer != null) {
- regionServer.close(scannerId);
+ regionServer.close(scannerId);
}
} catch (IOException e) {
if (e instanceof RemoteException) {
@@ -254,24 +255,28 @@
LOG.error("Closing scanner", e);
}
}
- // Scan is finished. Take a look at split parents to see if any
- // we can clean up.
+
+ // Scan is finished. Take a look at split parents to see if any we can clean up.
+
if (splitParents.size() > 0) {
for (Map.Entry<HRegionInfo, TreeMap<Text, byte[]>> e:
- splitParents.entrySet()) {
+ splitParents.entrySet()) {
+
TreeMap<Text, byte[]> results = e.getValue();
cleanupSplits(e.getKey(),
- HRegion.getSplit(results, HRegion.COL_SPLITA),
- HRegion.getSplit(results, HRegion.COL_SPLITB));
+ HRegion.getSplit(results, HRegion.COL_SPLITA),
+ HRegion.getSplit(results, HRegion.COL_SPLITB));
}
}
LOG.info(Thread.currentThread().getName() + " scan of meta region " +
region.regionName + " complete");
}
-
+
private boolean isSplitParent(final HRegionInfo info) {
boolean result = false;
+
// Skip if not a split region.
+
if (!info.isSplit()) {
return result;
}
@@ -280,7 +285,7 @@
}
return true;
}
-
+
/*
* @param info
* @param splitA
@@ -290,52 +295,62 @@
* @throws IOException
*/
private boolean cleanupSplits(final HRegionInfo info,
- final HRegionInfo splitA, final HRegionInfo splitB)
+ final HRegionInfo splitA, final HRegionInfo splitB)
throws IOException {
+
boolean result = false;
if (LOG.isDebugEnabled()) {
LOG.debug("Checking " + info.getRegionName() + " to see if daughter " +
- "splits still hold references");
+ "splits still hold references");
}
boolean noReferencesA = splitA == null;
boolean noReferencesB = splitB == null;
+
if (!noReferencesA) {
- noReferencesA = hasReferences(info.getRegionName(), splitA,
- HRegion.COL_SPLITA);
+ noReferencesA =
+ hasReferences(info.getRegionName(), splitA, HRegion.COL_SPLITA);
}
if (!noReferencesB) {
- noReferencesB = hasReferences(info.getRegionName(), splitB,
- HRegion.COL_SPLITB);
+ noReferencesB =
+ hasReferences(info.getRegionName(), splitB, HRegion.COL_SPLITB);
}
if (!(noReferencesA && noReferencesB)) {
+
// No references. Remove this item from table and deleted region on
// disk.
+
LOG.info("Deleting region " + info.getRegionName() +
- " because daughter splits no longer hold references");
+ " because daughter splits no longer hold references");
+
HRegion.deleteRegion(fs, dir, info.getRegionName());
HRegion.removeRegionFromMETA(conf, this.tableName,
- info.getRegionName());
+ info.getRegionName());
+
result = true;
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("Done checking " + info.getRegionName() + ": splitA: " +
- noReferencesA + ", splitB: "+ noReferencesB);
+ noReferencesA + ", splitB: "+ noReferencesB);
}
return result;
}
-
+
protected boolean hasReferences(final Text regionName,
- final HRegionInfo split, final Text column)
- throws IOException {
+ final HRegionInfo split, final Text column) throws IOException {
+
boolean result =
HRegion.hasReferences(fs, fs.makeQualified(dir), split);
+
if (result) {
return result;
}
+
if (LOG.isDebugEnabled()) {
LOG.debug(split.getRegionName().toString()
- +" no longer has references to " + regionName.toString());
+ +" no longer has references to " + regionName.toString());
}
+
HTable t = new HTable(conf, this.tableName);
try {
HRegion.removeSplitFromMETA(t, regionName, column);
@@ -344,56 +359,63 @@
}
return result;
}
-
+
protected void checkAssigned(final HRegionInfo info,
final String serverName, final long startCode) {
+
// Skip region - if ...
- if(info.offLine // offline
- || killedRegions.contains(info.regionName) // queued for offline
- || regionsToDelete.contains(info.regionName)) { // queued for delete
+
+ if(info.offLine // offline
+ || killedRegions.contains(info.regionName) // queued for offline
+ || regionsToDelete.contains(info.regionName)) { // queued for delete
unassignedRegions.remove(info.regionName);
assignAttempts.remove(info.regionName);
+
if(LOG.isDebugEnabled()) {
- LOG.debug("not assigning region: " + info.regionName +
- " (offline: " + info.isOffline() + ", split: " + info.isSplit() +
- ")");
+ LOG.debug("not assigning region: " + info.regionName + " (offline: " +
+ info.isOffline() + ", split: " + info.isSplit() + ")");
}
return;
}
-
+
HServerInfo storedInfo = null;
- if(serverName != null) {
- TreeMap<Text, HRegionInfo> regionsToKill = killList.get(serverName);
- if(regionsToKill != null && regionsToKill.containsKey(info.regionName)) {
+ if (serverName != null) {
+ Map<Text, HRegionInfo> regionsToKill = killList.get(serverName);
+ if (regionsToKill != null &&
+ regionsToKill.containsKey(info.regionName)) {
+
// Skip if region is on kill list
+
if(LOG.isDebugEnabled()) {
- LOG.debug("not assigning region (on kill list): " +
- info.regionName);
+ LOG.debug("not assigning region (on kill list): " + info.regionName);
}
return;
}
- storedInfo = serversToServerInfo.get(serverName);
+ synchronized (serversToServerInfo) {
+ storedInfo = serversToServerInfo.get(serverName);
+ }
}
- if( !(unassignedRegions.containsKey(info.regionName) ||
- pendingRegions.contains(info.regionName))
+ if (!(unassignedRegions.containsKey(info.regionName) ||
+ pendingRegions.contains(info.regionName))
&& (storedInfo == null || storedInfo.getStartCode() != startCode)) {
+
// The current assignment is no good; load the region.
+
unassignedRegions.put(info.regionName, info);
assignAttempts.put(info.regionName, Long.valueOf(0L));
+
} else if (LOG.isDebugEnabled()) {
- LOG.debug("Finished if " + info.getRegionName() + " is assigned: " +
+ LOG.debug("Finished if " + info.getRegionName() + " is assigned: " +
"unassigned: " + unassignedRegions.containsKey(info.regionName) +
", pending: " + pendingRegions.contains(info.regionName));
}
}
}
-
+
volatile boolean rootScanned;
-
- /**
- * Scanner for the <code>ROOT</code> HRegion.
- */
+
+ /** Scanner for the <code>ROOT</code> HRegion. */
class RootScanner extends BaseScanner {
/** Constructor */
public RootScanner() {
@@ -403,14 +425,17 @@
private void scanRoot() {
int tries = 0;
while (!closed && tries < numRetries) {
- while(!closed && rootRegionLocation == null) {
- // rootRegionLocation will be filled in when we get an 'open region'
- // regionServerReport message from the HRegionServer that has been
- // allocated the ROOT region below.
- try {
- Thread.sleep(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue
+ synchronized (rootRegionLocation) {
+ while(!closed && rootRegionLocation.get() == null) {
+ // rootRegionLocation will be filled in when we get an 'open region'
+ // regionServerReport message from the HRegionServer that has been
+ // allocated the ROOT region below.
+
+ try {
+ rootRegionLocation.wait();
+ } catch (InterruptedException e) {
+ // continue
+ }
}
}
if (closed) {
@@ -418,15 +443,20 @@
}
try {
- synchronized(rootScannerLock) { // Don't interrupt us while we're working
- scanRegion(new MetaRegion(rootRegionLocation,
+ // Don't interrupt us while we're working
+
+ synchronized(rootScannerLock) {
+ scanRegion(new MetaRegion(rootRegionLocation.get(),
HGlobals.rootRegionInfo.regionName, null));
}
break;
+
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
+
} catch (IOException ex) {
e = ex;
}
@@ -439,6 +469,8 @@
}
}
if (!closed) {
+ // sleep before retry
+
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
@@ -447,19 +479,19 @@
}
}
}
-
+
@Override
protected void initialScan() {
scanRoot();
rootScanned = true;
}
-
+
@Override
protected void maintenanceScan() {
scanRoot();
}
}
-
+
private RootScanner rootScanner;
private Thread rootScannerThread;
Integer rootScannerLock = new Integer(0);
@@ -469,24 +501,20 @@
HServerAddress server;
Text regionName;
Text startKey;
-
+
MetaRegion(HServerAddress server, Text regionName, Text startKey) {
this.server = server;
this.regionName = regionName;
this.startKey = startKey;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public boolean equals(Object o) {
return this.compareTo(o) == 0;
}
-
- /**
- * {@inheritDoc}
- */
+
+ /** {@inheritDoc} */
@Override
public int hashCode() {
int result = this.regionName.hashCode();
@@ -496,24 +524,21 @@
// Comparable
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public int compareTo(Object o) {
MetaRegion other = (MetaRegion)o;
-
+
int result = this.regionName.compareTo(other.regionName);
if(result == 0) {
result = this.startKey.compareTo(other.startKey);
}
return result;
}
-
}
/** Set by root scanner to indicate the number of meta regions */
AtomicInteger numberOfMetaRegions;
-
+
/** Work for the meta scanner is queued up here */
BlockingQueue<MetaRegion> metaRegionsToScan;
@@ -522,7 +547,7 @@
/** Set by meta scanner after initial scan */
volatile boolean initialMetaScanComplete;
-
+
/**
* MetaScanner <code>META</code> table.
*
@@ -537,11 +562,11 @@
public MetaScanner() {
super(HConstants.META_TABLE_NAME);
}
-
+
private void scanOneMetaRegion(MetaRegion region) {
int tries = 0;
while (!closed && tries < numRetries) {
- while (!closed && !rootScanned && rootRegionLocation == null) {
+ while (!closed && !rootScanned && rootRegionLocation.get() == null) {
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
@@ -553,8 +578,9 @@
}
try {
+ // Don't interrupt us while we're working
+
synchronized (metaScannerLock) {
- // Don't interrupt us while we're working
scanRegion(region);
onlineMetaRegions.put(region.startKey, region);
}
@@ -565,6 +591,7 @@
try {
e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e);
+
} catch (IOException ex) {
e = ex;
}
@@ -577,6 +604,8 @@
}
}
if (!closed) {
+ // sleep before retry
+
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
@@ -603,7 +632,7 @@
}
initialMetaScanComplete = true;
}
-
+
@Override
protected void maintenanceScan() {
ArrayList<MetaRegion> regions = new ArrayList<MetaRegion>();
@@ -613,29 +642,31 @@
}
metaRegionsScanned();
}
-
+
/**
* Called by the meta scanner when it has completed scanning all meta
* regions. This wakes up any threads that were waiting for this to happen.
*/
private synchronized boolean metaRegionsScanned() {
- if (!rootScanned || numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+ if (!rootScanned ||
+ numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+
return false;
}
LOG.info("all meta regions scanned");
notifyAll();
return true;
}
-
+
/**
* Other threads call this method to wait until all the meta regions have
* been scanned.
*/
synchronized boolean waitForMetaRegionsOrClose() {
while (!closed) {
- if (rootScanned
- && numberOfMetaRegions.get() == onlineMetaRegions.size()) {
-
+ if (rootScanned &&
+ numberOfMetaRegions.get() == onlineMetaRegions.size()) {
+
break;
}
@@ -660,39 +691,49 @@
* We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
* set of all known valid regions.
*/
- SortedMap<Text, HRegionInfo> unassignedRegions;
+ Map<Text, HRegionInfo> unassignedRegions;
/**
* The 'assignAttempts' table maps from regions to a timestamp that indicates
* the last time we *tried* to assign the region to a RegionServer. If the
* timestamp is out of date, then we can try to reassign it.
*/
- SortedMap<Text, Long> assignAttempts;
+ Map<Text, Long> assignAttempts;
/**
* Regions that have been assigned, and the server has reported that it has
* started serving it, but that we have not yet recorded in the meta table.
*/
- SortedSet<Text> pendingRegions;
-
+ Set<Text> pendingRegions;
+
/**
* The 'killList' is a list of regions that are going to be closed, but not
* reopened.
*/
- SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
-
+ Map<String, HashMap<Text, HRegionInfo>> killList;
+
/** 'killedRegions' contains regions that are in the process of being closed */
- SortedSet<Text> killedRegions;
+ Set<Text> killedRegions;
/**
* 'regionsToDelete' contains regions that need to be deleted, but cannot be
* until the region server closes it
*/
- SortedSet<Text> regionsToDelete;
-
- /** The map of known server names to server info */
- SortedMap<String, HServerInfo> serversToServerInfo =
- Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
+ Set<Text> regionsToDelete;
+
+ /**
+ * The map of known server names to server info
+ *
+ * Access to this map and loadToServers and serversToLoad must be synchronized
+ * on this object
+ */
+ Map<String, HServerInfo> serversToServerInfo;
+
+ /** SortedMap server load -> Set of server names */
+ SortedMap<HServerLoad, Set<String>> loadToServers;
+
+ /** Map of server names -> server load */
+ Map<String, HServerLoad> serversToLoad;
/** Build the HMaster out of a raw configuration item.
*
@@ -722,7 +763,7 @@
this.rand = new Random();
// Make sure the root directory exists!
-
+
if(! fs.exists(dir)) {
fs.mkdirs(dir);
}
@@ -730,99 +771,111 @@
Path rootRegionDir =
HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName);
LOG.info("Root region dir: " + rootRegionDir.toString());
- if(! fs.exists(rootRegionDir)) {
+
+ if (!fs.exists(rootRegionDir)) {
LOG.info("bootstrap: creating ROOT and first META regions");
try {
HRegion root = HRegion.createHRegion(0L, HGlobals.rootTableDesc,
- this.dir, this.conf);
+ this.dir, this.conf);
HRegion meta = HRegion.createHRegion(1L, HGlobals.metaTableDesc,
this.dir, this.conf);
+
// Add first region from the META table to the ROOT region.
+
HRegion.addRegionToMETA(root, meta);
root.close();
root.getLog().closeAndDelete();
meta.close();
meta.getLog().closeAndDelete();
+
} catch (IOException e) {
if (e instanceof RemoteException) {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
}
- LOG.error("", e);
+ LOG.error("bootstrap", e);
}
}
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
- this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
+ this.maxRegionOpenTime =
+ conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
+
this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
+
this.serverLeases = new Leases(
- conf.getLong("hbase.master.lease.period", 30 * 1000),
- conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+ conf.getLong("hbase.master.lease.period", 30 * 1000),
+ conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+
this.server = RPC.getServer(this, address.getBindAddress(),
- address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
- false, conf);
+ address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+ false, conf);
// The rpc-server port can be ephemeral... ensure we have the correct info
+
this.address = new HServerAddress(server.getListenerAddress());
conf.set(MASTER_ADDRESS, address.toString());
-
+
this.connection = HConnectionManager.getConnection(conf);
-
- this.metaRescanInterval
- = conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
+
+ this.metaRescanInterval =
+ conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
// The root region
-
- this.rootRegionLocation = null;
+
+ this.rootRegionLocation = new AtomicReference<HServerAddress>();
this.rootScanned = false;
this.rootScanner = new RootScanner();
this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
-
+
// Scans the meta table
this.numberOfMetaRegions = new AtomicInteger();
this.metaRegionsToScan = new LinkedBlockingQueue<MetaRegion>();
-
+
this.onlineMetaRegions =
Collections.synchronizedSortedMap(new TreeMap<Text, MetaRegion>());
-
+
this.initialMetaScanComplete = false;
-
+
this.metaScanner = new MetaScanner();
this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
this.unassignedRegions =
- Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
-
- this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo);
-
+ Collections.synchronizedMap(new HashMap<Text, HRegionInfo>());
+
+ this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
+ HGlobals.rootRegionInfo);
+
this.assignAttempts =
- Collections.synchronizedSortedMap(new TreeMap<Text, Long>());
-
- this.pendingRegions =
- Collections.synchronizedSortedSet(new TreeSet<Text>());
-
+ Collections.synchronizedMap(new HashMap<Text, Long>());
+
this.assignAttempts.put(HGlobals.rootRegionInfo.regionName,
- Long.valueOf(0L));
+ Long.valueOf(0L));
+
+ this.pendingRegions =
+ Collections.synchronizedSet(new HashSet<Text>());
this.killList =
- Collections.synchronizedSortedMap(
- new TreeMap<String, TreeMap<Text, HRegionInfo>>());
-
+ Collections.synchronizedMap(
+ new HashMap<String, HashMap<Text, HRegionInfo>>());
+
this.killedRegions =
- Collections.synchronizedSortedSet(new TreeSet<Text>());
-
+ Collections.synchronizedSet(new HashSet<Text>());
+
this.regionsToDelete =
- Collections.synchronizedSortedSet(new TreeSet<Text>());
-
+ Collections.synchronizedSet(new HashSet<Text>());
+
+ this.serversToServerInfo = new HashMap<String, HServerInfo>();
+ this.loadToServers = new TreeMap<HServerLoad, Set<String>>();
+ this.serversToLoad = new HashMap<String, HServerLoad>();
+
// We're almost open for business
this.closed = false;
LOG.info("HMaster initialized on " + this.address.toString());
}
-
- /**
- * @return HServerAddress of the master server
- */
+
+ /** @return HServerAddress of the master server */
public HServerAddress getMasterAddress() {
return address;
}
@@ -837,24 +890,33 @@
// Start the server last so everything else is running before we start
// receiving requests
+
this.server.start();
+
} catch (IOException e) {
if (e instanceof RemoteException) {
try {
e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+
} catch (IOException ex) {
- LOG.warn("", ex);
+ LOG.warn("thread start", ex);
}
}
+
// Something happened during startup. Shut things down.
+
this.closed = true;
LOG.error("Failed startup", e);
}
- // Main processing loop
+ /*
+ * Main processing loop
+ */
+
for (PendingOperation op = null; !closed; ) {
try {
op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+
} catch (InterruptedException e) {
// continue
}
@@ -865,15 +927,18 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Main processing loop: " + op.toString());
}
+
if (!op.process()) {
// Operation would have blocked because not all meta regions are
// online. This could cause a deadlock, because this thread is waiting
// for the missing meta region(s) to come back online, but since it
// is waiting, it cannot process the meta region online operation it
// is waiting for. So put this operation back on the queue for now.
+
if (msgQueue.size() == 0) {
// The queue is currently empty so wait for a while to see if what
// we need comes in first
+
try {
Thread.sleep(threadWakeFrequency);
} catch (InterruptedException e) {
@@ -886,16 +951,18 @@
}
msgQueue.put(op);
} catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was " +
- "interrupted.", e);
+ throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
}
+
} catch (Exception ex) {
if (ex instanceof RemoteException) {
try {
- ex = RemoteExceptionHandler.decodeRemoteException((RemoteException) ex);
+ ex = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) ex);
+
} catch (IOException e) {
- LOG.warn("", e);
+ LOG.warn("main processing loop: " + op.toString(), e);
}
}
LOG.warn("Processing pending operations: " + op.toString(), ex);
@@ -907,53 +974,44 @@
}
}
letRegionServersShutdown();
-
+
/*
* Clean up and close up shop
*/
- // Wake other threads so they notice the close
-
synchronized(rootScannerLock) {
- rootScannerThread.interrupt();
+ rootScannerThread.interrupt(); // Wake root scanner
}
synchronized(metaScannerLock) {
- metaScannerThread.interrupt();
+ metaScannerThread.interrupt(); // Wake meta scanner
}
- server.stop(); // Stop server
- serverLeases.close(); // Turn off the lease monitor
-
+ server.stop(); // Stop server
+ serverLeases.close(); // Turn off the lease monitor
+
// Join up with all threads
-
+
try {
- // Wait for the root scanner to finish.
- rootScannerThread.join();
+ rootScannerThread.join(); // Wait for the root scanner to finish.
} catch (Exception iex) {
- // Print if ever there is an interrupt (Just for kicks. Remove if it
- // ever happens).
LOG.warn("root scanner", iex);
}
try {
- // Join the thread till it finishes.
- metaScannerThread.join();
+ metaScannerThread.join(); // Wait for meta scanner to finish.
} catch(Exception iex) {
- // Print if ever there is an interrupt (Just for kicks. Remove if it
- // ever happens).
LOG.warn("meta scanner", iex);
}
try {
- // Join until its finished. TODO: Maybe do in parallel in its own thread
- // as is done in TaskTracker if its taking a long time to go down.
- server.join();
+ // TODO: Maybe do in parallel in its own thread as is done in TaskTracker
+ // if its taking a long time to go down.
+
+ server.join(); // Wait for server to finish.
} catch(InterruptedException iex) {
- // Print if ever there is an interrupt (Just for kicks. Remove if it
- // ever happens).
LOG.warn("server", iex);
}
-
+
LOG.info("HMaster main thread exiting");
}
-
+
/*
* Wait on regionservers to report in
* with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice
@@ -962,36 +1020,48 @@
* by remote region servers have expired.
*/
private void letRegionServersShutdown() {
- while (this.serversToServerInfo.size() > 0) {
- LOG.info("Waiting on following regionserver(s) to go down (or " +
- "region server lease expiration, whichever happens first): " +
- this.serversToServerInfo.values());
- try {
- Thread.sleep(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue
+ synchronized (serversToServerInfo) {
+ while (this.serversToServerInfo.size() > 0) {
+ LOG.info("Waiting on following regionserver(s) to go down (or " +
+ "region server lease expiration, whichever happens first): " +
+ this.serversToServerInfo.values());
+ try {
+ serversToServerInfo.wait(threadWakeFrequency);
+ } catch (InterruptedException e) {
+ // continue
+ }
}
}
}
-
- //////////////////////////////////////////////////////////////////////////////
- // HMasterRegionInterface
- //////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
+
+ /*
+ * HMasterRegionInterface
*/
+
+ /** {@inheritDoc} */
@SuppressWarnings("unused")
- public void regionServerStartup(HServerInfo serverInfo)
- throws IOException {
+ public void regionServerStartup(HServerInfo serverInfo) throws IOException {
String s = serverInfo.getServerAddress().toString().trim();
HServerInfo storedInfo = null;
LOG.info("received start message from: " + s);
-
+
// If we get the startup message but there's an old server by that
// name, then we can timeout the old one right away and register
// the new one.
- storedInfo = serversToServerInfo.remove(s);
+
+ synchronized (serversToServerInfo) {
+ storedInfo = serversToServerInfo.remove(s);
+ HServerLoad load = serversToLoad.remove(s);
+
+ if (load != null) {
+ Set<String> servers = loadToServers.get(load);
+ if (servers != null) {
+ servers.remove(s);
+ loadToServers.put(load, servers);
+ }
+ }
+ serversToServerInfo.notifyAll();
+ }
if (storedInfo != null && !closed) {
try {
msgQueue.put(new PendingServerShutdown(storedInfo));
@@ -1001,36 +1071,49 @@
}
// Either way, record the new server
- serversToServerInfo.put(s, serverInfo);
- if(!closed) {
- long serverLabel = getServerLabel(s);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Created lease for " + serverLabel);
+
+ synchronized (serversToServerInfo) {
+ HServerLoad load = new HServerLoad();
+ serverInfo.setLoad(load);
+ serversToServerInfo.put(s, serverInfo);
+ serversToLoad.put(s, load);
+ Set<String> servers = loadToServers.get(load);
+ if (servers == null) {
+ servers = new HashSet<String>();
}
+ servers.add(s);
+ loadToServers.put(load, servers);
+ }
+
+ if (!closed) {
+ long serverLabel = getServerLabel(s);
serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
}
}
-
+
private long getServerLabel(final String s) {
return s.hashCode();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
throws IOException {
- String s = serverInfo.getServerAddress().toString().trim();
- long serverLabel = getServerLabel(s);
+ String serverName = serverInfo.getServerAddress().toString().trim();
+ long serverLabel = getServerLabel(serverName);
+
if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
-
+
// HRegionServer is shutting down. Cancel the server's lease.
- if (cancelLease(s, serverLabel)) {
+ // Note that cancelling the server's lease takes care of updating
+ // serversToServerInfo, etc.
+
+ if (cancelLease(serverName, serverLabel)) {
// Only process the exit message if the server still has a lease.
// Otherwise we could end up processing the server exit twice.
- LOG.info("Region server " + s + ": MSG_REPORT_EXITING");
-
+
+ LOG.info("Region server " + serverName + ": MSG_REPORT_EXITING");
+
// Get all the regions the server was serving reassigned
// (if we are not shutting down).
@@ -1039,7 +1122,7 @@
HRegionInfo info = msgs[i].getRegionInfo();
if (info.tableDesc.getName().equals(ROOT_TABLE_NAME)) {
- rootRegionLocation = null;
+ rootRegionLocation.set(null);
} else if (info.tableDesc.getName().equals(META_TABLE_NAME)) {
onlineMetaRegions.remove(info.getStartKey());
@@ -1050,28 +1133,32 @@
}
}
}
-
+
// We don't need to return anything to the server because it isn't
// going to do any more work.
return new HMsg[0];
}
-
+
if (closed) {
// Tell server to shut down if we are shutting down. This should
// happen after check of MSG_REPORT_EXITING above, since region server
// will send us one of these messages after it gets MSG_REGIONSERVER_STOP
+
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
}
- HServerInfo storedInfo = serversToServerInfo.get(s);
+ HServerInfo storedInfo;
+ synchronized (serversToServerInfo) {
+ storedInfo = serversToServerInfo.get(serverName);
+ }
if(storedInfo == null) {
if(LOG.isDebugEnabled()) {
- LOG.debug("received server report from unknown server: " + s);
+ LOG.debug("received server report from unknown server: " + serverName);
}
// The HBaseMaster may have been restarted.
// Tell the RegionServer to start over and call regionServerStartup()
-
+
return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)};
} else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
@@ -1086,60 +1173,111 @@
// The answer is to ask A to shut down for good.
if (LOG.isDebugEnabled()) {
- LOG.debug("region server race condition detected: " + s);
+ LOG.debug("region server race condition detected: " + serverName);
}
+ cancelLease(serverName, serverLabel);
return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
-
+
} else {
// All's well. Renew the server's lease.
// This will always succeed; otherwise, the fetch of serversToServerInfo
// would have failed above.
-
+
serverLeases.renewLease(serverLabel, serverLabel);
- // Refresh the info object
- serversToServerInfo.put(s, serverInfo);
+ // Refresh the info object and the load information
- // Next, process messages for this server
- return processMsgs(serverInfo, msgs);
- }
- }
+ synchronized (serversToServerInfo) {
+ serversToServerInfo.put(serverName, serverInfo);
- /** cancel a server's lease */
- private boolean cancelLease(final String serverName, final long serverLabel) {
- boolean leaseCancelled = false;
- if (serversToServerInfo.remove(serverName) != null) {
- // Only cancel lease once.
- // This method can be called a couple of times during shutdown.
- LOG.info("Cancelling lease for " + serverName);
- serverLeases.cancelLease(serverLabel, serverLabel);
- leaseCancelled = true;
- }
+ HServerLoad load = serversToLoad.get(serverName);
+ if (load != null && !load.equals(serverInfo.getLoad())) {
+ // We have previous information about the load on this server
+ // and the load on this server has changed
+
+ Set<String> servers = loadToServers.get(load);
+
+ // Note that servers should never be null because loadToServers
+ // and serversToLoad are manipulated in pairs
+
+ servers.remove(serverName);
+ loadToServers.put(load, servers);
+ }
+
+ // Set the current load information
+
+ load = serverInfo.getLoad();
+ serversToLoad.put(serverName, load);
+ Set<String> servers = loadToServers.get(load);
+ if (servers == null) {
+ servers = new HashSet<String>();
+ }
+ servers.add(serverName);
+ loadToServers.put(load, servers);
+ }
+
+ // Next, process messages for this server
+ return processMsgs(serverInfo, msgs);
+ }
+ }
+
+ /** Cancel a server's lease and update its load information */
+ private boolean cancelLease(final String serverName, final long serverLabel) {
+ boolean leaseCancelled = false;
+ synchronized (serversToServerInfo) {
+ HServerInfo info = serversToServerInfo.remove(serverName);
+ if (info != null) {
+ // Only cancel lease and update load information once.
+ // This method can be called a couple of times during shutdown.
+
+ LOG.info("Cancelling lease for " + serverName);
+ serverLeases.cancelLease(serverLabel, serverLabel);
+ leaseCancelled = true;
+
+ // update load information
+
+ HServerLoad load = serversToLoad.remove(serverName);
+ if (load != null) {
+ Set<String> servers = loadToServers.get(load);
+ if (servers != null) {
+ servers.remove(serverName);
+ loadToServers.put(load, servers);
+ }
+ }
+ }
+ serversToServerInfo.notifyAll();
+ }
return leaseCancelled;
}
-
- /** Process all the incoming messages from a server that's contacted us. */
- private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[]) throws IOException {
- ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
-
- TreeMap<Text, HRegionInfo> regionsToKill =
- killList.remove(info.getServerAddress().toString());
+
+ /**
+ * Process all the incoming messages from a server that's contacted us.
+ *
+ * Note that we never need to update the server's load information because
+ * that has already been done in regionServerReport.
+ */
+ private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[])
+ throws IOException {
+ ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
+ String serverName = info.getServerAddress().toString();
+ HashMap<Text, HRegionInfo> regionsToKill = killList.remove(serverName);
+
// Get reports on what the RegionServer did.
-
- for(int i = 0; i < incomingMsgs.length; i++) {
+
+ for (int i = 0; i < incomingMsgs.length; i++) {
HRegionInfo region = incomingMsgs[i].getRegionInfo();
- switch(incomingMsgs[i].getMsg()) {
+ switch (incomingMsgs[i].getMsg()) {
case HMsg.MSG_REPORT_OPEN:
HRegionInfo regionInfo = unassignedRegions.get(region.regionName);
- if(regionInfo == null) {
+ if (regionInfo == null) {
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("region server " + info.getServerAddress().toString()
+ " should not have opened region " + region.regionName);
}
@@ -1156,19 +1294,29 @@
region.regionName);
// Remove from unassigned list so we don't assign it to someone else
+
unassignedRegions.remove(region.regionName);
assignAttempts.remove(region.regionName);
- if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) {
+
+ if (region.regionName.compareTo(
+ HGlobals.rootRegionInfo.regionName) == 0) {
+
// Store the Root Region location (in memory)
- rootRegionLocation = new HServerAddress(info.getServerAddress());
+
+ synchronized (rootRegionLocation) {
+ rootRegionLocation.set(new HServerAddress(info.getServerAddress()));
+ rootRegionLocation.notifyAll();
+ }
break;
}
// Note that the table has been assigned and is waiting for the meta
// table to be updated.
+
pendingRegions.add(region.regionName);
-
+
// Queue up an update to note the region location.
+
try {
msgQueue.put(new PendingOpenReport(info, region));
} catch (InterruptedException e) {
@@ -1181,8 +1329,12 @@
LOG.info(info.getServerAddress().toString() + " no longer serving " +
region.regionName);
- if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region
- rootRegionLocation = null;
+ if (region.regionName.compareTo(
+ HGlobals.rootRegionInfo.regionName) == 0) {
+
+ // Root region
+
+ rootRegionLocation.set(null);
unassignedRegions.put(region.regionName, region);
assignAttempts.put(region.regionName, Long.valueOf(0L));
@@ -1190,45 +1342,53 @@
boolean reassignRegion = true;
boolean deleteRegion = false;
- if(killedRegions.remove(region.regionName)) {
+ if (killedRegions.remove(region.regionName)) {
reassignRegion = false;
}
-
- if(regionsToDelete.remove(region.regionName)) {
+
+ if (regionsToDelete.remove(region.regionName)) {
reassignRegion = false;
deleteRegion = true;
}
+
+ // NOTE: we cannot put the region into unassignedRegions as that
+ // could create a race with the pending close if it gets
+ // reassigned before the close is processed.
+
unassignedRegions.remove(region.regionName);
assignAttempts.remove(region.regionName);
try {
- msgQueue.put(new PendingCloseReport(region, reassignRegion, deleteRegion));
+ msgQueue.put(new PendingCloseReport(region, reassignRegion,
+ deleteRegion));
+
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
-
- // NOTE: we cannot put the region into unassignedRegions as that
- // could create a race with the pending close if it gets
- // reassigned before the close is processed.
-
}
break;
case HMsg.MSG_REPORT_SPLIT:
// A region has split.
+
HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
+
LOG.info("region " + region.regionName + " split. New regions are: "
- + newRegionA.regionName + ", " + newRegionB.regionName);
- if(region.tableDesc.getName().equals(META_TABLE_NAME)) {
+ + newRegionA.regionName + ", " + newRegionB.regionName);
+
+ if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
// A meta region has split.
+
onlineMetaRegions.remove(region.getStartKey());
onlineMetaRegions.put(newRegionA.getStartKey(),
- new MetaRegion(info.getServerAddress(), newRegionA.getRegionName(),
- newRegionA.getStartKey()));
+ new MetaRegion(info.getServerAddress(),
+ newRegionA.getRegionName(), newRegionA.getStartKey()));
+
onlineMetaRegions.put(newRegionB.getStartKey(),
- new MetaRegion(info.getServerAddress(), newRegionB.getRegionName(),
- newRegionB.getStartKey()));
+ new MetaRegion(info.getServerAddress(),
+ newRegionB.getRegionName(), newRegionB.getStartKey()));
+
numberOfMetaRegions.incrementAndGet();
}
break;
@@ -1241,56 +1401,183 @@
}
// Process the kill list
- if(regionsToKill != null) {
- for(HRegionInfo i: regionsToKill.values()) {
+
+ if (regionsToKill != null) {
+ for (HRegionInfo i: regionsToKill.values()) {
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
killedRegions.add(i.regionName);
}
}
// Figure out what the RegionServer ought to do, and write back.
- if(unassignedRegions.size() > 0) {
- // Open new regions as necessary
- int targetForServer = (int) Math.ceil(unassignedRegions.size()
- / (1.0 * serversToServerInfo.size()));
-
- int counter = 0;
- long now = System.currentTimeMillis();
- for (Text curRegionName: unassignedRegions.keySet()) {
- HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
- long assignedTime = assignAttempts.get(curRegionName);
- if (now - assignedTime > maxRegionOpenTime) {
- LOG.info("assigning region " + regionInfo.regionName + " to server " +
- info.getServerAddress().toString());
- returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+ assignRegions(info, serverName, returnMsgs);
+ return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
+ }
+
+ /**
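+ * Assigns regions to the reporting region server, attempting to balance the
+ * load across all region servers.
+ *
+ * @param info server info of the reporting region server; its load is read
+ * @param serverName name of the reporting server, used in log messages
+ * @param returnMsgs list to which MSG_REGION_OPEN messages are appended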
+ */
+ private void assignRegions(HServerInfo info, String serverName,
+ ArrayList<HMsg> returnMsgs) {
+
+ long now = System.currentTimeMillis();
+ TreeSet<Text> regionsToAssign = new TreeSet<Text>();
+ for (Map.Entry<Text, Long> e: assignAttempts.entrySet()) {
+ if (now - e.getValue() > maxRegionOpenTime) {
+ regionsToAssign.add(e.getKey());
+ }
+ }
+ int nRegionsToAssign = regionsToAssign.size();
+
+ if (nRegionsToAssign > 0) {
+ if (serversToServerInfo.size() == 1) {
+ // Only one server. An unlikely case but still possible.
+ // Assign all unassigned regions to it.
+
+ for (Text regionName: regionsToAssign) {
+ HRegionInfo regionInfo = unassignedRegions.get(regionName);
+ LOG.info("assigning region " + regionName + " to server " +
+ serverName);
- assignAttempts.put(curRegionName, Long.valueOf(now));
- counter++;
+ assignAttempts.put(regionName, Long.valueOf(now));
+ returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
}
- if(counter >= targetForServer) {
- break;
+ } else {
+ // Multiple servers in play.
+ // We need to allocate regions only to most lightly loaded servers.
+
+ HServerLoad thisServersLoad = info.getLoad();
+
+ synchronized (serversToServerInfo) {
+ SortedMap<HServerLoad, Set<String>> lightServers =
+ loadToServers.headMap(thisServersLoad);
+
+ // How many regions can we assign to more lightly loaded servers?
+
+ int nregions = 0;
+ for (Map.Entry<HServerLoad, Set<String>> e: lightServers.entrySet()) {
+ HServerLoad lightLoad =
+ new HServerLoad(e.getKey().getNumberOfRequests(),
+ e.getKey().getNumberOfRegions());
+
+ do {
+ lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
+ nregions += 1;
+
+ } while (lightLoad.compareTo(thisServersLoad) <= 0 &&
+ nregions < nRegionsToAssign);
+
+ nregions *= e.getValue().size();
+
+ if (nregions >= nRegionsToAssign) {
+ break;
+ }
+ }
+
+ nRegionsToAssign -= nregions;
+ if (nRegionsToAssign > 0) {
+ // We still have more regions to assign. See how many we can assign
+ // before this server becomes more heavily loaded than the next
+ // most heavily loaded server.
+
+ SortedMap<HServerLoad, Set<String>> heavyServers =
+ loadToServers.tailMap(thisServersLoad);
+ int nservers = 0;
+ HServerLoad heavierLoad = null;
+ for (Map.Entry<HServerLoad, Set<String>> e:
+ heavyServers.entrySet()) {
+
+ Set<String> servers = e.getValue();
+ nservers += servers.size();
+
+ if (e.getKey().compareTo(thisServersLoad) == 0) {
+ // This is the load factor of the server we are considering
+
+ nservers -= 1;
+ continue;
+ }
+
+ // If we get here, we are at the first load entry that is a
+ // heavier load than the server we are considering
+
+ heavierLoad = e.getKey();
+ break;
+ }
+
+ nregions = 0;
+ if (heavierLoad != null) {
+ // There is a more heavily loaded server
+
+ for (HServerLoad load =
+ new HServerLoad(thisServersLoad.getNumberOfRequests(),
+ thisServersLoad.getNumberOfRegions());
+ load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
+ load.setNumberOfRegions(load.getNumberOfRegions() + 1),
+ nregions++) {
+ }
+ }
+
+ if (nregions < nRegionsToAssign) {
+ // There are some more heavily loaded servers
+ // but we can't assign all the regions to this server.
+
+ if (nservers > 0) {
+ // There are other servers that can share the load.
+ // Split regions that need assignment across the servers.
+
+ nregions =
+ (int) Math.ceil((1.0 * nRegionsToAssign) / (1.0 * nservers));
+
+ } else {
+ // No other servers with same load.
+ // Split regions over all available servers
+
+ nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
+ / (1.0 * serversToServerInfo.size()));
+
+ }
+
+ } else {
+ // Assign all regions to this server
+
+ nregions = nRegionsToAssign;
+ }
+
+ for (Map.Entry<Text, HRegionInfo> e: unassignedRegions.entrySet()) {
+ Text regionName = e.getKey();
+ HRegionInfo regionInfo = e.getValue();
+ LOG.info("assigning region " + regionName + " to server " +
+ serverName);
+
+ assignAttempts.put(regionName, Long.valueOf(now));
+ returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+
+ if (--nregions <= 0) {
+ break;
+ }
+ }
+ }
}
}
}
- return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
}
-
- //////////////////////////////////////////////////////////////////////////////
- // Some internal classes to manage msg-passing and client operations
- //////////////////////////////////////////////////////////////////////////////
-
- private abstract class PendingOperation {
- protected final Text[] columns = {
- COLUMN_FAMILY
- };
- protected final Text startRow = new Text();
+ /*
+ * Some internal classes to manage msg-passing and client operations
+ */
+
+ private abstract class PendingOperation {
PendingOperation() {
super();
}
-
+
abstract boolean process() throws IOException;
}
@@ -1302,17 +1589,16 @@
private class PendingServerShutdown extends PendingOperation {
private HServerAddress deadServer;
private String deadServerName;
- private long oldStartCode;
private transient boolean logSplit;
private transient boolean rootChecked;
private transient boolean rootRescanned;
-
+
private class ToDoEntry {
boolean deleteRegion;
boolean regionOffline;
Text row;
HRegionInfo info;
-
+
ToDoEntry(Text row, HRegionInfo info) {
this.deleteRegion = false;
this.regionOffline = false;
@@ -1320,122 +1606,146 @@
this.info = info;
}
}
-
+
PendingServerShutdown(HServerInfo serverInfo) {
super();
this.deadServer = serverInfo.getServerAddress();
this.deadServerName = this.deadServer.toString();
- this.oldStartCode = serverInfo.getStartCode();
this.logSplit = false;
this.rootChecked = false;
this.rootRescanned = false;
}
-
+
+ /** {@inheritDoc} */
@Override
public String toString() {
return "PendingServerShutdown of " + this.deadServer.toString();
}
-
+
/** Finds regions that the dead region server was serving */
private void scanMetaRegion(HRegionInterface server, long scannerId,
- Text regionName)
- throws IOException {
+ Text regionName) throws IOException {
+
ArrayList<ToDoEntry> toDoList = new ArrayList<ToDoEntry>();
TreeMap<Text, HRegionInfo> regions = new TreeMap<Text, HRegionInfo>();
- DataInputBuffer inbuf = new DataInputBuffer();
+
try {
- while(true) {
+ while (true) {
KeyedData[] values = null;
+
try {
values = server.next(scannerId);
+
} catch (IOException e) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
+
}
LOG.error("Shutdown scanning of meta region", e);
break;
}
-
- if(values == null || values.length == 0) {
+
+ if (values == null || values.length == 0) {
break;
}
TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
Text row = null;
- for(int i = 0; i < values.length; i++) {
+ for (int i = 0; i < values.length; i++) {
if(row == null) {
row = values[i].getKey().getRow();
+
} else {
- if(!row.equals(values[i].getKey().getRow())) {
+ if (!row.equals(values[i].getKey().getRow())) {
LOG.error("Multiple rows in same scanner result set. firstRow="
+ row + ", currentRow=" + values[i].getKey().getRow());
}
}
results.put(values[i].getKey().getColumn(), values[i].getData());
}
- if (LOG.isDebugEnabled()) {
+
+ if (LOG.isDebugEnabled() && row != null) {
LOG.debug("shutdown scanner looking at " + row.toString());
}
// Check server name. If null, be conservative and treat as though
// region had been on shutdown server (could be null because we
// missed edits in hlog because hdfs does not do write-append).
+
String serverName = null;
try {
serverName = Keying.bytesToString(results.get(COL_SERVER));
+
} catch(UnsupportedEncodingException e) {
LOG.error("Server name", e);
break;
}
if (serverName != null && serverName.length() > 0 &&
deadServerName.compareTo(serverName) != 0) {
+
// This isn't the server you're looking for - move along
+
if (LOG.isDebugEnabled()) {
LOG.debug("Server name " + serverName + " is not same as " +
- deadServerName + ": Passing");
+ deadServerName + ": Passing");
}
continue;
}
// Bingo! Found it.
+
HRegionInfo info = null;
try {
- info = (HRegionInfo)Writables.
- getWritable(results.get(COL_REGIONINFO), new HRegionInfo());
+ info = (HRegionInfo) Writables.getWritable(
+ results.get(COL_REGIONINFO), new HRegionInfo());
+
} catch (IOException e) {
LOG.error("Read fields", e);
break;
}
LOG.info(info.getRegionName() + " was on shutdown server <" +
- serverName + "> (or server is null). Marking unassigned if " +
- "meta and clearing pendingRegions");
+ serverName + "> (or server is null). Marking unassigned if " +
+ "meta and clearing pendingRegions");
+
if (info.tableDesc.getName().equals(META_TABLE_NAME)) {
onlineMetaRegions.remove(info.getStartKey());
}
-
+
ToDoEntry todo = new ToDoEntry(row, info);
toDoList.add(todo);
- if(killList.containsKey(deadServerName)) {
- TreeMap<Text, HRegionInfo> regionsToKill = killList.get(deadServerName);
- if(regionsToKill.containsKey(info.regionName)) {
+
+ if (killList.containsKey(deadServerName)) {
+ HashMap<Text, HRegionInfo> regionsToKill =
+ killList.get(deadServerName);
+
+ if (regionsToKill.containsKey(info.regionName)) {
regionsToKill.remove(info.regionName);
killList.put(deadServerName, regionsToKill);
unassignedRegions.remove(info.regionName);
assignAttempts.remove(info.regionName);
- if(regionsToDelete.contains(info.regionName)) {
+
+ if (regionsToDelete.contains(info.regionName)) {
// Delete this region
+
regionsToDelete.remove(info.regionName);
todo.deleteRegion = true;
+
} else {
// Mark region offline
+
todo.regionOffline = true;
}
}
+
} else {
// Get region reassigned
+
regions.put(info.regionName, info);
- // If was pending, remove otherwise will obstruct its getting
- // reassigned.
+
+ // If it was pending, remove it.
+ // Otherwise it will obstruct its getting reassigned.
+
pendingRegions.remove(info.getRegionName());
}
}
@@ -1444,10 +1754,11 @@
if(scannerId != -1L) {
try {
server.close(scannerId);
-
+
} catch (IOException e) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
LOG.error("Closing scanner", e);
}
@@ -1455,27 +1766,30 @@
}
// Remove server from root/meta entries
+
long clientId = rand.nextLong();
for (ToDoEntry e: toDoList) {
long lockid = server.startUpdate(regionName, clientId, e.row);
- if(e.deleteRegion) {
+
+ if (e.deleteRegion) {
server.delete(regionName, clientId, lockid, COL_REGIONINFO);
- } else if(e.regionOffline) {
+
+ } else if (e.regionOffline) {
e.info.offLine = true;
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
e.info.write(s);
server.put(regionName, clientId, lockid, COL_REGIONINFO,
- byteValue.toByteArray());
+ byteValue.toByteArray());
}
server.delete(regionName, clientId, lockid, COL_SERVER);
server.delete(regionName, clientId, lockid, COL_STARTCODE);
server.commit(regionName, clientId, lockid, System.currentTimeMillis());
}
-
+
// Get regions reassigned
- for(Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
+ for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
Text region = e.getKey();
HRegionInfo regionInfo = e.getValue();
@@ -1487,64 +1801,72 @@
@Override
boolean process() throws IOException {
LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
- this.logSplit + ", rootChecked: " + this.rootChecked +
- ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " +
- numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
- onlineMetaRegions.size());
+ this.logSplit + ", rootChecked: " + this.rootChecked +
+ ", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " +
+ numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
+ onlineMetaRegions.size());
- if(!logSplit) {
+ if (!logSplit) {
// Process the old log file
+
HLog.splitLog(dir, new Path(dir, "log" + "_" +
- deadServer.getBindAddress() + "_" + deadServer.getPort()), fs, conf);
+ deadServer.getBindAddress() + "_" + deadServer.getPort()), fs, conf);
+
logSplit = true;
}
- if(!rootChecked) {
- if(rootRegionLocation != null
- && deadServer.equals(rootRegionLocation)) {
- rootRegionLocation = null;
+ if (!rootChecked) {
+ if (rootRegionLocation.get() != null &&
+ deadServer.equals(rootRegionLocation.get())) {
+
+ rootRegionLocation.set(null);
unassignedRegions.put(HGlobals.rootRegionInfo.regionName,
HGlobals.rootRegionInfo);
+
assignAttempts.put(HGlobals.rootRegionInfo.regionName,
Long.valueOf(0L));
}
rootChecked = true;
}
- if(!rootRescanned) {
+ if (!rootRescanned) {
// Scan the ROOT region
+
HRegionInterface server = null;
long scannerId = -1L;
- for(int tries = 0; tries < numRetries; tries ++) {
+ for (int tries = 0; tries < numRetries; tries ++) {
if (closed) {
return true;
}
- if (rootRegionLocation == null || !rootScanned) {
- // We can't proceed until the root region is online and has been
- // scanned
+ if (rootRegionLocation.get() == null || !rootScanned) {
+ // We can't proceed until the root region is online and has been scanned
+
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region " +
- "cancelled because rootRegionLocation is null");
+ "cancelled because rootRegionLocation is null");
}
return false;
}
- server = connection.getHRegionConnection(rootRegionLocation);
+ server = connection.getHRegionConnection(rootRegionLocation.get());
scannerId = -1L;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
- rootRegionLocation.getBindAddress());
+ rootRegionLocation.get().getBindAddress());
}
scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName,
- columns, startRow, System.currentTimeMillis(), null);
- scanMetaRegion(server, scannerId,
- HGlobals.rootRegionInfo.regionName);
+ COLUMN_FAMILY_ARRAY, EMPTY_START_ROW,
+ System.currentTimeMillis(), null);
+
+ scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
break;
+
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
throw e;
}
@@ -1552,8 +1874,8 @@
}
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
- rootRegionLocation.getBindAddress() + " finished " +
- Thread.currentThread().getName());
+ rootRegionLocation.get().getBindAddress() + " finished " +
+ Thread.currentThread().getName());
}
rootRescanned = true;
}
@@ -1569,37 +1891,45 @@
// We can't block either because that would prevent the meta region
// online message from being processed. So return false to have this
// operation requeued.
+
if (LOG.isDebugEnabled()) {
LOG.debug("Requeuing shutdown because rootScanned: " +
- rootScanned + ", numberOfMetaRegions: " +
- numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
- onlineMetaRegions.size());
+ rootScanned + ", numberOfMetaRegions: " +
+ numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
+ onlineMetaRegions.size());
}
return false;
}
for (MetaRegion r: onlineMetaRegions.values()) {
+
HRegionInterface server = null;
long scannerId = -1L;
+
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning " + r.regionName +
- " on " + r.server + " " + Thread.currentThread().getName());
+ " on " + r.server + " " + Thread.currentThread().getName());
}
server = connection.getHRegionConnection(r.server);
- scannerId = server.openScanner(r.regionName, columns, startRow,
- System.currentTimeMillis(), null);
+
+ scannerId = server.openScanner(r.regionName, COLUMN_FAMILY_ARRAY,
+ EMPTY_START_ROW, System.currentTimeMillis(), null);
+
scanMetaRegion(server, scannerId, r.regionName);
+
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown finished scanning " +
- r.regionName +
- " on " + r.server + " " + Thread.currentThread().getName());
+ r.regionName + " on " + r.server + " " +
+ Thread.currentThread().getName());
}
}
break;
+
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
throw e;
}
@@ -1608,7 +1938,7 @@
return true;
}
}
-
+
/**
* PendingCloseReport is instantiated when a region server reports that it
* has closed a region.
@@ -1618,10 +1948,10 @@
private boolean reassignRegion;
private boolean deleteRegion;
private boolean rootRegion;
-
+
PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion,
boolean deleteRegion) {
-
+
super();
this.regionInfo = regionInfo;
@@ -1630,55 +1960,57 @@
// If the region closing down is a meta region then we need to update
// the ROOT table
-
- if(this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) {
+
+ if (this.regionInfo.tableDesc.getName().equals(META_TABLE_NAME)) {
this.rootRegion = true;
-
+
} else {
this.rootRegion = false;
}
}
-
+
+ /** {@inheritDoc} */
@Override
public String toString() {
return "PendingCloseReport of " + this.regionInfo.getRegionName();
}
-
+
@Override
boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
+ if (closed) {
+ return true;
+ }
LOG.info("region closed: " + regionInfo.regionName);
// Mark the Region as unavailable in the appropriate meta table
Text metaRegionName;
HRegionInterface server;
-
- if (closed) {
- return true;
- }
if (rootRegion) {
- if (rootRegionLocation == null || !rootScanned) {
+ if (rootRegionLocation.get() == null || !rootScanned) {
// We can't proceed until the root region is online and has been
// scanned
return false;
}
metaRegionName = HGlobals.rootRegionInfo.regionName;
- server = connection.getHRegionConnection(rootRegionLocation);
+ server = connection.getHRegionConnection(rootRegionLocation.get());
onlineMetaRegions.remove(regionInfo.getStartKey());
} else {
if (!rootScanned ||
numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+
// We can't proceed because not all of the meta regions are online.
// We can't block either because that would prevent the meta region
// online message from being processed. So return false to have this
// operation requeued.
+
if (LOG.isDebugEnabled()) {
LOG.debug("Requeuing close because rootScanned=" +
- rootScanned + ", numberOfMetaRegions=" +
- numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" +
- onlineMetaRegions.size());
+ rootScanned + ", numberOfMetaRegions=" +
+ numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" +
+ onlineMetaRegions.size());
}
return false;
}
@@ -1688,8 +2020,8 @@
r = onlineMetaRegions.get(regionInfo.getRegionName());
} else {
- r = onlineMetaRegions.get(
- onlineMetaRegions.headMap(regionInfo.getRegionName()).lastKey());
+ r = onlineMetaRegions.get(onlineMetaRegions.headMap(
+ regionInfo.getRegionName()).lastKey());
}
metaRegionName = r.regionName;
server = connection.getHRegionConnection(r.server);
@@ -1699,30 +2031,31 @@
try {
long lockid = server.startUpdate(metaRegionName, clientId,
regionInfo.regionName);
-
- if(deleteRegion) {
+
+ if (deleteRegion) {
server.delete(metaRegionName, clientId, lockid, COL_REGIONINFO);
-
- } else if(!reassignRegion ) {
+
+ } else if (!reassignRegion ) {
regionInfo.offLine = true;
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
DataOutputStream s = new DataOutputStream(byteValue);
regionInfo.write(s);
server.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
- byteValue.toByteArray());
+ byteValue.toByteArray());
}
server.delete(metaRegionName, clientId, lockid, COL_SERVER);
server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
-
+
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
throw e;
}
@@ -1730,19 +2063,20 @@
}
}
- if(reassignRegion) {
+ if (reassignRegion) {
LOG.info("reassign region: " + regionInfo.regionName);
-
+
unassignedRegions.put(regionInfo.regionName, regionInfo);
assignAttempts.put(regionInfo.regionName, Long.valueOf(0L));
-
- } else if(deleteRegion) {
+
+ } else if (deleteRegion) {
try {
HRegion.deleteRegion(fs, dir, regionInfo.regionName);
} catch (IOException e) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
LOG.error("failed delete region " + regionInfo.regionName, e);
throw e;
@@ -1762,14 +2096,17 @@
private HRegionInfo region;
private HServerAddress serverAddress;
private byte [] startCode;
-
+
PendingOpenReport(HServerInfo info, HRegionInfo region) {
if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
// The region which just came on-line is a META region.
// We need to look in the ROOT region for its information.
+
this.rootRegion = true;
+
} else {
// Just an ordinary region. Look for it in the META table.
+
this.rootRegion = false;
}
this.region = region;
@@ -1781,44 +2118,47 @@
LOG.error("Start code", e);
}
}
-
+
+ /** {@inheritDoc} */
@Override
public String toString() {
return "PendingOpenOperation from " + serverAddress.toString();
}
-
+
@Override
boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
+ if (closed) {
+ return true;
+ }
LOG.info(region.getRegionName() + " open on " +
this.serverAddress.toString());
+
// Register the newly-available Region's location.
+
Text metaRegionName;
HRegionInterface server;
- if (closed) {
- return true;
- }
-
if (rootRegion) {
- if (rootRegionLocation == null || !rootScanned) {
- // We can't proceed until the root region is online and has been
- // scanned
+ if (rootRegionLocation.get() == null || !rootScanned) {
+ // We can't proceed until the root region is online and has been scanned
if (LOG.isDebugEnabled()) {
- LOG.debug("root region=" + rootRegionLocation.toString() +
+ LOG.debug("root region=" + rootRegionLocation.get().toString() +
", rootScanned=" + rootScanned);
}
return false;
}
metaRegionName = HGlobals.rootRegionInfo.regionName;
- server = connection.getHRegionConnection(rootRegionLocation);
+ server = connection.getHRegionConnection(rootRegionLocation.get());
+
} else {
if (!rootScanned ||
- numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-
+ numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+
// We can't proceed because not all of the meta regions are online.
// We can't block either because that would prevent the meta region
// online message from being processed. So return false to have this
- // operation requeue
+ // operation requeued.
+
if (LOG.isDebugEnabled()) {
LOG.debug("Requeuing open because rootScanned: " +
rootScanned + ", numberOfMetaRegions: " +
@@ -1831,49 +2171,62 @@
MetaRegion r = null;
if (onlineMetaRegions.containsKey(region.getRegionName())) {
r = onlineMetaRegions.get(region.getRegionName());
+
} else {
- r = onlineMetaRegions.get(
- onlineMetaRegions.headMap(region.getRegionName()).lastKey());
+ r = onlineMetaRegions.get(onlineMetaRegions.headMap(
+ region.getRegionName()).lastKey());
}
metaRegionName = r.regionName;
server = connection.getHRegionConnection(r.server);
}
LOG.info("updating row " + region.getRegionName() + " in table " +
metaRegionName);
+
long clientId = rand.nextLong();
try {
long lockid = server.startUpdate(metaRegionName, clientId,
region.getRegionName());
+
server.put(metaRegionName, clientId, lockid, COL_SERVER,
serverAddress.toString().getBytes(UTF8_ENCODING));
+
server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
+
server.commit(metaRegionName, clientId, lockid,
System.currentTimeMillis());
-
+
if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
// It's a meta region.
+
MetaRegion m =
new MetaRegion(serverAddress, region.regionName, region.startKey);
+
if (!initialMetaScanComplete) {
// Put it on the queue to be scanned for the first time.
+
try {
metaRegionsToScan.put(m);
+
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into metaRegionsToScan was interrupted.", e);
}
} else {
// Add it to the online meta regions
+
onlineMetaRegions.put(region.startKey, m);
}
}
// If updated successfully, remove from pending list.
+
pendingRegions.remove(region.getRegionName());
break;
+
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
throw e;
}
@@ -1883,20 +2236,16 @@
}
}
- //////////////////////////////////////////////////////////////////////////////
- // HMasterInterface
- //////////////////////////////////////////////////////////////////////////////
-
- /**
- * {@inheritDoc}
+ /*
+ * HMasterInterface
*/
+
+ /** {@inheritDoc} */
public boolean isMasterRunning() {
return !closed;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public void shutdown() {
TimerTask tt = new TimerTask() {
@Override
@@ -1912,11 +2261,10 @@
t.schedule(tt, 10);
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public void createTable(HTableDescriptor desc)
throws IOException {
+
if (!isMasterRunning()) {
throw new MasterNotRunningException();
}
@@ -1926,28 +2274,29 @@
try {
// We can not access meta regions if they have not already been
// assigned and scanned. If we timeout waiting, just shutdown.
+
if (metaScanner.waitForMetaRegionsOrClose()) {
break;
}
createTable(newRegion);
LOG.info("created table " + desc.getName());
break;
+
} catch (IOException e) {
if (tries == numRetries - 1) {
if (e instanceof RemoteException) {
- e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+ e = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException) e);
}
throw e;
}
}
}
}
-
- /*
- * Set of tables currently in creation. Access needs to be synchronized.
- */
+
+ /* Set of tables currently in creation. Access needs to be synchronized. */
private Set<Text> tableInCreation = new HashSet<Text>();
-
+
private void createTable(final HRegionInfo newRegion) throws IOException {
Text tableName = newRegion.tableDesc.getName();
synchronized (tableInCreation) {
@@ -1962,36 +2311,43 @@
// table would sit should it exist. Open scanner on it. If a region
// for the table we want to create already exists, then table already
// created. Throw already-exists exception.
- MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName)) ?
+
+ MetaRegion m = (onlineMetaRegions.containsKey(newRegion.regionName) ?
onlineMetaRegions.get(newRegion.regionName) :
- onlineMetaRegions.get(onlineMetaRegions.
- headMap(newRegion.getTableDesc().getName()).lastKey());
+ onlineMetaRegions.get(onlineMetaRegions.headMap(
+ newRegion.getTableDesc().getName()).lastKey()));
+
Text metaRegionName = m.regionName;
HRegionInterface r = connection.getHRegionConnection(m.server);
- long scannerid = r.openScanner(metaRegionName,
- new Text[] { COL_REGIONINFO }, tableName, System.currentTimeMillis(),
- null);
+ long scannerid = r.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
+ tableName, System.currentTimeMillis(), null);
try {
KeyedData[] data = r.next(scannerid);
- // Test data and that the row for the data is for our table. If
- // table does not exist, scanner will return row after where our table
- // would be inserted if it exists so look for exact match on table
- // name.
+
+ // Test data and that the row for the data is for our table. If table
+ // does not exist, scanner will return row after where our table would
+ // be inserted if it exists so look for exact match on table name.
+
if (data != null && data.length > 0 &&
- HRegionInfo.getTableNameFromRegionName(data[0].getKey().getRow()).
- equals(tableName)) {
+ HRegionInfo.getTableNameFromRegionName(
+ data[0].getKey().getRow()).equals(tableName)) {
+
// Then a region for this table already exists. Ergo table exists.
+
throw new TableExistsException(tableName.toString());
}
+
} finally {
r.close(scannerid);
}
// 2. Create the HRegion
- HRegion region = HRegion.createHRegion(newRegion.regionId, newRegion.
- getTableDesc(), this.dir, this.conf);
+
+ HRegion region = HRegion.createHRegion(newRegion.regionId,
+ newRegion.getTableDesc(), this.dir, this.conf);
// 3. Insert into meta
+
HRegionInfo info = region.getRegionInfo();
Text regionName = region.getRegionName();
ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
@@ -1999,18 +2355,22 @@
info.write(s);
long clientId = rand.nextLong();
long lockid = r.startUpdate(metaRegionName, clientId, regionName);
+
r.put(metaRegionName, clientId, lockid, COL_REGIONINFO,
- byteValue.toByteArray());
- r.commit(metaRegionName, clientId, lockid,
- System.currentTimeMillis());
+ byteValue.toByteArray());
+
+ r.commit(metaRegionName, clientId, lockid, System.currentTimeMillis());
// 4. Close the new region to flush it to disk. Close its log file too.
+
region.close();
region.getLog().closeAndDelete();
// 5. Get it assigned to a server
+
unassignedRegions.put(regionName, info);
assignAttempts.put(regionName, Long.valueOf(0L));
+
} finally {
synchronized (tableInCreation) {
tableInCreation.remove(newRegion.getTableDesc().getName());
@@ -2018,70 +2378,62 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public void deleteTable(Text tableName) throws IOException {
new TableDelete(tableName).process();
LOG.info("deleted table: " + tableName);
}
-
- /**
- * {@inheritDoc}
- */
- public void addColumn(Text tableName, HColumnDescriptor column) throws IOException {
+
+ /** {@inheritDoc} */
+ public void addColumn(Text tableName, HColumnDescriptor column)
+ throws IOException {
+
new AddColumn(tableName, column).process();
}
-
- /**
- * {@inheritDoc}
- */
+
+ /** {@inheritDoc} */
public void deleteColumn(Text tableName, Text columnName) throws IOException {
new DeleteColumn(tableName, HStoreKey.extractFamily(columnName)).process();
}
-
- /**
- * {@inheritDoc}
- */
+
+ /** {@inheritDoc} */
public void enableTable(Text tableName) throws IOException {
new ChangeTableState(tableName, true).process();
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
public void disableTable(Text tableName) throws IOException {
new ChangeTableState(tableName, false).process();
}
-
- /**
- * {@inheritDoc}
- */
+
+ /** {@inheritDoc} */
public HServerAddress findRootRegion() {
- return rootRegionLocation;
+ return rootRegionLocation.get();
}
-
- // Helper classes for HMasterInterface
+
+ /*
+ * Helper classes for HMasterInterface
+ */
private abstract class TableOperation {
- private SortedSet<MetaRegion> metaRegions;
+ private Set<MetaRegion> metaRegions;
protected Text tableName;
-
- protected TreeSet<HRegionInfo> unservedRegions;
-
+ protected Set<HRegionInfo> unservedRegions;
+
protected TableOperation(Text tableName) throws IOException {
if (!isMasterRunning()) {
throw new MasterNotRunningException();
}
- this.metaRegions = new TreeSet<MetaRegion>();
+
+ this.metaRegions = new HashSet<MetaRegion>();
this.tableName = tableName;
- this.unservedRegions = new TreeSet<HRegionInfo>();
+ this.unservedRegions = new HashSet<HRegionInfo>();
// We can not access any meta region if they have not already been
// assigned and scanned.
[... 716 lines stripped ...]