You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text version).
Posted to commits@hbase.apache.org by br...@apache.org on 2008/02/24 00:53:31 UTC
svn commit: r630545 [1/2] - in /hadoop/hbase/trunk/src:
java/org/apache/hadoop/hbase/client/ java/org/apache/hadoop/hbase/master/
java/org/apache/hadoop/hbase/util/ test/org/apache/hadoop/hbase/
Author: bryanduxbury
Date: Sat Feb 23 15:53:21 2008
New Revision: 630545
URL: http://svn.apache.org/viewvc?rev=630545&view=rev
Log:
HBASE-457 Factor Master into Master, RegionManager, and ServerManager
Added:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
Modified:
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/MetaScanner.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RootScanner.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableDelete.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableOperation.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/SoftSortedMap.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java
hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Feb 23 15:53:21 2008
@@ -541,20 +541,24 @@
if (!matchingRegions.isEmpty()) {
HRegionLocation possibleRegion =
matchingRegions.get(matchingRegions.lastKey());
-
- Text endKey = possibleRegion.getRegionInfo().getEndKey();
- // make sure that the end key is greater than the row we're looking
- // for, otherwise the row actually belongs in the next region, not
- // this one. the exception case is when the endkey is EMPTY_START_ROW,
- // signifying that the region we're checking is actually the last
- // region in the table.
- if (endKey.equals(EMPTY_TEXT) || endKey.compareTo(row) > 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found possible location for " + row + ", " +
- possibleRegion);
+ // there is a possibility that the reference was garbage collected
+ // in the instant since we checked isEmpty().
+ if (possibleRegion != null) {
+ Text endKey = possibleRegion.getRegionInfo().getEndKey();
+
+ // make sure that the end key is greater than the row we're looking
+ // for, otherwise the row actually belongs in the next region, not
+ // this one. the exception case is when the endkey is EMPTY_START_ROW,
+ // signifying that the region we're checking is actually the last
+ // region in the table.
+ if (endKey.equals(EMPTY_TEXT) || endKey.compareTo(row) > 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found possible location for " + row + ", " +
+ possibleRegion);
+ }
+ return possibleRegion;
}
- return possibleRegion;
}
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/BaseScanner.java Sat Feb 23 15:53:21 2008
@@ -101,15 +101,28 @@
protected final boolean rootRegion;
protected final HMaster master;
+ protected final RegionManager regionManager;
+
+ protected boolean initialScanComplete;
protected abstract boolean initialScan();
protected abstract void maintenanceScan();
- BaseScanner(final HMaster master, final boolean rootRegion, final int period,
- final AtomicBoolean stop) {
+ // will use this variable to synchronize and make sure we aren't interrupted
+ // mid-scan
+ final Integer scannerLock = new Integer(0);
+
+ BaseScanner(final HMaster master, final RegionManager regionManager,
+ final boolean rootRegion, final int period, final AtomicBoolean stop) {
super(period, stop);
this.rootRegion = rootRegion;
this.master = master;
+ this.regionManager = regionManager;
+ this.initialScanComplete = false;
+ }
+
+ public boolean isInitialScanComplete() {
+ return initialScanComplete;
}
@Override
@@ -173,8 +186,8 @@
}
numberOfRegionsFound += 1;
}
- if (this.rootRegion) {
- master.numberOfMetaRegions.set(numberOfRegionsFound);
+ if (rootRegion) {
+ regionManager.setNumMetaRegions(numberOfRegionsFound);
}
} catch (IOException e) {
if (e instanceof RemoteException) {
@@ -328,34 +341,32 @@
}
protected void checkAssigned(final HRegionInfo info,
- final String serverName, final long startCode) throws IOException {
+ final String serverName, final long startCode)
+ throws IOException {
// Skip region - if ...
if(info.isOffline() // offline
- || master.killedRegions.contains(info.getRegionName()) // queued for offline
- || master.regionsToDelete.contains(info.getRegionName())) { // queued for delete
+ || regionManager.isClosing(info.getRegionName()) // queued for offline
+ || regionManager.isMarkedForDeletion(info.getRegionName())) { // queued for delete
- master.unassignedRegions.remove(info);
+ regionManager.noLongerUnassigned(info);
return;
}
HServerInfo storedInfo = null;
boolean deadServer = false;
if (serverName.length() != 0) {
- synchronized (master.killList) {
- Map<Text, HRegionInfo> regionsToKill = master.killList.get(serverName);
- if (regionsToKill != null &&
- regionsToKill.containsKey(info.getRegionName())) {
-
- // Skip if region is on kill list
- if(LOG.isDebugEnabled()) {
- LOG.debug("not assigning region (on kill list): " +
- info.getRegionName());
- }
- return;
+
+ if (regionManager.isMarkedClosedNoReopen(serverName, info.getRegionName())) {
+ // Skip if region is on kill list
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("not assigning region (on kill list): " +
+ info.getRegionName());
}
+ return;
}
- storedInfo = master.serversToServerInfo.get(serverName);
- deadServer = master.deadServers.contains(serverName);
+
+ storedInfo = master.serverManager.getServerInfo(serverName);
+ deadServer = master.serverManager.isDead(serverName);
}
/*
@@ -366,13 +377,13 @@
* then:
*/
if (!deadServer &&
- ((storedInfo != null && storedInfo.getStartCode() != startCode) ||
- (storedInfo == null &&
- !master.unassignedRegions.containsKey(info) &&
- !master.pendingRegions.contains(info.getRegionName())
- )
+ ((storedInfo != null && storedInfo.getStartCode() != startCode) ||
+ (storedInfo == null &&
+ !regionManager.isUnassigned(info) &&
+ !regionManager.isPending(info.getRegionName())
)
- ) {
+ )
+ ) {
// The current assignment is invalid
if (LOG.isDebugEnabled()) {
@@ -380,25 +391,26 @@
" is not valid: storedInfo: " + storedInfo + ", startCode: " +
startCode + ", storedInfo.startCode: " +
((storedInfo != null)? storedInfo.getStartCode(): -1) +
- ", unassignedRegions: " + master.unassignedRegions.containsKey(info) +
+ ", unassignedRegions: " +
+ regionManager.isUnassigned(info) +
", pendingRegions: " +
- master.pendingRegions.contains(info.getRegionName()));
+ regionManager.isPending(info.getRegionName()));
}
// Recover the region server's log if there is one.
// This is only done from here if we are restarting and there is stale
// data in the meta region. Once we are on-line, dead server log
// recovery is handled by lease expiration and ProcessServerShutdown
- if (!master.initialMetaScanComplete && serverName.length() != 0) {
+ if (!regionManager.isInitialMetaScanComplete() && serverName.length() != 0) {
StringBuilder dirName = new StringBuilder("log_");
dirName.append(serverName.replace(":", "_"));
Path logDir = new Path(master.rootdir, dirName.toString());
try {
if (master.fs.exists(logDir)) {
- master.splitLogLock.lock();
+ regionManager.splitLogLock.lock();
try {
HLog.splitLog(master.rootdir, logDir, master.fs, master.conf);
} finally {
- master.splitLogLock.unlock();
+ regionManager.splitLogLock.unlock();
}
}
if (LOG.isDebugEnabled()) {
@@ -410,7 +422,18 @@
}
}
// Now get the region assigned
- master.unassignedRegions.put(info, ZERO_L);
+ regionManager.setUnassigned(info);
+ }
+ }
+
+ /**
+ * Notify the thread to die at the end of its next run
+ */
+ public void interruptIfAlive() {
+ synchronized(scannerLock){
+ if (isAlive()) {
+ super.interrupt();
+ }
}
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ChangeTableState.java Sat Feb 23 15:53:21 2008
@@ -92,13 +92,14 @@
LOG.debug("updated columns in row: " + i.getRegionName());
}
- if (online) { // Bring offline regions on-line
- if (!this.master.unassignedRegions.containsKey(i)) {
- this.master.unassignedRegions.put(i, ZERO_L);
+ if (online) {
+ // Bring offline regions on-line
+ if (!master.regionManager.isUnassigned(i)) {
+ master.regionManager.setUnassigned(i);
}
-
- } else { // Prevent region from getting assigned.
- this.master.unassignedRegions.remove(i);
+ } else {
+ // Prevent region from getting assigned.
+ master.regionManager.noLongerUnassigned(i);
}
}
@@ -118,18 +119,17 @@
HashMap<Text, HRegionInfo> localKillList =
new HashMap<Text, HRegionInfo>();
-
- synchronized (this.master.killList) {
- HashMap<Text, HRegionInfo> killedRegions =
- this.master.killList.get(serverName);
- if (killedRegions != null) {
- localKillList.putAll(killedRegions);
- }
+
+ Map<Text, HRegionInfo> killedRegions =
+ master.regionManager.getMarkedClosedNoReopen(serverName);
+ if (killedRegions != null) {
+ localKillList.putAll(killedRegions);
}
+
for (HRegionInfo i: e.getValue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("adding region " + i.getRegionName() +
- " to local kill list");
+ " to kill list");
}
localKillList.put(i.getRegionName(), i);
}
@@ -138,7 +138,7 @@
LOG.debug("inserted local kill list into kill list for server " +
serverName);
}
- this.master.killList.put(serverName, localKillList);
+ master.regionManager.markClosedNoReopenBulk(serverName, localKillList);
}
}
servedRegions.clear();
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/HMaster.java Sat Feb 23 15:53:21 2008
@@ -88,7 +88,6 @@
HMasterRegionInterface {
static final Log LOG = LogFactory.getLog(HMaster.class.getName());
- static final Long ZERO_L = Long.valueOf(0L);
/** {@inheritDoc} */
public long getProtocolVersion(String protocol,
@@ -118,25 +117,19 @@
final int threadWakeFrequency;
final int numRetries;
final long maxRegionOpenTime;
+ final int leaseTimeout;
volatile DelayQueue<RegionServerOperation> delayedToDoQueue =
new DelayQueue<RegionServerOperation>();
volatile BlockingQueue<RegionServerOperation> toDoQueue =
new LinkedBlockingQueue<RegionServerOperation>();
- final int leaseTimeout;
- private final Leases serverLeases;
private final Server server;
private final HServerAddress address;
final HConnection connection;
final int metaRescanInterval;
-
- volatile AtomicReference<HServerAddress> rootRegionLocation =
- new AtomicReference<HServerAddress>(null);
-
- final Lock splitLogLock = new ReentrantLock();
// A Sleeper that sleeps for threadWakeFrequency
protected final Sleeper sleeper;
@@ -153,87 +146,13 @@
return infoServer;
}
- volatile boolean rootScanned = false;
-
- private final RootScanner rootScannerThread;
- final Integer rootScannerLock = new Integer(0);
-
- /** Set by root scanner to indicate the number of meta regions */
- volatile AtomicInteger numberOfMetaRegions = new AtomicInteger();
-
- /** Work for the meta scanner is queued up here */
- volatile BlockingQueue<MetaRegion> metaRegionsToScan =
- new LinkedBlockingQueue<MetaRegion>();
-
- /** These are the online meta regions */
- volatile SortedMap<Text, MetaRegion> onlineMetaRegions =
- Collections.synchronizedSortedMap(new TreeMap<Text, MetaRegion>());
-
- /** Set by meta scanner after initial scan */
- volatile boolean initialMetaScanComplete = false;
-
- final MetaScanner metaScannerThread;
- final Integer metaScannerLock = new Integer(0);
-
- /** The map of known server names to server info */
- volatile Map<String, HServerInfo> serversToServerInfo =
- new ConcurrentHashMap<String, HServerInfo>();
-
- /** Set of known dead servers */
- volatile Set<String> deadServers =
- Collections.synchronizedSet(new HashSet<String>());
-
- /** SortedMap server load -> Set of server names */
- volatile SortedMap<HServerLoad, Set<String>> loadToServers =
- Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
-
- /** Map of server names -> server load */
- volatile Map<String, HServerLoad> serversToLoad =
- new ConcurrentHashMap<String, HServerLoad>();
-
- /**
- * The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that
- * indicates the last time we *tried* to assign the region to a RegionServer.
- * If the timestamp is out of date, then we can try to reassign it.
- *
- * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
- * set of all known valid regions.
- *
- * <p>Items are removed from this list when a region server reports in that
- * the region has been deployed.
- */
- volatile SortedMap<HRegionInfo, Long> unassignedRegions =
- Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());
-
- /**
- * Regions that have been assigned, and the server has reported that it has
- * started serving it, but that we have not yet recorded in the meta table.
- */
- volatile Set<Text> pendingRegions =
- Collections.synchronizedSet(new HashSet<Text>());
-
- /**
- * The 'killList' is a list of regions that are going to be closed, but not
- * reopened.
- */
- volatile Map<String, HashMap<Text, HRegionInfo>> killList =
- new ConcurrentHashMap<String, HashMap<Text, HRegionInfo>>();
-
- /** 'killedRegions' contains regions that are in the process of being closed */
- volatile Set<Text> killedRegions =
- Collections.synchronizedSet(new HashSet<Text>());
-
- /**
- * 'regionsToDelete' contains regions that need to be deleted, but cannot be
- * until the region server closes it
- */
- volatile Set<Text> regionsToDelete =
- Collections.synchronizedSet(new HashSet<Text>());
-
/** Set of tables currently in creation. */
private volatile Set<Text> tableInCreation =
Collections.synchronizedSet(new HashSet<Text>());
+ ServerManager serverManager;
+ RegionManager regionManager;
+
/** Build the HMaster out of a raw configuration item.
*
* @param conf - Configuration object
@@ -317,10 +236,7 @@
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
this.maxRegionOpenTime =
conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
-
this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000);
- this.serverLeases = new Leases(this.leaseTimeout,
- conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
this.server = HbaseRPC.getServer(this, address.getBindAddress(),
address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -335,35 +251,15 @@
this.metaRescanInterval =
conf.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
- // The root region
- this.rootScannerThread = new RootScanner(this);
-
- // Scans the meta table
- this.metaScannerThread = new MetaScanner(this);
-
- unassignRootRegion();
-
this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
+ serverManager = new ServerManager(this);
+ regionManager = new RegionManager(this);
+
// We're almost open for business
this.closed.set(false);
LOG.info("HMaster initialized on " + this.address.toString());
}
-
- /*
- * Unassign the root region.
- * This method would be used in case where root region server had died
- * without reporting in. Currently, we just flounder and never recover. We
- * could 'notice' dead region server in root scanner -- if we failed access
- * multiple times -- but reassigning root is catastrophic.
- *
- */
- void unassignRootRegion() {
- this.rootRegionLocation.set(null);
- if (!this.shutdownRequested) {
- this.unassignedRegions.put(HRegionInfo.rootRegionInfo, ZERO_L);
- }
- }
/**
* Checks to see if the file system is still accessible.
@@ -397,14 +293,14 @@
* @return Read-only map of servers to serverinfo.
*/
public Map<String, HServerInfo> getServersToServerInfo() {
- return Collections.unmodifiableMap(this.serversToServerInfo);
+ return serverManager.getServersToServerInfo();
}
/**
* @return Read-only map of servers to load.
*/
public Map<String, HServerLoad> getServersToLoad() {
- return Collections.unmodifiableMap(this.serversToLoad);
+ return serverManager.getServersToLoad();
}
/**
@@ -413,16 +309,20 @@
public HServerAddress getRootRegionLocation() {
HServerAddress rootServer = null;
if (!shutdownRequested && !closed.get()) {
- rootServer = this.rootRegionLocation.get();
+ rootServer = regionManager.getRootRegionLocation();
}
return rootServer;
}
+ public void waitForRootRegionLocation() {
+ regionManager.waitForRootRegionLocation();
+ }
+
/**
* @return Read-only map of online regions.
*/
public Map<Text, MetaRegion> getOnlineMetaRegions() {
- return Collections.unmodifiableSortedMap(this.onlineMetaRegions);
+ return regionManager.getOnlineMetaRegions();
}
/** Main processing loop */
@@ -434,84 +334,26 @@
/* Main processing loop */
try {
while (!closed.get()) {
- RegionServerOperation op = null;
- if (shutdownRequested && serversToServerInfo.size() == 0) {
+ // check if we should be shutting down
+ if (shutdownRequested && serverManager.numServers() == 0) {
startShutdown();
break;
}
- if (rootRegionLocation.get() != null) {
- // We can't process server shutdowns unless the root region is online
- op = this.delayedToDoQueue.poll();
- }
- if (op == null ) {
- try {
- op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- // continue
- }
- }
- if (op == null || closed.get()) {
- continue;
- }
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Main processing loop: " + op.toString());
- }
-
- if (!op.process()) {
- // Operation would have blocked because not all meta regions are
- // online. This could cause a deadlock, because this thread is waiting
- // for the missing meta region(s) to come back online, but since it
- // is waiting, it cannot process the meta region online operation it
- // is waiting for. So put this operation back on the queue for now.
- if (toDoQueue.size() == 0) {
- // The queue is currently empty so wait for a while to see if what
- // we need comes in first
- sleeper.sleep();
- }
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Put " + op.toString() + " back on queue");
- }
- toDoQueue.put(op);
- } catch (InterruptedException e) {
- throw new RuntimeException(
- "Putting into toDoQueue was interrupted.", e);
- }
- }
- } catch (Exception ex) {
- if (ex instanceof RemoteException) {
- try {
- ex = RemoteExceptionHandler.decodeRemoteException(
- (RemoteException)ex);
- } catch (IOException e) {
- ex = e;
- LOG.warn("main processing loop: " + op.toString(), e);
- }
- }
- if (!checkFileSystem()) {
- break;
- }
- LOG.warn("Processing pending operations: " + op.toString(), ex);
- try {
- toDoQueue.put(op);
- } catch (InterruptedException e) {
- throw new RuntimeException(
- "Putting into toDoQueue was interrupted.", e);
- } catch (Exception e) {
- LOG.error("main processing loop: " + op.toString(), e);
- }
+
+ // work on the TodoQueue. If that fails, we should shut down.
+ if (!processToDoQueue()) {
+ break;
}
}
} catch (Throwable t) {
LOG.fatal("Unhandled exception. Starting shutdown.", t);
- this.closed.set(true);
+ closed.set(true);
}
// The region servers won't all exit until we stop scanning the meta regions
- stopScanners();
+ regionManager.stopScanners();
// Wait for all the remaining region servers to report in.
- letRegionServersShutdown();
+ serverManager.letRegionServersShutdown();
/*
* Clean up and close up shop
@@ -525,24 +367,94 @@
}
}
server.stop(); // Stop server
- serverLeases.close(); // Turn off the lease monitor
-
+ serverManager.stop();
+ regionManager.stop();
+
// Join up with all threads
- try {
- if (rootScannerThread.isAlive()) {
- rootScannerThread.join(); // Wait for the root scanner to finish.
+ LOG.info("HMaster main thread exiting");
+ }
+
+ /**
+ * Try to get an operation off of the todo queue and perform it.
+ */
+ private boolean processToDoQueue() {
+ RegionServerOperation op = null;
+
+ // block until the root region is online
+ if (regionManager.getRootRegionLocation() != null) {
+ // We can't process server shutdowns unless the root region is online
+ op = delayedToDoQueue.poll();
+ }
+
+ // if there aren't any todo items in the queue, sleep for a bit.
+ if (op == null ) {
+ try {
+ op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // continue
}
- } catch (Exception iex) {
- LOG.warn("root scanner", iex);
}
+
+ // at this point, if there's still no todo operation, or we're supposed to
+ // be closed, return.
+ if (op == null || closed.get()) {
+ return true;
+ }
+
try {
- if (metaScannerThread.isAlive()) {
- metaScannerThread.join(); // Wait for meta scanner to finish.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Main processing loop: " + op.toString());
+ }
+
+ // perform the operation.
+ if (!op.process()) {
+ // Operation would have blocked because not all meta regions are
+ // online. This could cause a deadlock, because this thread is waiting
+ // for the missing meta region(s) to come back online, but since it
+ // is waiting, it cannot process the meta region online operation it
+ // is waiting for. So put this operation back on the queue for now.
+ if (toDoQueue.size() == 0) {
+ // The queue is currently empty so wait for a while to see if what
+ // we need comes in first
+ sleeper.sleep();
+ }
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Put " + op.toString() + " back on queue");
+ }
+ toDoQueue.put(op);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Putting into toDoQueue was interrupted.", e);
+ }
+ }
+ } catch (Exception ex) {
+ // There was an exception performing the operation.
+ if (ex instanceof RemoteException) {
+ try {
+ ex = RemoteExceptionHandler.decodeRemoteException(
+ (RemoteException)ex);
+ } catch (IOException e) {
+ ex = e;
+ LOG.warn("main processing loop: " + op.toString(), e);
+ }
+ }
+ // make sure the filesystem is still ok. otherwise, we're toast.
+ if (!checkFileSystem()) {
+ return false;
+ }
+ LOG.warn("Processing pending operations: " + op.toString(), ex);
+ try {
+ // put the operation back on the queue... maybe it'll work next time.
+ toDoQueue.put(op);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Putting into toDoQueue was interrupted.", e);
+ } catch (Exception e) {
+ LOG.error("main processing loop: " + op.toString(), e);
}
- } catch(Exception iex) {
- LOG.warn("meta scanner", iex);
}
- LOG.info("HMaster main thread exiting");
+ return true;
}
/*
@@ -555,13 +467,8 @@
private void startServiceThreads() {
String threadName = Thread.currentThread().getName();
try {
- Threads.setDaemonThreadRunning(this.rootScannerThread,
- threadName + ".rootScanner");
- Threads.setDaemonThreadRunning(this.metaScannerThread,
- threadName + ".metaScanner");
- // Leases are not the same as Chore threads. Set name differently.
- this.serverLeases.setName(threadName + ".leaseChecker");
- this.serverLeases.start();
+ regionManager.start();
+ serverManager.start();
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
if (port >= 0) {
@@ -596,70 +503,15 @@
/*
* Start shutting down the master
*/
- private void startShutdown() {
+ void startShutdown() {
closed.set(true);
- stopScanners();
+ regionManager.stopScanners();
synchronized(toDoQueue) {
toDoQueue.clear(); // Empty the queue
delayedToDoQueue.clear(); // Empty shut down queue
toDoQueue.notifyAll(); // Wake main thread
}
- synchronized (serversToServerInfo) {
- serversToServerInfo.notifyAll();
- }
- }
-
- /*
- * Stop the root and meta scanners so that the region servers serving meta
- * regions can shut down.
- */
- private void stopScanners() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("telling root scanner to stop");
- }
- synchronized(rootScannerLock) {
- if (rootScannerThread.isAlive()) {
- rootScannerThread.interrupt(); // Wake root scanner
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("telling meta scanner to stop");
- }
- synchronized(metaScannerLock) {
- if (metaScannerThread.isAlive()) {
- metaScannerThread.interrupt(); // Wake meta scanner
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("meta and root scanners notified");
- }
- }
-
- /*
- * Wait on regionservers to report in
- * with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice
- * the master is going down. Waits until all region servers come back with
- * a MSG_REGIONSERVER_STOP which will cancel their lease or until leases held
- * by remote region servers have expired.
- */
- private void letRegionServersShutdown() {
- if (!fsOk) {
- // Forget waiting for the region servers if the file system has gone
- // away. Just exit as quickly as possible.
- return;
- }
- synchronized (serversToServerInfo) {
- while (this.serversToServerInfo.size() > 0) {
- LOG.info("Waiting on following regionserver(s) to go down (or " +
- "region server lease expiration, whichever happens first): " +
- this.serversToServerInfo.values());
- try {
- serversToServerInfo.wait(threadWakeFrequency);
- } catch (InterruptedException e) {
- // continue
- }
- }
- }
+ serverManager.notifyServers();
}
/*
@@ -669,50 +521,11 @@
/** {@inheritDoc} */
@SuppressWarnings("unused")
public HbaseMapWritable regionServerStartup(HServerInfo serverInfo)
- throws IOException {
-
- String s = serverInfo.getServerAddress().toString().trim();
- LOG.info("received start message from: " + s);
-
- HServerLoad load = serversToLoad.remove(s);
- if (load != null) {
- // The startup message was from a known server.
- // Remove stale information about the server's load.
- Set<String> servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(s);
- loadToServers.put(load, servers);
- }
- }
-
- HServerInfo storedInfo = serversToServerInfo.remove(s);
- if (storedInfo != null && !closed.get()) {
- // The startup message was from a known server with the same name.
- // Timeout the old one right away.
- HServerAddress root = rootRegionLocation.get();
- if (root != null && root.equals(storedInfo.getServerAddress())) {
- unassignRootRegion();
- }
- delayedToDoQueue.put(new ProcessServerShutdown(this, storedInfo));
- }
-
- // record new server
-
- load = new HServerLoad();
- serverInfo.setLoad(load);
- serversToServerInfo.put(s, serverInfo);
- serversToLoad.put(s, load);
- Set<String> servers = loadToServers.get(load);
- if (servers == null) {
- servers = new HashSet<String>();
- }
- servers.add(s);
- loadToServers.put(load, servers);
-
- if (!closed.get()) {
- serverLeases.createLease(s, new ServerExpirer(s));
- }
+ throws IOException {
+ // register with server manager
+ serverManager.regionServerStartup(serverInfo);
+ // send back some config info
return createConfigurationSubset();
}
@@ -733,544 +546,7 @@
/** {@inheritDoc} */
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
throws IOException {
- String serverName = serverInfo.getServerAddress().toString().trim();
- if (msgs.length > 0) {
- if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
- synchronized (serversToServerInfo) {
- try {
- // HRegionServer is shutting down. Cancel the server's lease.
- // Note that canceling the server's lease takes care of updating
- // serversToServerInfo, etc.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Region server " + serverName +
- ": MSG_REPORT_EXITING -- cancelling lease");
- }
-
- if (cancelLease(serverName)) {
- // Only process the exit message if the server still has a lease.
- // Otherwise we could end up processing the server exit twice.
- LOG.info("Region server " + serverName +
- ": MSG_REPORT_EXITING -- lease cancelled");
- // Get all the regions the server was serving reassigned
- // (if we are not shutting down).
- if (!closed.get()) {
- for (int i = 1; i < msgs.length; i++) {
- HRegionInfo info = msgs[i].getRegionInfo();
- if (info.isRootRegion()) {
- rootRegionLocation.set(null);
- } else if (info.isMetaTable()) {
- onlineMetaRegions.remove(info.getStartKey());
- }
-
- this.unassignedRegions.put(info, ZERO_L);
- }
- }
- }
-
- // We don't need to return anything to the server because it isn't
- // going to do any more work.
- return new HMsg[0];
- } finally {
- serversToServerInfo.notifyAll();
- }
- }
- } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
- LOG.info("Region server " + serverName + " quiesced");
- quiescedMetaServers.incrementAndGet();
- }
- }
-
- if(quiescedMetaServers.get() >= serversToServerInfo.size()) {
- // If the only servers we know about are meta servers, then we can
- // proceed with shutdown
- LOG.info("All user tables quiesced. Proceeding with shutdown");
- startShutdown();
- }
-
- if (shutdownRequested && !closed.get()) {
- // Tell the server to stop serving any user regions
- return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
- }
-
- if (closed.get()) {
- // Tell server to shut down if we are shutting down. This should
- // happen after check of MSG_REPORT_EXITING above, since region server
- // will send us one of these messages after it gets MSG_REGIONSERVER_STOP
- return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
- }
-
- HServerInfo storedInfo = serversToServerInfo.get(serverName);
- if (storedInfo == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("received server report from unknown server: " + serverName);
- }
-
- // The HBaseMaster may have been restarted.
- // Tell the RegionServer to start over and call regionServerStartup()
-
- return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)};
-
- } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
-
- // This state is reachable if:
- //
- // 1) RegionServer A started
- // 2) RegionServer B started on the same machine, then
- // clobbered A in regionServerStartup.
- // 3) RegionServer A returns, expecting to work as usual.
- //
- // The answer is to ask A to shut down for good.
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("region server race condition detected: " + serverName);
- }
-
- synchronized (serversToServerInfo) {
- cancelLease(serverName);
- serversToServerInfo.notifyAll();
- }
- return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
-
- } else {
-
- // All's well. Renew the server's lease.
- // This will always succeed; otherwise, the fetch of serversToServerInfo
- // would have failed above.
-
- serverLeases.renewLease(serverName);
-
- // Refresh the info object and the load information
-
- serversToServerInfo.put(serverName, serverInfo);
-
- HServerLoad load = serversToLoad.get(serverName);
- if (load != null && !load.equals(serverInfo.getLoad())) {
- // We have previous information about the load on this server
- // and the load on this server has changed
-
- Set<String> servers = loadToServers.get(load);
-
- // Note that servers should never be null because loadToServers
- // and serversToLoad are manipulated in pairs
-
- servers.remove(serverName);
- loadToServers.put(load, servers);
- }
-
- // Set the current load information
-
- load = serverInfo.getLoad();
- serversToLoad.put(serverName, load);
- Set<String> servers = loadToServers.get(load);
- if (servers == null) {
- servers = new HashSet<String>();
- }
- servers.add(serverName);
- loadToServers.put(load, servers);
-
- // Next, process messages for this server
- return processMsgs(serverInfo, msgs);
- }
- }
-
- /** Cancel a server's lease and update its load information */
- private boolean cancelLease(final String serverName) {
- boolean leaseCancelled = false;
- HServerInfo info = serversToServerInfo.remove(serverName);
- if (info != null) {
- // Only cancel lease and update load information once.
- // This method can be called a couple of times during shutdown.
- if (rootRegionLocation.get() != null &&
- info.getServerAddress().equals(rootRegionLocation.get())) {
- unassignRootRegion();
- }
- LOG.info("Cancelling lease for " + serverName);
- serverLeases.cancelLease(serverName);
- leaseCancelled = true;
-
- // update load information
- HServerLoad load = serversToLoad.remove(serverName);
- if (load != null) {
- Set<String> servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- loadToServers.put(load, servers);
- }
- }
- }
- return leaseCancelled;
- }
-
- /**
- * Process all the incoming messages from a server that's contacted us.
- *
- * Note that we never need to update the server's load information because
- * that has already been done in regionServerReport.
- */
- private HMsg[] processMsgs(HServerInfo info, HMsg incomingMsgs[])
- throws IOException {
-
- ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
- String serverName = info.getServerAddress().toString();
- HashMap<Text, HRegionInfo> regionsToKill = null;
- regionsToKill = killList.remove(serverName);
-
- // Get reports on what the RegionServer did.
-
- for (int i = 0; i < incomingMsgs.length; i++) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received " + incomingMsgs[i].toString() + " from " +
- serverName);
- }
- HRegionInfo region = incomingMsgs[i].getRegionInfo();
-
- switch (incomingMsgs[i].getMsg()) {
-
- case HMsg.MSG_REPORT_PROCESS_OPEN:
- synchronized (unassignedRegions) {
- // Region server has acknowledged request to open region.
- // Extend region open time by max region open time.
- unassignedRegions.put(region,
- System.currentTimeMillis() + this.maxRegionOpenTime);
- }
- break;
-
- case HMsg.MSG_REPORT_OPEN:
- boolean duplicateAssignment = false;
- synchronized (unassignedRegions) {
- if (unassignedRegions.remove(region) == null) {
- if (region.getRegionName().compareTo(
- HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
- // Root region
- HServerAddress rootServer = rootRegionLocation.get();
- if (rootServer != null) {
- if (rootServer.toString().compareTo(serverName) == 0) {
- // A duplicate open report from the correct server
- break;
- }
- // We received an open report on the root region, but it is
- // assigned to a different server
- duplicateAssignment = true;
- }
- } else {
- // Not root region. If it is not a pending region, then we are
- // going to treat it as a duplicate assignment
- if (pendingRegions.contains(region.getRegionName())) {
- // A duplicate report from the correct server
- break;
- }
- // Although we can't tell for certain if this is a duplicate
- // report from the correct server, we are going to treat it
- // as such
- duplicateAssignment = true;
- }
- }
- if (duplicateAssignment) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("region server " + info.getServerAddress().toString()
- + " should not have opened region " + region.getRegionName());
- }
-
- // This Region should not have been opened.
- // Ask the server to shut it down, but don't report it as closed.
- // Otherwise the HMaster will think the Region was closed on purpose,
- // and then try to reopen it elsewhere; that's not what we want.
-
- returnMsgs.add(
- new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
-
- } else {
- LOG.info(info.getServerAddress().toString() + " serving " +
- region.getRegionName());
-
- if (region.getRegionName().compareTo(
- HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
- // Store the Root Region location (in memory)
- synchronized (rootRegionLocation) {
- this.rootRegionLocation.set(
- new HServerAddress(info.getServerAddress()));
- this.rootRegionLocation.notifyAll();
- }
- } else {
- // Note that the table has been assigned and is waiting for the
- // meta table to be updated.
-
- pendingRegions.add(region.getRegionName());
-
- // Queue up an update to note the region location.
-
- try {
- toDoQueue.put(new ProcessRegionOpen(this, info, region));
- } catch (InterruptedException e) {
- throw new RuntimeException(
- "Putting into toDoQueue was interrupted.", e);
- }
- }
- }
- }
- break;
-
- case HMsg.MSG_REPORT_CLOSE:
- LOG.info(info.getServerAddress().toString() + " no longer serving " +
- region.getRegionName());
-
- if (region.getRegionName().compareTo(
- HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
-
- // Root region
-
- if (region.isOffline()) {
- // Can't proceed without root region. Shutdown.
- LOG.fatal("root region is marked offline");
- shutdown();
- }
- unassignRootRegion();
-
- } else {
- boolean reassignRegion = !region.isOffline();
- boolean deleteRegion = false;
-
- if (killedRegions.remove(region.getRegionName())) {
- reassignRegion = false;
- }
-
- if (regionsToDelete.remove(region.getRegionName())) {
- reassignRegion = false;
- deleteRegion = true;
- }
-
- if (region.isMetaTable()) {
- // Region is part of the meta table. Remove it from onlineMetaRegions
- onlineMetaRegions.remove(region.getStartKey());
- }
-
- // NOTE: we cannot put the region into unassignedRegions as that
- // could create a race with the pending close if it gets
- // reassigned before the close is processed.
-
- unassignedRegions.remove(region);
-
- try {
- toDoQueue.put(new ProcessRegionClose(this, region, reassignRegion,
- deleteRegion));
-
- } catch (InterruptedException e) {
- throw new RuntimeException(
- "Putting into toDoQueue was interrupted.", e);
- }
- }
- break;
-
- case HMsg.MSG_REPORT_SPLIT:
- // A region has split.
-
- HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
- unassignedRegions.put(newRegionA, ZERO_L);
-
- HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
- unassignedRegions.put(newRegionB, ZERO_L);
-
- LOG.info("region " + region.getRegionName() +
- " split. New regions are: " + newRegionA.getRegionName() + ", " +
- newRegionB.getRegionName());
-
- if (region.isMetaTable()) {
- // A meta region has split.
-
- onlineMetaRegions.remove(region.getStartKey());
- numberOfMetaRegions.incrementAndGet();
- }
- break;
-
- default:
- throw new IOException(
- "Impossible state during msg processing. Instruction: " +
- incomingMsgs[i].getMsg());
- }
- }
-
- // Process the kill list
-
- if (regionsToKill != null) {
- for (HRegionInfo i: regionsToKill.values()) {
- returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, i));
- killedRegions.add(i.getRegionName());
- }
- }
-
- // Figure out what the RegionServer ought to do, and write back.
- assignRegions(info, serverName, returnMsgs);
- return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
- }
-
- /*
- * Assigns regions to region servers attempting to balance the load across
- * all region servers
- *
- * @param info
- * @param serverName
- * @param returnMsgs
- */
- private void assignRegions(HServerInfo info, String serverName,
- ArrayList<HMsg> returnMsgs) {
-
- synchronized (this.unassignedRegions) {
-
- // We need to hold a lock on assign attempts while we figure out what to
- // do so that multiple threads do not execute this method in parallel
- // resulting in assigning the same region to multiple servers.
-
- long now = System.currentTimeMillis();
- Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
- for (Map.Entry<HRegionInfo, Long> e: this.unassignedRegions.entrySet()) {
- HRegionInfo i = e.getKey();
- if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
- !i.isMetaRegion()) {
- // Can't assign user regions until all meta regions have been assigned
- // and are on-line
- continue;
- }
- long diff = now - e.getValue().longValue();
- if (diff > this.maxRegionOpenTime) {
- regionsToAssign.add(e.getKey());
- }
- }
- int nRegionsToAssign = regionsToAssign.size();
- if (nRegionsToAssign <= 0) {
- // No regions to assign. Return.
- return;
- }
-
- if (this.serversToServerInfo.size() == 1) {
- assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
- // Finished. Return.
- return;
- }
-
- // Multiple servers in play.
- // We need to allocate regions only to most lightly loaded servers.
- HServerLoad thisServersLoad = info.getLoad();
- int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
- nRegionsToAssign -= nregions;
- if (nRegionsToAssign > 0) {
- // We still have more regions to assign. See how many we can assign
- // before this server becomes more heavily loaded than the next
- // most heavily loaded server.
- SortedMap<HServerLoad, Set<String>> heavyServers =
- new TreeMap<HServerLoad, Set<String>>();
- synchronized (this.loadToServers) {
- heavyServers.putAll(this.loadToServers.tailMap(thisServersLoad));
- }
- int nservers = 0;
- HServerLoad heavierLoad = null;
- for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
- Set<String> servers = e.getValue();
- nservers += servers.size();
- if (e.getKey().compareTo(thisServersLoad) == 0) {
- // This is the load factor of the server we are considering
- nservers -= 1;
- continue;
- }
-
- // If we get here, we are at the first load entry that is a
- // heavier load than the server we are considering
- heavierLoad = e.getKey();
- break;
- }
-
- nregions = 0;
- if (heavierLoad != null) {
- // There is a more heavily loaded server
- for (HServerLoad load =
- new HServerLoad(thisServersLoad.getNumberOfRequests(),
- thisServersLoad.getNumberOfRegions());
- load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
- load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
- // continue;
- }
- }
-
- if (nregions < nRegionsToAssign) {
- // There are some more heavily loaded servers
- // but we can't assign all the regions to this server.
- if (nservers > 0) {
- // There are other servers that can share the load.
- // Split regions that need assignment across the servers.
- nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
- / (1.0 * nservers));
- } else {
- // No other servers with same load.
- // Split regions over all available servers
- nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
- / (1.0 * serversToServerInfo.size()));
- }
- } else {
- // Assign all regions to this server
- nregions = nRegionsToAssign;
- }
-
- now = System.currentTimeMillis();
- for (HRegionInfo regionInfo: regionsToAssign) {
- LOG.info("assigning region " + regionInfo.getRegionName() +
- " to server " + serverName);
- this.unassignedRegions.put(regionInfo, Long.valueOf(now));
- returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
- if (--nregions <= 0) {
- break;
- }
- }
- }
- }
- }
-
- /*
- * @param nRegionsToAssign
- * @param thisServersLoad
- * @return How many regions we can assign to more lightly loaded servers
- */
- private int regionsPerServer(final int nRegionsToAssign,
- final HServerLoad thisServersLoad) {
-
- SortedMap<HServerLoad, Set<String>> lightServers =
- new TreeMap<HServerLoad, Set<String>>();
-
- synchronized (this.loadToServers) {
- lightServers.putAll(this.loadToServers.headMap(thisServersLoad));
- }
-
- int nRegions = 0;
- for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
- HServerLoad lightLoad = new HServerLoad(e.getKey().getNumberOfRequests(),
- e.getKey().getNumberOfRegions());
- do {
- lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
- nRegions += 1;
- } while (lightLoad.compareTo(thisServersLoad) <= 0
- && nRegions < nRegionsToAssign);
-
- nRegions *= e.getValue().size();
- if (nRegions >= nRegionsToAssign) {
- break;
- }
- }
- return nRegions;
- }
-
- /*
- * Assign all to the only server. An unlikely case but still possible.
- * @param regionsToAssign
- * @param serverName
- * @param returnMsgs
- */
- private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign,
- final String serverName, final ArrayList<HMsg> returnMsgs) {
- long now = System.currentTimeMillis();
- for (HRegionInfo regionInfo: regionsToAssign) {
- LOG.info("assigning region " + regionInfo.getRegionName() +
- " to the only server " + serverName);
- this.unassignedRegions.put(regionInfo, Long.valueOf(now));
- returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
- }
+ return serverManager.regionServerReport(serverInfo, msgs);
}
/*
@@ -1290,8 +566,7 @@
/** {@inheritDoc} */
public void createTable(HTableDescriptor desc)
- throws IOException {
-
+ throws IOException {
if (!isMasterRunning()) {
throw new MasterNotRunningException();
}
@@ -1301,7 +576,7 @@
try {
// We can not access meta regions if they have not already been
// assigned and scanned. If we timeout waiting, just shutdown.
- if (this.metaScannerThread.waitForMetaRegionsOrClose()) {
+ if (regionManager.waitForMetaRegionsOrClose()) {
break;
}
createTable(newRegion);
@@ -1320,7 +595,7 @@
Text tableName = newRegion.getTableDesc().getName();
if (tableInCreation.contains(tableName)) {
throw new TableExistsException("Table " + tableName + " in process "
- + "of being created");
+ + "of being created");
}
tableInCreation.add(tableName);
try {
@@ -1328,66 +603,32 @@
// table would sit should it exist. Open scanner on it. If a region
// for the table we want to create already exists, then table already
// created. Throw already-exists exception.
-
- MetaRegion m = null;
- synchronized (onlineMetaRegions) {
- m = (onlineMetaRegions.size() == 1 ?
- onlineMetaRegions.get(onlineMetaRegions.firstKey()) :
- (onlineMetaRegions.containsKey(newRegion.getRegionName()) ?
- onlineMetaRegions.get(newRegion.getRegionName()) :
- onlineMetaRegions.get(onlineMetaRegions.headMap(
- newRegion.getTableDesc().getName()).lastKey())));
- }
+ MetaRegion m = regionManager.getFirstMetaRegionForRegion(newRegion);
Text metaRegionName = m.getRegionName();
HRegionInterface srvr = connection.getHRegionConnection(m.getServer());
long scannerid = srvr.openScanner(metaRegionName, COL_REGIONINFO_ARRAY,
- tableName, System.currentTimeMillis(), null);
+ tableName, System.currentTimeMillis(), null);
try {
HbaseMapWritable data = srvr.next(scannerid);
// Test data and that the row for the data is for our table. If table
// does not exist, scanner will return row after where our table would
- // be inserted if it exists so look for exact match on table name.
-
+ // be inserted if it exists so look for exact match on table name.
if (data != null && data.size() > 0) {
for (Writable k: data.keySet()) {
if (HRegionInfo.getTableNameFromRegionName(
- ((HStoreKey) k).getRow()).equals(tableName)) {
-
+ ((HStoreKey) k).getRow()).equals(tableName)) {
// Then a region for this table already exists. Ergo table exists.
-
throw new TableExistsException(tableName.toString());
}
}
}
-
} finally {
srvr.close(scannerid);
}
- // 2. Create the HRegion
-
- HRegion region =
- HRegion.createHRegion(newRegion, this.rootdir, this.conf);
-
- // 3. Insert into meta
-
- HRegionInfo info = region.getRegionInfo();
- Text regionName = region.getRegionName();
- BatchUpdate b = new BatchUpdate(regionName);
- b.put(COL_REGIONINFO, Writables.getBytes(info));
- srvr.batchUpdate(metaRegionName, b);
-
- // 4. Close the new region to flush it to disk. Close its log file too.
-
- region.close();
- region.getLog().closeAndDelete();
-
- // 5. Get it assigned to a server
-
- this.unassignedRegions.put(info, ZERO_L);
-
+ regionManager.createRegion(newRegion, srvr, metaRegionName);
} finally {
tableInCreation.remove(newRegion.getTableDesc().getName());
}
@@ -1430,55 +671,12 @@
/** {@inheritDoc} */
public HServerAddress findRootRegion() {
- return rootRegionLocation.get();
+ return regionManager.getRootRegionLocation();
}
/*
* Managing leases
*/
-
- /** Instantiated to monitor the health of a region server */
- private class ServerExpirer implements LeaseListener {
- @SuppressWarnings("hiding")
- private String server;
-
- ServerExpirer(String server) {
- this.server = server;
- }
-
- /** {@inheritDoc} */
- public void leaseExpired() {
- LOG.info(server + " lease expired");
- // Remove the server from the known servers list and update load info
- HServerInfo info = serversToServerInfo.remove(server);
- if (info != null) {
- HServerAddress root = rootRegionLocation.get();
- if (root != null && root.equals(info.getServerAddress())) {
- unassignRootRegion();
- }
- String serverName = info.getServerAddress().toString();
- HServerLoad load = serversToLoad.remove(serverName);
- if (load != null) {
- Set<String> servers = loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- loadToServers.put(load, servers);
- }
- }
- deadServers.add(server);
- }
- synchronized (serversToServerInfo) {
- serversToServerInfo.notifyAll();
- }
-
- // NOTE: If the server was serving the root region, we cannot reassign it
- // here because the new server will start serving the root region before
- // the ProcessServerShutdown operation has a chance to split the log file.
- if (info != null) {
- delayedToDoQueue.put(new ProcessServerShutdown(HMaster.this, info));
- }
- }
- }
/**
* @return Return configuration being used by this server.
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/MetaScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/MetaScanner.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/MetaScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/MetaScanner.java Sat Feb 23 15:53:21 2008
@@ -23,6 +23,8 @@
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
@@ -36,12 +38,16 @@
* action would prevent other work from getting done.
*/
class MetaScanner extends BaseScanner {
+ /** Work for the meta scanner is queued up here */
+ private volatile BlockingQueue<MetaRegion> metaRegionsToScan =
+ new LinkedBlockingQueue<MetaRegion>();
+
private final List<MetaRegion> metaRegionsToRescan =
new ArrayList<MetaRegion>();
-
+
/** Constructor */
- public MetaScanner(HMaster master) {
- super(master, false, master.metaRescanInterval, master.closed);
+ public MetaScanner(HMaster master, RegionManager regionManager) {
+ super(master, regionManager, false, master.metaRescanInterval, master.closed);
}
private boolean scanOneMetaRegion(MetaRegion region) {
@@ -49,8 +55,8 @@
// caused by the server going away. Wait until next rescan interval when
// things should be back to normal
boolean scanSuccessful = false;
- while (!master.closed.get() && !master.rootScanned &&
- master.rootRegionLocation.get() == null) {
+ while (!master.closed.get() && !regionManager.isInitialRootScanComplete() &&
+ regionManager.getRootRegionLocation() == null) {
master.sleeper.sleep();
}
if (master.closed.get()) {
@@ -59,9 +65,9 @@
try {
// Don't interrupt us while we're working
- synchronized (master.metaScannerLock) {
+ synchronized (scannerLock) {
scanRegion(region);
- master.onlineMetaRegions.put(region.getStartKey(), region);
+ regionManager.putMetaRegionOnline(region);
}
scanSuccessful = true;
} catch (IOException e) {
@@ -71,7 +77,7 @@
// so, either it won't be in the onlineMetaRegions list or its host
// address has changed and the containsValue will fail. If not
// found, best thing to do here is probably return.
- if (!master.onlineMetaRegions.containsValue(region.getStartKey())) {
+ if (!regionManager.isMetaRegionOnline(region.getStartKey())) {
LOG.debug("Scanned region is no longer in map of online " +
"regions or its value has changed");
return scanSuccessful;
@@ -91,7 +97,7 @@
MetaRegion region = null;
while (!master.closed.get() && region == null && !metaRegionsScanned()) {
try {
- region = master.metaRegionsToScan.poll(master.threadWakeFrequency,
+ region = metaRegionsToScan.poll(master.threadWakeFrequency,
TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue
@@ -105,16 +111,13 @@
}
}
}
- master.initialMetaScanComplete = true;
+ initialScanComplete = true;
return true;
}
@Override
protected void maintenanceScan() {
- ArrayList<MetaRegion> regions = new ArrayList<MetaRegion>();
- synchronized (master.onlineMetaRegions) {
- regions.addAll(master.onlineMetaRegions.values());
- }
+ List<MetaRegion> regions = regionManager.getListOfOnlineMetaRegions();
for (MetaRegion r: regions) {
scanOneMetaRegion(r);
}
@@ -126,8 +129,8 @@
* regions. This wakes up any threads that were waiting for this to happen.
*/
private synchronized boolean metaRegionsScanned() {
- if (!master.rootScanned ||
- master.numberOfMetaRegions.get() != master.onlineMetaRegions.size()) {
+ if (!regionManager.isInitialRootScanComplete() ||
+ regionManager.numMetaRegions() != regionManager.numOnlineMetaRegions()) {
return false;
}
LOG.info("all meta regions scanned");
@@ -141,8 +144,8 @@
*/
synchronized boolean waitForMetaRegionsOrClose() {
while (!master.closed.get()) {
- if (master.rootScanned &&
- master.numberOfMetaRegions.get() == master.onlineMetaRegions.size()) {
+ if (regionManager.isInitialRootScanComplete() &&
+ regionManager.numMetaRegions() == regionManager.numOnlineMetaRegions()) {
break;
}
@@ -153,5 +156,12 @@
}
}
return master.closed.get();
+ }
+
+ /**
+ * Add another meta region to scan to the queue.
+ */
+ void addMetaRegionToScan(MetaRegion m) throws InterruptedException {
+ metaRegionsToScan.add(m);
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionClose.java Sat Feb 23 15:53:21 2008
@@ -90,9 +90,7 @@
if (reassignRegion) {
LOG.info("reassign region: " + regionInfo.getRegionName());
-
- master.unassignedRegions.put(regionInfo, ZERO_L);
-
+ master.regionManager.setUnassigned(regionInfo);
} else if (deleteRegion) {
try {
HRegion.deleteRegion(master.fs, master.rootdir, regionInfo);
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionOpen.java Sat Feb 23 15:53:21 2008
@@ -90,23 +90,23 @@
// It's a meta region.
MetaRegion m = new MetaRegion(this.serverAddress,
this.regionInfo.getRegionName(), this.regionInfo.getStartKey());
- if (!master.initialMetaScanComplete) {
+ if (!master.regionManager.isInitialMetaScanComplete()) {
// Put it on the queue to be scanned for the first time.
try {
LOG.debug("Adding " + m.toString() + " to regions to scan");
- master.metaRegionsToScan.put(m);
+ master.regionManager.addMetaRegionToScan(m);
} catch (InterruptedException e) {
throw new RuntimeException(
- "Putting into metaRegionsToScan was interrupted.", e);
+ "Putting into metaRegionsToScan was interrupted.", e);
}
} else {
// Add it to the online meta regions
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
- master.onlineMetaRegions.put(this.regionInfo.getStartKey(), m);
+ master.regionManager.putMetaRegionOnline(m);
}
}
// If updated successfully, remove from pending list.
- master.pendingRegions.remove(regionInfo.getRegionName());
+ master.regionManager.noLongerPending(regionInfo.getRegionName());
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessRegionStatusChange.java Sat Feb 23 15:53:21 2008
@@ -56,7 +56,7 @@
available = false;
}
} else {
- if (!master.rootScanned || !metaTableAvailable()) {
+ if (!master.regionManager.isInitialRootScanComplete() || !metaTableAvailable()) {
// The root region has not been scanned or the meta table is not
// available so we can't proceed.
// Put the operation on the delayedToDoQueue
@@ -68,26 +68,18 @@
}
protected HRegionInterface getMetaServer() throws IOException {
- if (this.isMetaTable) {
- this.metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
+ if (isMetaTable) {
+ metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
} else {
- if (this.metaRegion == null) {
- synchronized (master.onlineMetaRegions) {
- metaRegion = master.onlineMetaRegions.size() == 1 ?
- master.onlineMetaRegions.get(master.onlineMetaRegions.firstKey()) :
- master.onlineMetaRegions.containsKey(regionInfo.getRegionName()) ?
- master.onlineMetaRegions.get(regionInfo.getRegionName()) :
- master.onlineMetaRegions.get(master.onlineMetaRegions.headMap(
- regionInfo.getRegionName()).lastKey());
- }
- this.metaRegionName = metaRegion.getRegionName();
+ if (metaRegion == null) {
+ metaRegion = master.regionManager.getFirstMetaRegionForRegion(regionInfo);
+ metaRegionName = metaRegion.getRegionName();
}
}
HServerAddress server = null;
if (isMetaTable) {
- server = master.rootRegionLocation.get();
-
+ server = master.getRootRegionLocation();
} else {
server = metaRegion.getServer();
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ProcessServerShutdown.java Sat Feb 23 15:53:21 2008
@@ -151,42 +151,30 @@
LOG.debug("removing meta region " + info.getRegionName() +
" from online meta regions");
}
- master.onlineMetaRegions.remove(info.getStartKey());
+ master.regionManager.offlineMetaRegion(info.getStartKey());
}
ToDoEntry todo = new ToDoEntry(row, info);
toDoList.add(todo);
- if (master.killList.containsKey(deadServerName)) {
- HashMap<Text, HRegionInfo> regionsToKill =
- new HashMap<Text, HRegionInfo>();
- synchronized (master.killList) {
- regionsToKill.putAll(master.killList.get(deadServerName));
+ if (master.regionManager.isMarkedClosedNoReopen(deadServerName, info.getRegionName())) {
+ master.regionManager.noLongerMarkedClosedNoReopen(deadServerName, info.getRegionName());
+ master.regionManager.noLongerUnassigned(info);
+ if (master.regionManager.isMarkedForDeletion(info.getRegionName())) {
+ // Delete this region
+ master.regionManager.regionDeleted(info.getRegionName());
+ todo.deleteRegion = true;
+ } else {
+ // Mark region offline
+ todo.regionOffline = true;
}
-
- if (regionsToKill.containsKey(info.getRegionName())) {
- regionsToKill.remove(info.getRegionName());
- master.killList.put(deadServerName, regionsToKill);
- master.unassignedRegions.remove(info);
- synchronized (master.regionsToDelete) {
- if (master.regionsToDelete.contains(info.getRegionName())) {
- // Delete this region
- master.regionsToDelete.remove(info.getRegionName());
- todo.deleteRegion = true;
- } else {
- // Mark region offline
- todo.regionOffline = true;
- }
- }
- }
-
} else {
// Get region reassigned
regions.add(info);
// If it was pending, remove.
// Otherwise will obstruct its getting reassigned.
- master.pendingRegions.remove(info.getRegionName());
+ master.regionManager.noLongerPending(info.getRegionName());
}
}
} finally {
@@ -211,27 +199,29 @@
// Get regions reassigned
for (HRegionInfo info: regions) {
- master.unassignedRegions.put(info, ZERO_L);
+ master.regionManager.setUnassigned(info);
}
}
@Override
protected boolean process() throws IOException {
LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
- this.logSplit + ", rootRescanned: " + this.rootRescanned +
- ", numberOfMetaRegions: " + master.numberOfMetaRegions.get() +
- ", onlineMetaRegions.size(): " + master.onlineMetaRegions.size());
+ this.logSplit + ", rootRescanned: " + rootRescanned +
+ ", numberOfMetaRegions: " +
+ master.regionManager.numMetaRegions() +
+ ", onlineMetaRegions.size(): " +
+ master.regionManager.numOnlineMetaRegions());
if (!logSplit) {
// Process the old log file
if (master.fs.exists(oldLogDir)) {
- if (!master.splitLogLock.tryLock()) {
+ if (!master.regionManager.splitLogLock.tryLock()) {
return false;
}
try {
HLog.splitLog(master.rootdir, oldLogDir, master.fs, master.conf);
} finally {
- master.splitLogLock.unlock();
+ master.regionManager.splitLogLock.unlock();
}
}
logSplit = true;
@@ -253,23 +243,23 @@
if (master.closed.get()) {
return true;
}
- server = master.connection.getHRegionConnection(master.rootRegionLocation.get());
+ server = master.connection.getHRegionConnection(
+ master.getRootRegionLocation());
scannerId = -1L;
try {
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
- master.rootRegionLocation.get().getBindAddress());
+ master.getRootRegionLocation().getBindAddress());
}
scannerId =
server.openScanner(HRegionInfo.rootRegionInfo.getRegionName(),
- COLUMN_FAMILY_ARRAY, EMPTY_START_ROW,
- System.currentTimeMillis(), null);
+ COLUMN_FAMILY_ARRAY, EMPTY_START_ROW,
+ System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId,
- HRegionInfo.rootRegionInfo.getRegionName());
+ HRegionInfo.rootRegionInfo.getRegionName());
break;
-
} catch (IOException e) {
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);
@@ -278,8 +268,8 @@
}
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning root region on " +
- master.rootRegionLocation.get().getBindAddress() + " finished " +
- Thread.currentThread().getName());
+ master.getRootRegionLocation().getBindAddress() +
+ " finished " + Thread.currentThread().getName());
}
rootRescanned = true;
}
@@ -296,34 +286,31 @@
if (master.closed.get()) {
return true;
}
- List<MetaRegion> regions = new ArrayList<MetaRegion>();
- synchronized (master.onlineMetaRegions) {
- regions.addAll(master.onlineMetaRegions.values());
- }
+ List<MetaRegion> regions = master.regionManager.getListOfOnlineMetaRegions();
for (MetaRegion r: regions) {
HRegionInterface server = null;
long scannerId = -1L;
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown scanning " +
- r.getRegionName() + " on " + r.getServer() + " " +
- Thread.currentThread().getName());
+ r.getRegionName() + " on " + r.getServer() + " " +
+ Thread.currentThread().getName());
}
server = master.connection.getHRegionConnection(r.getServer());
scannerId =
server.openScanner(r.getRegionName(), COLUMN_FAMILY_ARRAY,
- EMPTY_START_ROW, System.currentTimeMillis(), null);
+ EMPTY_START_ROW, System.currentTimeMillis(), null);
scanMetaRegion(server, scannerId, r.getRegionName());
if (LOG.isDebugEnabled()) {
LOG.debug("process server shutdown finished scanning " +
- r.getRegionName() + " on " + r.getServer() + " " +
- Thread.currentThread().getName());
+ r.getRegionName() + " on " + r.getServer() + " " +
+ Thread.currentThread().getName());
}
}
- master.deadServers.remove(deadServerName);
+ master.serverManager.removeDeadServer(deadServerName);
break;
} catch (IOException e) {