You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2007/12/07 22:23:56 UTC
svn commit: r602226 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/
Author: jimk
Date: Fri Dec 7 13:23:54 2007
New Revision: 602226
URL: http://svn.apache.org/viewvc?rev=602226&view=rev
Log:
HADOOP-2338 Fix NullPointerException in master server.
Modified:
lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Dec 7 13:23:54 2007
@@ -60,6 +60,7 @@
(Bryan Duxbury via Stack)
HADOOP-2365 Result of HashFunction.hash() contains all identical values
HADOOP-2362 Leaking hdfs file handle on region split
+ HADOOP-2338 Fix NullPointerException in master server.
IMPROVEMENTS
HADOOP-2401 Add convenience put method that takes writable
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri Dec 7 13:23:54 2007
@@ -31,8 +31,6 @@
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
-import java.util.Timer;
-import java.util.TimerTask;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@@ -74,6 +72,7 @@
HMasterRegionInterface {
static final Log LOG = LogFactory.getLog(HMaster.class.getName());
+ static final Long ZERO_L = Long.valueOf(0L);
/** {@inheritDoc} */
public long getProtocolVersion(String protocol,
@@ -93,6 +92,8 @@
// started here in HMaster rather than have them have to know about the
// hosting class
volatile AtomicBoolean closed = new AtomicBoolean(true);
+ volatile AtomicBoolean shutdownRequested = new AtomicBoolean(false);
+ volatile AtomicInteger quiescedMetaServers = new AtomicInteger(0);
volatile boolean fsOk;
Path dir;
HBaseConfiguration conf;
@@ -102,9 +103,9 @@
int numRetries;
long maxRegionOpenTime;
- DelayQueue<ProcessServerShutdown> shutdownQueue =
- new DelayQueue<ProcessServerShutdown>();
- BlockingQueue<RegionServerOperation> msgQueue =
+ DelayQueue<RegionServerOperation> delayedToDoQueue =
+ new DelayQueue<RegionServerOperation>();
+ BlockingQueue<RegionServerOperation> toDoQueue =
new LinkedBlockingQueue<RegionServerOperation>();
int leaseTimeout;
@@ -424,8 +425,7 @@
|| killedRegions.contains(info.getRegionName()) // queued for offline
|| regionsToDelete.contains(info.getRegionName())) { // queued for delete
- unassignedRegions.remove(info.getRegionName());
- assignAttempts.remove(info.getRegionName());
+ unassignedRegions.remove(info);
return;
}
HServerInfo storedInfo = null;
@@ -458,7 +458,7 @@
if (!deadServer &&
((storedInfo != null && storedInfo.getStartCode() != startCode) ||
(storedInfo == null &&
- !unassignedRegions.containsKey(info.getRegionName()) &&
+ !unassignedRegions.containsKey(info) &&
!pendingRegions.contains(info.getRegionName())
)
)
@@ -495,8 +495,7 @@
}
}
// Now get the region assigned
- unassignedRegions.put(info.getRegionName(), info);
- assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+ unassignedRegions.put(info, ZERO_L);
}
}
}
@@ -818,8 +817,9 @@
new ConcurrentHashMap<String, HServerLoad>();
/**
- * The 'unassignedRegions' table maps from a region name to a HRegionInfo
- * record, which includes the region's table, its id, and its start/end keys.
+ * The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that
+ * indicates the last time we *tried* to assign the region to a RegionServer.
+ * If the timestamp is out of date, then we can try to reassign it.
*
* We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
* set of all known valid regions.
@@ -827,15 +827,8 @@
* <p>Items are removed from this list when a region server reports in that
* the region has been deployed.
*/
- final SortedMap<Text, HRegionInfo> unassignedRegions =
- Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
-
- /**
- * The 'assignAttempts' table maps from regions to a timestamp that indicates
- * the last time we *tried* to assign the region to a RegionServer. If the
- * timestamp is out of date, then we can try to reassign it.
- */
- final Map<Text, Long> assignAttempts = new ConcurrentHashMap<Text, Long>();
+ final SortedMap<HRegionInfo, Long> unassignedRegions =
+ Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());
/**
* Regions that have been assigned, and the server has reported that it has
@@ -978,10 +971,7 @@
*/
void unassignRootRegion() {
this.rootRegionLocation.set(null);
- this.unassignedRegions.put(HRegionInfo.rootRegionInfo.getRegionName(),
- HRegionInfo.rootRegionInfo);
- this.assignAttempts.put(HRegionInfo.rootRegionInfo.getRegionName(),
- Long.valueOf(0L));
+ this.unassignedRegions.put(HRegionInfo.rootRegionInfo, ZERO_L);
}
/**
@@ -1030,7 +1020,11 @@
* @return Location of the <code>-ROOT-</code> region.
*/
public HServerAddress getRootRegionLocation() {
- return this.rootRegionLocation.get();
+ HServerAddress rootServer = null;
+ if (!shutdownRequested.get() && !closed.get()) {
+ rootServer = this.rootRegionLocation.get();
+ }
+ return rootServer;
}
/**
@@ -1054,11 +1048,11 @@
if (rootRegionLocation.get() != null) {
// We can't process server shutdowns unless the root region is online
- op = this.shutdownQueue.poll();
+ op = this.delayedToDoQueue.poll();
}
if (op == null ) {
try {
- op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+ op = toDoQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue
}
@@ -1077,7 +1071,7 @@
// for the missing meta region(s) to come back online, but since it
// is waiting, it cannot process the meta region online operation it
// is waiting for. So put this operation back on the queue for now.
- if (msgQueue.size() == 0) {
+ if (toDoQueue.size() == 0) {
// The queue is currently empty so wait for a while to see if what
// we need comes in first
sleeper.sleep();
@@ -1086,9 +1080,10 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Put " + op.toString() + " back on queue");
}
- msgQueue.put(op);
+ toDoQueue.put(op);
} catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+ throw new RuntimeException(
+ "Putting into toDoQueue was interrupted.", e);
}
}
} catch (Exception ex) {
@@ -1106,9 +1101,10 @@
}
LOG.warn("Processing pending operations: " + op.toString(), ex);
try {
- msgQueue.put(op);
+ toDoQueue.put(op);
} catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+ throw new RuntimeException(
+ "Putting into toDoQueue was interrupted.", e);
}
}
}
@@ -1255,7 +1251,7 @@
if (root != null && root.equals(storedInfo.getServerAddress())) {
unassignRootRegion();
}
- shutdownQueue.put(new ProcessServerShutdown(storedInfo));
+ delayedToDoQueue.put(new ProcessServerShutdown(storedInfo));
}
// record new server
@@ -1302,48 +1298,70 @@
throws IOException {
String serverName = serverInfo.getServerAddress().toString().trim();
long serverLabel = getServerLabel(serverName);
- if (msgs.length > 0 && msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
- synchronized (serversToServerInfo) {
- try {
- // HRegionServer is shutting down. Cancel the server's lease.
- // Note that canceling the server's lease takes care of updating
- // serversToServerInfo, etc.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Region server " + serverName +
- ": MSG_REPORT_EXITING -- cancelling lease");
- }
+// if (LOG.isDebugEnabled()) {
+// LOG.debug("received heartbeat from " + serverName);
+// }
+ if (msgs.length > 0) {
+ if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
+ synchronized (serversToServerInfo) {
+ try {
+ // HRegionServer is shutting down. Cancel the server's lease.
+ // Note that canceling the server's lease takes care of updating
+ // serversToServerInfo, etc.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Region server " + serverName +
+ ": MSG_REPORT_EXITING -- cancelling lease");
+ }
- if (cancelLease(serverName, serverLabel)) {
- // Only process the exit message if the server still has a lease.
- // Otherwise we could end up processing the server exit twice.
- LOG.info("Region server " + serverName +
- ": MSG_REPORT_EXITING -- lease cancelled");
- // Get all the regions the server was serving reassigned
- // (if we are not shutting down).
- if (!closed.get()) {
- for (int i = 1; i < msgs.length; i++) {
- HRegionInfo info = msgs[i].getRegionInfo();
- if (info.getTableDesc().getName().equals(ROOT_TABLE_NAME)) {
- rootRegionLocation.set(null);
- } else if (info.getTableDesc().getName().equals(META_TABLE_NAME)) {
- onlineMetaRegions.remove(info.getStartKey());
- }
+ if (cancelLease(serverName, serverLabel)) {
+ // Only process the exit message if the server still has a lease.
+ // Otherwise we could end up processing the server exit twice.
+ LOG.info("Region server " + serverName +
+ ": MSG_REPORT_EXITING -- lease cancelled");
+ // Get all the regions the server was serving reassigned
+ // (if we are not shutting down).
+ if (!closed.get()) {
+ for (int i = 1; i < msgs.length; i++) {
+ HRegionInfo info = msgs[i].getRegionInfo();
+ if (info.isRootRegion()) {
+ rootRegionLocation.set(null);
+ } else if (info.isMetaTable()) {
+ onlineMetaRegions.remove(info.getStartKey());
+ }
- this.unassignedRegions.put(info.getRegionName(), info);
- this.assignAttempts.put(info.getRegionName(), Long.valueOf(0L));
+ this.unassignedRegions.put(info, ZERO_L);
+ }
}
}
- }
- // We don't need to return anything to the server because it isn't
- // going to do any more work.
- return new HMsg[0];
- } finally {
- serversToServerInfo.notifyAll();
+ // We don't need to return anything to the server because it isn't
+ // going to do any more work.
+ return new HMsg[0];
+ } finally {
+ serversToServerInfo.notifyAll();
+ }
+ }
+ } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
+ LOG.info("Region server " + serverName + " quiesced");
+ if(quiescedMetaServers.incrementAndGet() == serversToServerInfo.size()) {
+ // If the only servers we know about are meta servers, then we can
+ // proceed with shutdown
+ LOG.info("All user tables quiesced. Proceeding with shutdown");
+ closed.set(true);
+ synchronized(toDoQueue) {
+ toDoQueue.clear(); // Empty the queue
+ delayedToDoQueue.clear(); // Empty shut down queue
+ toDoQueue.notifyAll(); // Wake main thread
+ }
}
}
}
+ if (shutdownRequested.get() && !closed.get()) {
+ // Tell the server to stop serving any user regions
+ return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
+ }
+
if (closed.get()) {
// Tell server to shut down if we are shutting down. This should
// happen after check of MSG_REPORT_EXITING above, since region server
@@ -1476,62 +1494,86 @@
switch (incomingMsgs[i].getMsg()) {
case HMsg.MSG_REPORT_PROCESS_OPEN:
- synchronized (this.assignAttempts) {
+ synchronized (unassignedRegions) {
// Region server has acknowledged request to open region.
- // Extend region open time by 1/2 max region open time.
- assignAttempts.put(region.getRegionName(),
- Long.valueOf(assignAttempts.get(
- region.getRegionName()).longValue() +
- (this.maxRegionOpenTime / 2)));
+ // Extend region open time by max region open time.
+ unassignedRegions.put(region,
+ System.currentTimeMillis() + this.maxRegionOpenTime);
}
break;
case HMsg.MSG_REPORT_OPEN:
- HRegionInfo regionInfo = unassignedRegions.get(region.getRegionName());
-
- if (regionInfo == null) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("region server " + info.getServerAddress().toString()
- + " should not have opened region " + region.getRegionName());
+ boolean duplicateAssignment = false;
+ synchronized (unassignedRegions) {
+ if (unassignedRegions.remove(region) == null) {
+ if (region.getRegionName().compareTo(
+ HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+ // Root region
+ HServerAddress rootServer = rootRegionLocation.get();
+ if (rootServer != null) {
+ if (rootServer.toString().compareTo(serverName) == 0) {
+ // A duplicate open report from the correct server
+ break;
+ }
+ // We received an open report on the root region, but it is
+ // assigned to a different server
+ duplicateAssignment = true;
+ }
+ } else {
+ // Not root region. If it is not a pending region, then we are
+ // going to treat it as a duplicate assignment
+ if (pendingRegions.contains(region.getRegionName())) {
+ // A duplicate report from the correct server
+ break;
+ }
+ // Although we can't tell for certain if this is a duplicate
+ // report from the correct server, we are going to treat it
+ // as such
+ duplicateAssignment = true;
+ }
}
+ if (duplicateAssignment) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("region server " + info.getServerAddress().toString()
+ + " should not have opened region " + region.getRegionName());
+ }
- // This Region should not have been opened.
- // Ask the server to shut it down, but don't report it as closed.
- // Otherwise the HMaster will think the Region was closed on purpose,
- // and then try to reopen it elsewhere; that's not what we want.
-
- returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
+ // This Region should not have been opened.
+ // Ask the server to shut it down, but don't report it as closed.
+ // Otherwise the HMaster will think the Region was closed on purpose,
+ // and then try to reopen it elsewhere; that's not what we want.
- } else {
- LOG.info(info.getServerAddress().toString() + " serving " +
- region.getRegionName());
+ returnMsgs.add(
+ new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
- if (region.getRegionName().compareTo(
- HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
- // Store the Root Region location (in memory)
- synchronized (rootRegionLocation) {
- this.rootRegionLocation.set(
- new HServerAddress(info.getServerAddress()));
- this.rootRegionLocation.notifyAll();
- }
} else {
- // Note that the table has been assigned and is waiting for the meta
- // table to be updated.
+ LOG.info(info.getServerAddress().toString() + " serving " +
+ region.getRegionName());
- pendingRegions.add(region.getRegionName());
+ if (region.getRegionName().compareTo(
+ HRegionInfo.rootRegionInfo.getRegionName()) == 0) {
+ // Store the Root Region location (in memory)
+ synchronized (rootRegionLocation) {
+ this.rootRegionLocation.set(
+ new HServerAddress(info.getServerAddress()));
+ this.rootRegionLocation.notifyAll();
+ }
+ } else {
+ // Note that the table has been assigned and is waiting for the
+ // meta table to be updated.
- // Queue up an update to note the region location.
+ pendingRegions.add(region.getRegionName());
- try {
- msgQueue.put(new ProcessRegionOpen(info, region));
- } catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
- }
- }
- // Remove from unassigned list so we don't assign it to someone else
- this.unassignedRegions.remove(region.getRegionName());
- this.assignAttempts.remove(region.getRegionName());
+ // Queue up an update to note the region location.
+
+ try {
+ toDoQueue.put(new ProcessRegionOpen(info, region));
+ } catch (InterruptedException e) {
+ throw new RuntimeException(
+ "Putting into toDoQueue was interrupted.", e);
+ }
+ }
+ }
}
break;
@@ -1559,19 +1601,24 @@
deleteRegion = true;
}
+ if (region.isMetaTable()) {
+ // Region is part of the meta table. Remove it from onlineMetaRegions
+ onlineMetaRegions.remove(region.getStartKey());
+ }
+
// NOTE: we cannot put the region into unassignedRegions as that
// could create a race with the pending close if it gets
// reassigned before the close is processed.
- unassignedRegions.remove(region.getRegionName());
- assignAttempts.remove(region.getRegionName());
+ unassignedRegions.remove(region);
try {
- msgQueue.put(new ProcessRegionClose(region, reassignRegion,
+ toDoQueue.put(new ProcessRegionClose(region, reassignRegion,
deleteRegion));
} catch (InterruptedException e) {
- throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+ throw new RuntimeException(
+ "Putting into toDoQueue was interrupted.", e);
}
}
break;
@@ -1580,12 +1627,10 @@
// A region has split.
HRegionInfo newRegionA = incomingMsgs[++i].getRegionInfo();
- unassignedRegions.put(newRegionA.getRegionName(), newRegionA);
- assignAttempts.put(newRegionA.getRegionName(), Long.valueOf(0L));
+ unassignedRegions.put(newRegionA, ZERO_L);
HRegionInfo newRegionB = incomingMsgs[++i].getRegionInfo();
- unassignedRegions.put(newRegionB.getRegionName(), newRegionB);
- assignAttempts.put(newRegionB.getRegionName(), Long.valueOf(0L));
+ unassignedRegions.put(newRegionB, ZERO_L);
LOG.info("region " + region.getRegionName() +
" split. New regions are: " + newRegionA.getRegionName() + ", " +
@@ -1631,15 +1676,22 @@
private void assignRegions(HServerInfo info, String serverName,
ArrayList<HMsg> returnMsgs) {
- synchronized (this.assignAttempts) {
+ synchronized (this.unassignedRegions) {
// We need to hold a lock on assign attempts while we figure out what to
// do so that multiple threads do not execute this method in parallel
// resulting in assigning the same region to multiple servers.
long now = System.currentTimeMillis();
- Set<Text> regionsToAssign = new HashSet<Text>();
- for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
+ Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
+ for (Map.Entry<HRegionInfo, Long> e: this.unassignedRegions.entrySet()) {
+ HRegionInfo i = e.getKey();
+ if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
+ !i.isMetaRegion()) {
+ // Can't assign user regions until all meta regions have been assigned
+ // and are on-line
+ continue;
+ }
long diff = now - e.getValue().longValue();
if (diff > this.maxRegionOpenTime) {
regionsToAssign.add(e.getKey());
@@ -1720,11 +1772,10 @@
}
now = System.currentTimeMillis();
- for (Text regionName: regionsToAssign) {
- HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
- LOG.info("assigning region " + regionName + " to server " +
- serverName);
- this.assignAttempts.put(regionName, Long.valueOf(now));
+ for (HRegionInfo regionInfo: regionsToAssign) {
+ LOG.info("assigning region " + regionInfo.getRegionName() +
+ " to server " + serverName);
+ this.unassignedRegions.put(regionInfo, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
if (--nregions <= 0) {
break;
@@ -1773,14 +1824,13 @@
* @param serverName
* @param returnMsgs
*/
- private void assignRegionsToOneServer(final Set<Text> regionsToAssign,
+ private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign,
final String serverName, final ArrayList<HMsg> returnMsgs) {
long now = System.currentTimeMillis();
- for (Text regionName: regionsToAssign) {
- HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
- LOG.info("assigning region " + regionName + " to the only server " +
- serverName);
- this.assignAttempts.put(regionName, Long.valueOf(now));
+ for (HRegionInfo regionInfo: regionsToAssign) {
+ LOG.info("assigning region " + regionInfo.getRegionName() +
+ " to the only server " + serverName);
+ this.unassignedRegions.put(regionInfo, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
}
}
@@ -1789,10 +1839,63 @@
* Some internal classes to manage msg-passing and region server operations
*/
- private abstract class RegionServerOperation {
- RegionServerOperation() {}
+ private abstract class RegionServerOperation implements Delayed {
+ private long expire;
+
+ protected RegionServerOperation() {
+ // Set the future time at which we expect to be released from the
+ // DelayQueue we're inserted in on lease expiration.
+ this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+ }
+
+ /** {@inheritDoc} */
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(this.expire - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ /** {@inheritDoc} */
+ public int compareTo(Delayed o) {
+ return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
+ - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
+ }
+
+ protected void requeue() {
+ this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+ delayedToDoQueue.put(this);
+ }
+
+ protected boolean rootAvailable() {
+ boolean available = true;
+ if (rootRegionLocation.get() == null) {
+ available = false;
+ requeue();
+ }
+ return available;
+ }
- abstract boolean process() throws IOException;
+ protected boolean metaTableAvailable() {
+ boolean available = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("numberOfMetaRegions: " + numberOfMetaRegions.get() +
+ ", onlineMetaRegions.size(): " + onlineMetaRegions.size());
+ }
+ if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
+ // We can't proceed because not all of the meta regions are online.
+ // We can't block either because that would prevent the meta region
+ // online message from being processed. In order to prevent spinning
+ // in the run queue, put this request on the delay queue to give
+ // other threads the opportunity to get the meta regions on-line.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Requeuing because not all meta regions are online");
+ }
+ available = false;
+ requeue();
+ }
+ return available;
+ }
+
+ protected abstract boolean process() throws IOException;
}
/**
@@ -1800,15 +1903,13 @@
* The region server's log file needs to be split up for each region it was
* serving, and the regions need to get reassigned.
*/
- private class ProcessServerShutdown extends RegionServerOperation
- implements Delayed {
- private long expire;
+ private class ProcessServerShutdown extends RegionServerOperation {
private HServerAddress deadServer;
private String deadServerName;
private Path oldLogDir;
- private transient boolean logSplit;
- private transient boolean rootChecked;
- private transient boolean rootRescanned;
+ private boolean logSplit;
+ private boolean rootChecked;
+ private boolean rootRescanned;
private class ToDoEntry {
boolean deleteRegion;
@@ -1824,7 +1925,10 @@
}
}
- ProcessServerShutdown(HServerInfo serverInfo) {
+ /**
+ * @param serverInfo
+ */
+ public ProcessServerShutdown(HServerInfo serverInfo) {
super();
this.deadServer = serverInfo.getServerAddress();
this.deadServerName = this.deadServer.toString();
@@ -1838,24 +1942,9 @@
dirName.append("_");
dirName.append(deadServer.getPort());
this.oldLogDir = new Path(dir, dirName.toString());
- // Set the future time at which we expect to be released from the
- // DelayQueue we're inserted in on lease expiration.
- this.expire = System.currentTimeMillis() + leaseTimeout / 2;
}
/** {@inheritDoc} */
- public long getDelay(TimeUnit unit) {
- return unit.convert(this.expire - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- }
-
- /** {@inheritDoc} */
- public int compareTo(Delayed o) {
- return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
- - o.getDelay(TimeUnit.MILLISECONDS)).intValue();
- }
-
- /** {@inheritDoc} */
@Override
public String toString() {
return "ProcessServerShutdown of " + this.deadServer.toString();
@@ -1866,7 +1955,7 @@
Text regionName) throws IOException {
ArrayList<ToDoEntry> toDoList = new ArrayList<ToDoEntry>();
- TreeMap<Text, HRegionInfo> regions = new TreeMap<Text, HRegionInfo>();
+ HashSet<HRegionInfo> regions = new HashSet<HRegionInfo>();
try {
while (true) {
@@ -1958,8 +2047,7 @@
if (regionsToKill.containsKey(info.getRegionName())) {
regionsToKill.remove(info.getRegionName());
killList.put(deadServerName, regionsToKill);
- unassignedRegions.remove(info.getRegionName());
- assignAttempts.remove(info.getRegionName());
+ unassignedRegions.remove(info);
synchronized (regionsToDelete) {
if (regionsToDelete.contains(info.getRegionName())) {
// Delete this region
@@ -1974,7 +2062,7 @@
} else {
// Get region reassigned
- regions.put(info.getRegionName(), info);
+ regions.add(info);
// If it was pending, remove.
// Otherwise will obstruct its getting reassigned.
@@ -2008,16 +2096,13 @@
}
// Get regions reassigned
- for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
- Text region = e.getKey();
- HRegionInfo regionInfo = e.getValue();
- unassignedRegions.put(region, regionInfo);
- assignAttempts.put(region, Long.valueOf(0L));
+ for (HRegionInfo info: regions) {
+ unassignedRegions.put(info, ZERO_L);
}
}
@Override
- boolean process() throws IOException {
+ protected boolean process() throws IOException {
LOG.info("process shutdown of server " + deadServer + ": logSplit: " +
this.logSplit + ", rootChecked: " + this.rootChecked +
", rootRescanned: " + this.rootRescanned + ", numberOfMetaRegions: " +
@@ -2040,30 +2125,12 @@
}
if (!rootChecked) {
- boolean rootRegionUnavailable = false;
- if (rootRegionLocation.get() == null) {
- rootRegionUnavailable = true;
-
- } else if (deadServer.equals(rootRegionLocation.get())) {
- // We should never get here because whenever an object of this type
- // is created, a check is made to see if it is the root server.
- // and unassignRootRegion() is called then. However, in the
- // unlikely event that we do end up here, let's do the right thing.
- unassignRootRegion();
- rootRegionUnavailable = true;
- }
- if (rootRegionUnavailable) {
- // We can't do anything until the root region is on-line, put
- // us back on the delay queue. Reset the future time at which
- // we expect to be released from the DelayQueue we're inserted
- // in on lease expiration.
- this.expire = System.currentTimeMillis() + leaseTimeout / 2;
- shutdownQueue.put(this);
-
- // Return true so run() does not put us back on the msgQueue
+ if (!rootAvailable()) {
+ // Return true so that worker does not put this request back on the
+ // toDoQueue.
+ // rootAvailable() has already put it on the delayedToDoQueue
return true;
}
- rootChecked = true;
}
if (!rootRescanned) {
@@ -2114,27 +2181,14 @@
}
rootRescanned = true;
}
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("numberOfMetaRegions: " + numberOfMetaRegions.get() +
- ", onlineMetaRegions.size(): " + onlineMetaRegions.size());
- }
- if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
- // We can't proceed because not all of the meta regions are online.
- // We can't block either because that would prevent the meta region
- // online message from being processed. In order to prevent spinning
- // in the run queue, put this request on the delay queue to give
- // other threads the opportunity to get the meta regions on-line.
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Requeuing shutdown because not all meta regions are online");
- }
- this.expire = System.currentTimeMillis() + leaseTimeout / 2;
- shutdownQueue.put(this);
-
- // Return true so run() does not put us back on the msgQueue
+
+ if (!metaTableAvailable()) {
+ // We can't proceed because not all meta regions are online.
+ // metaAvailable() has put this request on the delayedToDoQueue
+ // Return true so that worker does not put this on the toDoQueue
return true;
}
+
for (int tries = 0; tries < numRetries; tries++) {
try {
if (closed.get()) {
@@ -2181,33 +2235,95 @@
}
/**
+ * Abstract class that performs common operations for
+ * @see #ProcessRegionClose and @see #ProcessRegionOpen
+ */
+ private abstract class ProcessRegionStatusChange
+ extends RegionServerOperation {
+
+ protected final boolean isMetaTable;
+ protected final HRegionInfo regionInfo;
+ private MetaRegion metaRegion;
+ protected Text metaRegionName;
+
+ /**
+ * @param regionInfo
+ */
+ public ProcessRegionStatusChange(HRegionInfo regionInfo) {
+ super();
+ this.regionInfo = regionInfo;
+ this.isMetaTable = regionInfo.isMetaTable();
+ this.metaRegion = null;
+ this.metaRegionName = null;
+ }
+
+ protected boolean metaRegionAvailable() {
+ boolean available = true;
+ if (isMetaTable) {
+ // This operation is for the meta table
+ if (!rootAvailable()) {
+ // But we can't proceed unless the root region is available
+ available = false;
+ }
+ } else {
+ if (!rootScanned || !metaTableAvailable()) {
+ // The root region has not been scanned or the meta table is not
+ // available so we can't proceed.
+ // Put the operation on the delayedToDoQueue
+ requeue();
+ available = false;
+ }
+ }
+ return available;
+ }
+
+ protected HRegionInterface getMetaServer() throws IOException {
+ if (this.isMetaTable) {
+ this.metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
+ } else {
+ if (this.metaRegion == null) {
+ synchronized (onlineMetaRegions) {
+ metaRegion = onlineMetaRegions.size() == 1 ?
+ onlineMetaRegions.get(onlineMetaRegions.firstKey()) :
+ onlineMetaRegions.containsKey(regionInfo.getRegionName()) ?
+ onlineMetaRegions.get(regionInfo.getRegionName()) :
+ onlineMetaRegions.get(onlineMetaRegions.headMap(
+ regionInfo.getRegionName()).lastKey());
+ }
+ this.metaRegionName = metaRegion.getRegionName();
+ }
+ }
+
+ HServerAddress server = null;
+ if (isMetaTable) {
+ server = rootRegionLocation.get();
+
+ } else {
+ server = metaRegion.getServer();
+ }
+ return connection.getHRegionConnection(server);
+ }
+
+ }
+ /**
* ProcessRegionClose is instantiated when a region server reports that it
* has closed a region.
*/
- private class ProcessRegionClose extends RegionServerOperation {
- private HRegionInfo regionInfo;
+ private class ProcessRegionClose extends ProcessRegionStatusChange {
private boolean reassignRegion;
private boolean deleteRegion;
- private boolean rootRegion;
- ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
+ /**
+ * @param regionInfo
+ * @param reassignRegion
+ * @param deleteRegion
+ */
+ public ProcessRegionClose(HRegionInfo regionInfo, boolean reassignRegion,
boolean deleteRegion) {
- super();
-
- this.regionInfo = regionInfo;
+ super(regionInfo);
this.reassignRegion = reassignRegion;
this.deleteRegion = deleteRegion;
-
- // If the region closing down is a meta region then we need to update
- // the ROOT table
-
- if (this.regionInfo.getTableDesc().getName().equals(META_TABLE_NAME)) {
- this.rootRegion = true;
-
- } else {
- this.rootRegion = false;
- }
}
/** {@inheritDoc} */
@@ -2217,7 +2333,7 @@
}
@Override
- boolean process() throws IOException {
+ protected boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
if (closed.get()) {
return true;
@@ -2226,50 +2342,15 @@
// Mark the Region as unavailable in the appropriate meta table
- Text metaRegionName;
- HRegionInterface server;
- if (rootRegion) {
- if (rootRegionLocation.get() == null || !rootScanned) {
- // We can't proceed until the root region is online and has been
- // scanned
- return false;
- }
- metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
- server = connection.getHRegionConnection(rootRegionLocation.get());
- onlineMetaRegions.remove(regionInfo.getStartKey());
-
- } else {
- if (!rootScanned ||
- numberOfMetaRegions.get() != onlineMetaRegions.size()) {
-
- // We can't proceed because not all of the meta regions are online.
- // We can't block either because that would prevent the meta region
- // online message from being processed. So return false to have this
- // operation requeued.
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Requeuing close because rootScanned=" +
- rootScanned + ", numberOfMetaRegions=" +
- numberOfMetaRegions.get() + ", onlineMetaRegions.size()=" +
- onlineMetaRegions.size());
- }
- return false;
- }
-
- MetaRegion r = null;
- synchronized (onlineMetaRegions) {
- if (onlineMetaRegions.containsKey(regionInfo.getRegionName())) {
- r = onlineMetaRegions.get(regionInfo.getRegionName());
-
- } else {
- r = onlineMetaRegions.get(onlineMetaRegions.headMap(
- regionInfo.getRegionName()).lastKey());
- }
- }
- metaRegionName = r.getRegionName();
- server = connection.getHRegionConnection(r.getServer());
+ if (!metaRegionAvailable()) {
+ // We can't proceed unless the meta region we are going to update
+ // is online. metaRegionAvailable() has put this operation on the
+ // delayedToDoQueue, so return true so the operation is not put
+ // back on the toDoQueue
+ return true;
}
+ HRegionInterface server = getMetaServer();
try {
BatchUpdate b = new BatchUpdate(rand.nextLong());
long lockid = b.startUpdate(regionInfo.getRegionName());
@@ -2298,8 +2379,7 @@
if (reassignRegion) {
LOG.info("reassign region: " + regionInfo.getRegionName());
- unassignedRegions.put(regionInfo.getRegionName(), regionInfo);
- assignAttempts.put(regionInfo.getRegionName(), Long.valueOf(0L));
+ unassignedRegions.put(regionInfo, ZERO_L);
} else if (deleteRegion) {
try {
@@ -2320,19 +2400,18 @@
* serving a region. This applies to all meta and user regions except the
* root region which is handled specially.
*/
- private class ProcessRegionOpen extends RegionServerOperation {
- private final boolean rootRegion;
- private final HRegionInfo region;
+ private class ProcessRegionOpen extends ProcessRegionStatusChange {
private final HServerAddress serverAddress;
private final byte [] startCode;
- ProcessRegionOpen(HServerInfo info, HRegionInfo region)
+ /**
+ * @param info the server now serving the region; its address and start
+ * code are what process() writes into the region's meta row
+ * @param regionInfo the region that has come online; handed to
+ * ProcessRegionStatusChange
+ * @throws IOException declared by this constructor; presumably raised by
+ * Writables.longToBytes (TODO confirm)
+ */
+ public ProcessRegionOpen(HServerInfo info, HRegionInfo regionInfo)
throws IOException {
- // If true, the region which just came on-line is a META region.
- // We need to look in the ROOT region for its information. Otherwise,
- // its just an ordinary region. Look for it in the META table.
- this.rootRegion = region.getTableDesc().getName().equals(META_TABLE_NAME);
- this.region = region;
+ super(regionInfo);
this.serverAddress = info.getServerAddress();
this.startCode = Writables.longToBytes(info.getStartCode());
}
@@ -2344,72 +2423,40 @@
}
@Override
- boolean process() throws IOException {
+ protected boolean process() throws IOException {
for (int tries = 0; tries < numRetries; tries++) {
if (closed.get()) {
return true;
}
- LOG.info(region.toString() + " open on " +
+ LOG.info(regionInfo.toString() + " open on " +
this.serverAddress.toString());
- // Register the newly-available Region's location.
- Text metaRegionName;
- HRegionInterface server;
- if (this.rootRegion) {
- if (rootRegionLocation.get() == null || !rootScanned) {
- // We can't proceed until root region is online and scanned
- if (LOG.isDebugEnabled()) {
- LOG.debug("root region: " +
- ((rootRegionLocation.get() != null)?
- rootRegionLocation.get().toString(): "null") +
- ", rootScanned: " + rootScanned);
- }
- return false;
- }
- metaRegionName = HRegionInfo.rootRegionInfo.getRegionName();
- server = connection.getHRegionConnection(rootRegionLocation.get());
- } else {
- if (!rootScanned ||
- numberOfMetaRegions.get() != onlineMetaRegions.size()) {
- // We can't proceed because not all of the meta regions are online.
- // We can't block either because that would prevent the meta region
- // online message from being processed. So return false to have this
- // operation requeued.
- if (LOG.isDebugEnabled()) {
- LOG.debug("Requeuing open because rootScanned: " +
- rootScanned + ", numberOfMetaRegions: " +
- numberOfMetaRegions.get() + ", onlineMetaRegions.size(): " +
- onlineMetaRegions.size());
- }
- return false;
- }
-
- MetaRegion r = null;
- synchronized (onlineMetaRegions) {
- r = onlineMetaRegions.containsKey(region.getRegionName()) ?
- onlineMetaRegions.get(region.getRegionName()) :
- onlineMetaRegions.get(onlineMetaRegions.headMap(
- region.getRegionName()).lastKey());
- }
- metaRegionName = r.getRegionName();
- server = connection.getHRegionConnection(r.getServer());
+ if (!metaRegionAvailable()) {
+ // We can't proceed unless the meta region we are going to update
+ // is online. metaRegionAvailable() has put this operation on the
+ // delayedToDoQueue, so return true so the operation is not put
+ // back on the toDoQueue
+ return true;
}
+
+ // Register the newly-available Region's location.
- LOG.info("updating row " + region.getRegionName() + " in table " +
+ HRegionInterface server = getMetaServer();
+ LOG.info("updating row " + regionInfo.getRegionName() + " in table " +
metaRegionName + " with startcode " +
Writables.bytesToLong(this.startCode) + " and server "+
serverAddress.toString());
try {
BatchUpdate b = new BatchUpdate(rand.nextLong());
- long lockid = b.startUpdate(region.getRegionName());
+ long lockid = b.startUpdate(regionInfo.getRegionName());
b.put(lockid, COL_SERVER,
Writables.stringToBytes(serverAddress.toString()));
b.put(lockid, COL_STARTCODE, startCode);
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
- if (region.getTableDesc().getName().equals(META_TABLE_NAME)) {
+ if (isMetaTable) {
// It's a meta region.
MetaRegion m = new MetaRegion(this.serverAddress,
- this.region.getRegionName(), this.region.getStartKey());
+ this.regionInfo.getRegionName(), this.regionInfo.getStartKey());
if (!initialMetaScanComplete) {
// Put it on the queue to be scanned for the first time.
try {
@@ -2422,11 +2469,11 @@
} else {
// Add it to the online meta regions
LOG.debug("Adding to onlineMetaRegions: " + m.toString());
- onlineMetaRegions.put(this.region.getStartKey(), m);
+ onlineMetaRegions.put(this.regionInfo.getStartKey(), m);
}
}
// If updated successfully, remove from pending list.
- pendingRegions.remove(region.getRegionName());
+ pendingRegions.remove(regionInfo.getRegionName());
break;
} catch (IOException e) {
if (tries == numRetries - 1) {
@@ -2449,19 +2496,8 @@
/** {@inheritDoc} */
public void shutdown() {
- TimerTask tt = new TimerTask() {
- @Override
- public void run() {
- closed.set(true);
- synchronized(msgQueue) {
- msgQueue.clear(); // Empty the queue
- shutdownQueue.clear(); // Empty shut down queue
- msgQueue.notifyAll(); // Wake main thread
- }
- }
- };
- Timer t = new Timer(getName() + "-Shutdown");
- t.schedule(tt, 10);
+ LOG.info("Cluster shutdown requested. Starting to quiesce servers");
+ // Only records the request. The code that reacts to shutdownRequested is
+ // outside this chunk; per the log message it presumably quiesces region
+ // servers (e.g. via MSG_REGIONSERVER_QUIESCE) -- TODO confirm.
+ this.shutdownRequested.set(true);
}
/** {@inheritDoc} */
@@ -2563,8 +2599,7 @@
// 5. Get it assigned to a server
- this.unassignedRegions.put(regionName, info);
- this.assignAttempts.put(regionName, Long.valueOf(0L));
+ this.unassignedRegions.put(info, ZERO_L);
} finally {
tableInCreation.remove(newRegion.getTableDesc().getName());
@@ -2838,14 +2873,12 @@
}
if (online) { // Bring offline regions on-line
- if (!unassignedRegions.containsKey(i.getRegionName())) {
- unassignedRegions.put(i.getRegionName(), i);
- assignAttempts.put(i.getRegionName(), Long.valueOf(0L));
+ if (!unassignedRegions.containsKey(i)) {
+ unassignedRegions.put(i, ZERO_L);
}
} else { // Prevent region from getting assigned.
- unassignedRegions.remove(i.getRegionName());
- assignAttempts.remove(i.getRegionName());
+ unassignedRegions.remove(i);
}
}
@@ -3069,7 +3102,7 @@
// here because the new server will start serving the root region before
// the ProcessServerShutdown operation has a chance to split the log file.
if (info != null) {
- shutdownQueue.put(new ProcessServerShutdown(info));
+ delayedToDoQueue.put(new ProcessServerShutdown(info));
}
}
}
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Fri Dec 7 13:23:54 2007
@@ -48,6 +48,9 @@
/** Stop serving the specified region and don't report back that it's closed */
public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
+
+ /**
+ * Stop serving user regions. A region server receiving this closes every
+ * region that is not a meta region while continuing to serve meta regions,
+ * then reports MSG_REPORT_QUIESCED (or MSG_REPORT_EXITING if no regions
+ * remain online).
+ */
+ public static final byte MSG_REGIONSERVER_QUIESCE = 7;
// Messages sent from the region server to the master
@@ -72,9 +75,12 @@
* region server is shutting down
*
* note that this message is followed by MSG_REPORT_CLOSE messages for each
- * region the region server was serving.
+ * region the region server was serving, unless it was told to quiesce.
*/
public static final byte MSG_REPORT_EXITING = 104;
+
+ /**
+ * Region server has closed all user regions but is still serving meta
+ * regions. Sent in response to MSG_REGIONSERVER_QUIESCE.
+ */
+ public static final byte MSG_REPORT_QUIESCED = 105;
byte msg;
HRegionInfo info;
@@ -148,6 +154,10 @@
message.append("MSG_REGION_CLOSE_WITHOUT_REPORT : ");
break;
+ case MSG_REGIONSERVER_QUIESCE:
+ message.append("MSG_REGIONSERVER_QUIESCE : ");
+ break;
+
case MSG_REPORT_PROCESS_OPEN:
message.append("MSG_REPORT_PROCESS_OPEN : ");
break;
@@ -166,6 +176,10 @@
case MSG_REPORT_EXITING:
message.append("MSG_REPORT_EXITING : ");
+ break;
+
+ case MSG_REPORT_QUIESCED:
+ message.append("MSG_REPORT_QUIESCED : ");
break;
default:
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionInfo.java Fri Dec 7 13:23:54 2007
@@ -193,6 +193,21 @@
return tableDesc;
}
+ /**
+ * @return true if this region belongs to the ROOT table; delegates to
+ * {@link HTableDescriptor#isRootRegion()} of this region's table descriptor
+ */
+ public boolean isRootRegion() {
+ return this.tableDesc.isRootRegion();
+ }
+
+ /**
+ * @return true if this region belongs to the META table (and not ROOT);
+ * delegates to {@link HTableDescriptor#isMetaTable()}
+ */
+ public boolean isMetaTable() {
+ return this.tableDesc.isMetaTable();
+ }
+
+ /**
+ * @return true if this region belongs to either the ROOT or the META
+ * table; delegates to {@link HTableDescriptor#isMetaRegion()}
+ */
+ public boolean isMetaRegion() {
+ return this.tableDesc.isMetaRegion();
+ }
+
/**
* @return True if has been split and has daughters.
*/
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Dec 7 13:23:54 2007
@@ -81,6 +81,8 @@
// Chore threads need to know about the hosting class.
protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
+ // Set by closeUserRegions() once all user regions have been closed in
+ // response to MSG_REGIONSERVER_QUIESCE. run() stops the server when this
+ // is set and no regions remain online.
+ protected final AtomicBoolean quiesced = new AtomicBoolean(false);
+
// Go down hard. Used if file system becomes unavailable and also in
// debugging and unit tests.
protected volatile boolean abortRequested;
@@ -652,6 +654,7 @@
* load/unload instructions.
*/
public void run() {
+ boolean quiesceRequested = false;
try {
init(reportForDuty());
long lastMsg = 0;
@@ -682,6 +685,16 @@
HMsg msgs[] =
this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
lastMsg = System.currentTimeMillis();
+
+ if (this.quiesced.get() && onlineRegions.size() == 0) {
+ // We've just told the master we're exiting because we aren't
+ // serving any regions. So set the stop bit and exit.
+ LOG.info("Server quiesced and not serving any regions. " +
+ "Starting shutdown");
+ stopRequested.set(true);
+ continue;
+ }
+
// Queue up the HMaster's instruction stream for processing
boolean restart = false;
for(int i = 0; i < msgs.length && !stopRequested.get() &&
@@ -689,9 +702,7 @@
switch(msgs[i].getMsg()) {
case HMsg.MSG_CALL_SERVER_STARTUP:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got call server startup message");
- }
+ LOG.info("Got call server startup message");
// We the MSG_CALL_SERVER_STARTUP on startup but we can also
// get it when the master is panicing because for instance
// the HDFS has been yanked out from under it. Be wary of
@@ -725,11 +736,22 @@
break;
case HMsg.MSG_REGIONSERVER_STOP:
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got regionserver stop message");
- }
+ LOG.info("Got regionserver stop message");
stopRequested.set(true);
break;
+
+ case HMsg.MSG_REGIONSERVER_QUIESCE:
+ if (!quiesceRequested) {
+ LOG.info("Got quiesce server message");
+ try {
+ toDo.put(new ToDoEntry(msgs[i]));
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Putting into msgQueue was " +
+ "interrupted.", e);
+ }
+ quiesceRequested = true;
+ }
+ break;
default:
if (fsOk) {
@@ -1101,6 +1123,10 @@
try {
LOG.info(e.msg.toString());
switch(e.msg.getMsg()) {
+
+ case HMsg.MSG_REGIONSERVER_QUIESCE:
+ closeUserRegions();
+ break;
case HMsg.MSG_REGION_OPEN:
// Open a region
@@ -1149,12 +1175,19 @@
}
}
- void openRegion(final HRegionInfo regionInfo) throws IOException {
+ void openRegion(final HRegionInfo regionInfo) {
HRegion region = onlineRegions.get(regionInfo.getRegionName());
if(region == null) {
- region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
- this.log, FileSystem.get(conf), conf, regionInfo, null,
- this.cacheFlusher);
+ try {
+ region = new HRegion(new Path(this.conf.get(HConstants.HBASE_DIR)),
+ this.log, FileSystem.get(conf), conf, regionInfo, null,
+ this.cacheFlusher);
+
+ } catch (IOException e) {
+ LOG.error("error opening region " + regionInfo.getRegionName(), e);
+ reportClose(region);
+ return;
+ }
this.lock.writeLock().lock();
try {
this.log.setSequenceNumber(region.getMinSequenceId());
@@ -1206,6 +1239,45 @@
}
}
return regionsToClose;
+ }
+
+ /**
+ * Called as the first stage of cluster shutdown, when the master sends
+ * MSG_REGIONSERVER_QUIESCE. Removes every online region that is not a meta
+ * region from onlineRegions and closes it, leaving meta regions online.
+ * Errors closing an individual region are logged, not rethrown. Afterwards
+ * marks this server as quiesced and queues MSG_REPORT_EXITING for the
+ * master if no regions remain online, or MSG_REPORT_QUIESCED otherwise.
+ */
+ void closeUserRegions() {
+ ArrayList<HRegion> regionsToClose = new ArrayList<HRegion>();
+ // Collect and detach the user (non-meta) regions while holding the write
+ // lock; they are actually closed below, after the lock is released.
+ this.lock.writeLock().lock();
+ try {
+ synchronized (onlineRegions) {
+ for (Iterator<Map.Entry<Text, HRegion>> i =
+ onlineRegions.entrySet().iterator();
+ i.hasNext();) {
+ Map.Entry<Text, HRegion> e = i.next();
+ HRegion r = e.getValue();
+ if (!r.getRegionInfo().isMetaRegion()) {
+ regionsToClose.add(r);
+ // Iterator.remove() keeps the iteration valid while removing.
+ i.remove();
+ }
+ }
+ }
+ } finally {
+ this.lock.writeLock().unlock();
+ }
+ for(HRegion region: regionsToClose) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("closing region " + region.getRegionName());
+ }
+ try {
+ region.close(false);
+ } catch (IOException e) {
+ LOG.error("error closing region " + region.getRegionName(),
+ RemoteExceptionHandler.checkIOException(e));
+ }
+ }
+ // run() checks quiesced together with onlineRegions.size() after its next
+ // report to the master, and stops the server once nothing is left online.
+ this.quiesced.set(true);
+ if (onlineRegions.size() == 0) {
+ outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_EXITING));
+ } else {
+ outboundMsgs.add(new HMsg(HMsg.MSG_REPORT_QUIESCED));
+ }
}
//
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java?rev=602226&r1=602225&r2=602226&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HTableDescriptor.java Fri Dec 7 13:23:54 2007
@@ -52,7 +52,8 @@
HColumnDescriptor.CompressionType.NONE, false, Integer.MAX_VALUE,
null));
-
+ // True for the ROOT table's descriptor: set from the table name by the
+ // private root/meta constructor, false for user tables.
+ private boolean rootregion;
+ // True for the descriptors of both the ROOT and META tables, false for
+ // user tables. Both flags are serialized by write() and readFields().
+ private boolean metaregion;
private Text name;
// TODO: Does this need to be a treemap? Can it be a HashMap?
private final TreeMap<Text, HColumnDescriptor> families;
@@ -69,6 +70,8 @@
/** Used to construct the table descriptors for root and meta tables */
private HTableDescriptor(Text name, HColumnDescriptor family) {
+ rootregion = name.equals(HConstants.ROOT_TABLE_NAME);
+ this.metaregion = true;
this.name = new Text(name);
this.families = new TreeMap<Text, HColumnDescriptor>();
families.put(family.getName(), family);
@@ -92,13 +95,30 @@
* <code>[a-zA-Z_0-9]
*/
public HTableDescriptor(String name) {
+ // NOTE(review): presumably the no-arg constructor (not visible here)
+ // allocates this.name and this.families; name.set() below relies on
+ // this.name being non-null -- TODO confirm.
+ this();
Matcher m = LEGAL_TABLE_NAME.matcher(name);
if (m == null || !m.matches()) {
throw new IllegalArgumentException(
"Table names can only contain 'word characters': i.e. [a-zA-Z_0-9");
}
- this.name = new Text(name);
- this.families = new TreeMap<Text, HColumnDescriptor>();
+ this.name.set(name);
+ // Descriptors built through this public constructor are user tables,
+ // never ROOT or META.
+ this.rootregion = false;
+ this.metaregion = false;
+ }
+
+ /**
+ * @return true if this is the ROOT table's descriptor (constructed for
+ * ROOT_TABLE_NAME, or deserialized with the root flag set)
+ */
+ public boolean isRootRegion() {
+ return rootregion;
+ }
+
+ /**
+ * @return true if this is the META table's descriptor: a meta region
+ * that is not the ROOT table
+ */
+ public boolean isMetaTable() {
+ return metaregion && !rootregion;
+ }
+
+ /**
+ * @return true if this is a meta region (part of the root or meta
+ * tables); false for user tables
+ */
+ public boolean isMetaRegion() {
+ return metaregion;
}
/** @return name of table */
@@ -165,6 +185,8 @@
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
+ out.writeBoolean(rootregion);
+ out.writeBoolean(metaregion);
name.write(out);
out.writeInt(families.size());
for(Iterator<HColumnDescriptor> it = families.values().iterator();
@@ -175,6 +197,8 @@
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
+ this.rootregion = in.readBoolean();
+ this.metaregion = in.readBoolean();
this.name.readFields(in);
int numCols = in.readInt();
families.clear();