You are viewing a plain-text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this rendering.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/05 21:16:13 UTC
svn commit: r1369645 [2/3] - in /hbase/branches/0.89-fb: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/executor/
src/main/java/org/apache/hadoop/hbase/master/ src/main/java...
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Sun Aug 5 19:16:11 2012
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.util.Envi
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.LegacyRootZNodeUpdater;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.io.Text;
@@ -68,8 +69,8 @@ import org.apache.hadoop.io.Text;
public class RegionManager {
protected static final Log LOG = LogFactory.getLog(RegionManager.class);
- private AtomicReference<HServerAddress> rootRegionLocation =
- new AtomicReference<HServerAddress>(null);
+ private AtomicReference<HServerInfo> rootRegionLocation =
+ new AtomicReference<HServerInfo>(null);
private final RootScanner rootScannerThread;
final MetaScanner metaScannerThread;
@@ -81,6 +82,9 @@ public class RegionManager {
private final NavigableMap<byte [], MetaRegion> onlineMetaRegions =
new ConcurrentSkipListMap<byte [], MetaRegion>(Bytes.BYTES_COMPARATOR);
+ private final NavigableMap<byte [], MetaRegion> metaRegionLocationsBeforeScan =
+ new TreeMap<byte [], MetaRegion>(Bytes.BYTES_COMPARATOR);
+
private static final byte[] OVERLOADED = Bytes.toBytes("Overloaded");
private static final byte [] META_REGION_PREFIX = Bytes.toBytes(".META.,");
@@ -90,9 +94,9 @@ public class RegionManager {
/**
* Map key -> tableName, value -> ThrottledRegionReopener
* An entry is created in the map before an alter operation is performed on the
- * table. It is cleared when all the regions have reopened.
+ * table. It is cleared when all the regions have reopened.
*/
- private final Map<String, ThrottledRegionReopener> tablesReopeningRegions =
+ private final Map<String, ThrottledRegionReopener> tablesReopeningRegions =
new ConcurrentHashMap<String, ThrottledRegionReopener>();
/**
* Map of region name to RegionState for regions that are in transition such as
@@ -109,7 +113,7 @@ public class RegionManager {
*/
final SortedMap<String, RegionState> regionsInTransition =
Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
-
+
// regions in transition are also recorded in ZK using the zk wrapper
final ZooKeeperWrapper zkWrapper;
@@ -162,13 +166,17 @@ public class RegionManager {
private final int zooKeeperNumRetries;
private final int zooKeeperPause;
- /**
- * Set of region servers which send heart beat in the first period of time
+ /**
+ * Set of region servers which send heart beat in the first period of time
* during the master boots. Hold the best locality regions for these
* region servers.
*/
private Set<String> quickStartRegionServerSet = new HashSet<String>();
+ private boolean stoppedScanners = false;
+
+ private LegacyRootZNodeUpdater legacyRootZNodeUpdater;
+
RegionManager(HMaster master) throws IOException {
Configuration conf = master.getConfiguration();
@@ -195,7 +203,8 @@ public class RegionManager {
zooKeeperPause = conf.getInt(HConstants.ZOOKEEPER_PAUSE,
HConstants.DEFAULT_ZOOKEEPER_PAUSE);
- reassignRootRegion();
+ legacyRootZNodeUpdater = new LegacyRootZNodeUpdater(zkWrapper, master,
+ rootRegionLocation);
}
public LoadBalancer getLoadBalancer() {
@@ -208,6 +217,7 @@ public class RegionManager {
"RegionManager.rootScanner");
Threads.setDaemonThreadRunning(metaScannerThread,
"RegionManager.metaScanner");
+ Threads.setDaemonThreadRunning(legacyRootZNodeUpdater, null);
}
public AssignmentManager getAssignmentManager() {
@@ -244,11 +254,11 @@ public class RegionManager {
}
}
- /*
+ /**
* Assigns regions to region servers attempting to balance the load across all
* region servers. Note that no synchronization is necessary as the caller
* (ServerManager.processMsgs) already owns the monitor for the RegionManager.
- *
+ *
* @param info
* @param mostLoadedRegions
* @param returnMsgs
@@ -260,7 +270,7 @@ public class RegionManager {
// be assigned when the region server reports next
return;
}
-
+
if (this.master.shouldAssignRegionsWithFavoredNodes()) {
// assign regions with favored nodes
assignRegionsWithFavoredNodes(info, mostLoadedRegions, returnMsgs);
@@ -269,7 +279,7 @@ public class RegionManager {
assignRegionsWithoutFavoredNodes(info, mostLoadedRegions, returnMsgs);
}
}
-
+
private void assignRegionsWithFavoredNodes(HServerInfo regionServer,
HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
// get the regions that are waiting for assignment for that region server
@@ -285,26 +295,38 @@ public class RegionManager {
}
}
+ /**
+ * @return true if there is a single regionserver online, or if there is any other reason to
+ * remove restrictions on assigning .META./-ROOT- to the same regionserver (e.g. if there
+ * are blacklisted regionservers during testing).
+ */
+ private boolean isSingleRegionServer() {
+ // If there are blacklisted servers (unit tests only), treat the situation as if there is
+ // just a single host, otherwise we might keep trying to assign regions to blacklisted
+ // regionservers.
+ return master.numServers() == 1 || master.getServerManager().hasBlacklistedServersInTest();
+ }
+
private void assignRegionsWithoutFavoredNodes(HServerInfo info,
HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
// the region may assigned to this region server
Set<RegionState> regionsToAssign = null;
HServerLoad thisServersLoad = info.getLoad();
- boolean isSingleServer = this.master.numServers() == 1;
+ boolean isSingleServer = isSingleRegionServer();
boolean holdRegionForBestRegionServer = false;
boolean assignmentByLocality = false;
-
- // only check assignmentByLocality when the
+
+ // only check assignmentByLocality when the
// PreferredRegionToRegionServerMapping is not null;
if (this.master.getPreferredRegionToRegionServerMapping() != null) {
long masterRunningTime = System.currentTimeMillis()
- - this.master.getMasterStartupTime();
- holdRegionForBestRegionServer =
+ - this.master.getMasterStartupTime();
+ holdRegionForBestRegionServer =
masterRunningTime < this.master.getHoldRegionForBestLocalityPeriod();
- assignmentByLocality =
+ assignmentByLocality =
masterRunningTime < this.master.getApplyPreferredAssignmentPeriod();
-
+
// once it has passed the ApplyPreferredAssignmentPeriod, clear up
// the quickStartRegionServerSet and PreferredRegionToRegionServerMapping
// and it won't check the assignmentByLocality anymore.
@@ -313,7 +335,7 @@ public class RegionManager {
this.master.clearPreferredRegionToRegionServerMapping();
}
}
-
+
if (assignmentByLocality) {
// have to add . at the end of host name
String hostName = info.getHostname();
@@ -331,7 +353,7 @@ public class RegionManager {
isSingleServer, preferredAssignment, assignmentByLocality,
holdRegionForBestRegionServer,
quickStartRegionServerSet);
-
+
if (regionsToAssign.isEmpty()) {
// There are no regions waiting to be assigned.
if (!assignmentByLocality
@@ -340,7 +362,7 @@ public class RegionManager {
this.loadBalancer.loadBalancing(info, mostLoadedRegions, returnMsgs);
}
} else {
- // if there's only one server or assign the region by locality,
+ // if there's only one server or assign the region by locality,
// just give the regions to this server
if (isSingleServer || assignmentByLocality
|| preferredAssignment.booleanValue()) {
@@ -356,11 +378,11 @@ public class RegionManager {
/*
* Make region assignments taking into account multiple servers' loads.
- *
+ *
* Note that no synchronization is needed while we iterate over
* regionsInTransition because this method is only called by assignRegions
* whose caller owns the monitor for RegionManager
- *
+ *
* TODO: This code is unintelligible. REWRITE. Add TESTS! St.Ack 09/30/2009
* @param thisServersLoad
* @param regionsToAssign
@@ -376,7 +398,7 @@ public class RegionManager {
isMetaAssign = true;
}
int nRegionsToAssign = regionsToAssign.size();
- int otherServersRegionsCount =
+ int otherServersRegionsCount =
regionsToGiveOtherServers(nRegionsToAssign, thisServersLoad);
nRegionsToAssign -= otherServersRegionsCount;
if (nRegionsToAssign > 0 || isMetaAssign) {
@@ -390,8 +412,8 @@ public class RegionManager {
int nservers = computeNextHeaviestLoad(thisServersLoad, heavierLoad);
int nregions = 0;
// Advance past any less-loaded servers
- for (HServerLoad load = new HServerLoad(thisServersLoad);
- load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
+ for (HServerLoad load = new HServerLoad(thisServersLoad);
+ load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
// continue;
}
@@ -444,7 +466,7 @@ public class RegionManager {
* Note that no synchronization is needed on regionsInTransition while
* iterating on it because the only caller is assignRegions whose caller owns
* the monitor for RegionManager
- *
+ *
* @param regionsToAssign
* @param serverName
* @param returnMsgs
@@ -501,6 +523,11 @@ public class RegionManager {
*/
private int regionsToGiveOtherServers(final int numUnassignedRegions,
final HServerLoad thisServersLoad) {
+ if (master.getServerManager().hasBlacklistedServersInTest()) {
+ // For unit testing. Otherwise, we will always think we should give regions to blacklisted
+ // servers, but will not actually assign any.
+ return 0;
+ }
SortedMap<HServerLoad, Collection<String>> lightServers =
master.getServerManager().getServersToLoad().getLightServers(thisServersLoad);
// Examine the list of servers that are more lightly loaded than this one.
@@ -533,13 +560,13 @@ public class RegionManager {
private Set<RegionState> regionsAwaitingAssignment(HServerInfo server) {
// set of regions we want to assign to this server
Set<RegionState> regionsToAssign = new HashSet<RegionState>();
- boolean isSingleServer = this.master.numServers() == 1;
+ boolean isSingleServer = isSingleRegionServer();
HServerAddress addr = server.getServerAddress();
boolean isMetaServer = isMetaServer(addr);
RegionState rootState = null;
boolean isPreferredAssignment = false;
boolean reassigningMetas =
- (numberOfMetaRegions.get() != onlineMetaRegions.size());
+ (numberOfMetaRegions.get() > onlineMetaRegions.size());
boolean isMetaOrRoot = isMetaServer || isRootServer(addr);
// Assign ROOT region if ROOT region is offline.
@@ -567,7 +594,7 @@ public class RegionManager {
// for the current region server
Set<HRegionInfo> preservedRegionsForCurrentRS =
assignmentManager.getTransientAssignments(addr);
-
+
synchronized (this.regionsInTransition) {
int nonPreferredAssignment = 0;
for (RegionState regionState : regionsInTransition.values()) {
@@ -598,8 +625,7 @@ public class RegionManager {
// Can't assign user regions until all meta regions have been assigned,
// the initial meta scan is done and there are enough online
// region servers
- if (reassigningMetas || !this.isInitialMetaScanComplete() ||
- !master.getServerManager().hasEnoughRegionServers()) {
+ if (reassigningMetas || !master.getServerManager().hasEnoughRegionServers()) {
LOG.debug("Cannot assign region " + regionInfo.getRegionNameAsString()
+ " because not all the META are online, "
+ "or the initial META scan is not completed, or there are no "
@@ -611,12 +637,12 @@ public class RegionManager {
if (!regionState.isUnassigned()) {
continue;
}
-
- if (preservedRegionsForCurrentRS == null ||
+
+ if (preservedRegionsForCurrentRS == null ||
!preservedRegionsForCurrentRS.contains(regionInfo)) {
- if (assignmentManager.hasTransientAssignment(regionInfo) ||
+ if (assignmentManager.hasTransientAssignment(regionInfo) ||
nonPreferredAssignment > this.maxAssignInOneGo) {
- // Hold the region for its favored nodes and limit the number of
+ // Hold the region for its favored nodes and limit the number of
// non preferred assignments for each region server.
continue;
}
@@ -626,22 +652,22 @@ public class RegionManager {
} else {
isPreferredAssignment = true;
}
-
+
// Assign the current region to the region server.
regionsToAssign.add(regionState);
LOG.debug("Going to assign user region " +
regionInfo.getRegionNameAsString() +
" to server " + server.getHostnamePort() + " in a " +
(isPreferredAssignment ? "": "non-") + "preferred way");
-
+
}
}
return regionsToAssign;
}
-
+
/**
* Get the set of regions that should be assignable in this pass.
- *
+ *
* Note that no synchronization on regionsInTransition is needed because the
* only caller (assignRegions, whose caller is ServerManager.processMsgs) owns
* the monitor for RegionManager
@@ -650,7 +676,6 @@ public class RegionManager {
boolean isSingleServer, MutableBoolean isPreferredAssignment,
boolean assignmentByLocality, boolean holdRegionForBestRegionserver,
Set<String> quickStartRegionServerSet) {
-
// set of regions we want to assign to this server
Set<RegionState> regionsToAssign = new HashSet<RegionState>();
@@ -735,13 +760,13 @@ public class RegionManager {
+ " is in transition but not enough servers yet");
continue;
}
-
+
// if we are holding it, don't give it away to any other server
if (assignmentManager.hasTransientAssignment(s.getRegionInfo())) {
continue;
}
if (assignmentByLocality && !i.isRootRegion() && !i.isMetaRegion()) {
- Text preferredHostNameTxt =
+ Text preferredHostNameTxt =
(Text)this.master.getPreferredRegionToRegionServerMapping().get(new Text(name));
if (hostName == null) {
@@ -750,18 +775,18 @@ public class RegionManager {
if (preferredHostNameTxt != null) {
String preferredHost = preferredHostNameTxt.toString();
if (hostName.startsWith(preferredHost)) {
- LOG.debug("Doing Preferred Region Assignment for : " + name +
+ LOG.debug("Doing Preferred Region Assignment for : " + name +
" to the " + hostName);
// add the region to its preferred region server.
regionsToAssign.add(s);
continue;
- } else if (holdRegionForBestRegionserver ||
+ } else if (holdRegionForBestRegionserver ||
quickStartRegionServerSet.contains(preferredHost)) {
continue;
}
}
}
- // Only assign a configured number unassigned region at one time in the
+ // Only assign a configured number unassigned region at one time in the
// non preferred assignment case.
if ((nonPreferredAssignmentCount++) < this.maxAssignInOneGo) {
regionsToAssign.add(s);
@@ -909,12 +934,15 @@ public class RegionManager {
}
/**
- * Stop the root and meta scanners so that the region servers serving meta
- * regions can shut down.
+ * Stop the root and meta scanners so that the region servers serving meta regions can shut down.
+ * Not thread-safe, but if called twice from the same thread, scanners will only be stopped once.
*/
public void stopScanners() {
- this.rootScannerThread.interruptAndStop();
- this.metaScannerThread.interruptAndStop();
+ if (!stoppedScanners) {
+ this.rootScannerThread.interruptAndStop();
+ this.metaScannerThread.interruptAndStop();
+ stoppedScanners = true;
+ }
}
/**
@@ -944,7 +972,7 @@ public class RegionManager {
public boolean areAllMetaRegionsOnline() {
synchronized (onlineMetaRegions) {
return (rootRegionLocation.get() != null &&
- numberOfMetaRegions.get() == onlineMetaRegions.size());
+ numberOfMetaRegions.get() <= onlineMetaRegions.size());
}
}
@@ -956,17 +984,21 @@ public class RegionManager {
*/
public MetaRegion getFirstMetaRegionForRegion(HRegionInfo newRegion) {
synchronized (onlineMetaRegions) {
- if (onlineMetaRegions.size() == 0) {
- return null;
- } else if (onlineMetaRegions.size() == 1) {
- return onlineMetaRegions.get(onlineMetaRegions.firstKey());
- } else {
- if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
- return onlineMetaRegions.get(newRegion.getRegionName());
- }
- return onlineMetaRegions.get(onlineMetaRegions.headMap(
- newRegion.getRegionName()).lastKey());
+ return getMetaRegionPointingTo(onlineMetaRegions, newRegion);
+ }
+ }
+
+ static MetaRegion getMetaRegionPointingTo(NavigableMap<byte[], MetaRegion> metaRegions,
+ HRegionInfo newRegion) {
+ if (metaRegions.isEmpty()) {
+ return null;
+ } else if (metaRegions.size() == 1) {
+ return metaRegions.get(metaRegions.firstKey());
+ } else {
+ if (metaRegions.containsKey(newRegion.getRegionName())) {
+ return metaRegions.get(newRegion.getRegionName());
}
+ return metaRegions.get(metaRegions.headMap(newRegion.getRegionName()).lastKey());
}
}
@@ -985,7 +1017,7 @@ public class RegionManager {
throw new NotAllMetaRegionsOnlineException(
Bytes.toString(HConstants.ROOT_TABLE_NAME));
}
- metaRegions.add(new MetaRegion(rootRegionLocation.get(),
+ metaRegions.add(new MetaRegion(rootRegionLocation.get().getServerAddress(),
HRegionInfo.ROOT_REGIONINFO));
} else {
if (!areAllMetaRegionsOnline()) {
@@ -1041,7 +1073,7 @@ public class RegionManager {
throws IOException {
createRegion(newRegion, server, metaRegionName, null);
}
-
+
/**
* Create a new HRegion, put a row for it into META (or ROOT), and mark the
* new region unassigned so that it will get assigned to a region server.
@@ -1068,7 +1100,7 @@ public class RegionManager {
// 3.1 Put the region info into meta table.
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
Writables.getBytes(info));
-
+
// 3.2 Put the favorite nodes into meta.
if (favoriteNodeList != null) {
String favoredNodes = RegionPlacement.getFavoredNodes(favoriteNodeList);
@@ -1083,10 +1115,10 @@ public class RegionManager {
// 4. Close the new region to flush it to disk. Close its log file too.
region.close();
region.getLog().closeAndDelete();
-
+
// After all regions are created, the caller will schedule
// the meta scanner to run immediately and assign out the
- // regions.
+ // regions.
}
/**
@@ -1309,11 +1341,15 @@ public class RegionManager {
}
/**
- * Set a region to unassigned
+ * Set a region to unassigned. Always writes the region's unassigned znode.
* @param info Region to set unassigned
* @param force if true mark region unassigned whatever its current state
*/
- public void setUnassigned(HRegionInfo info, boolean force) {
+ void setUnassigned(HRegionInfo info, boolean force) {
+ setUnassignedGeneral(true, info, force);
+ }
+
+ void setUnassignedGeneral(boolean writeToZK, HRegionInfo info, boolean force) {
RegionState s = null;
long t0, t1, t2, t3;
t0 = System.currentTimeMillis();
@@ -1329,9 +1365,11 @@ public class RegionManager {
// should never happen
LOG.error("Error creating event data for " + HBaseEventType.M2ZK_REGION_OFFLINE, e);
}
- zkWrapper.createOrUpdateUnassignedRegion(info.getEncodedName(), data);
- LOG.debug("Created/updated UNASSIGNED zNode " + info.getRegionNameAsString() +
- " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+ if (writeToZK) {
+ zkWrapper.createOrUpdateUnassignedRegion(info.getEncodedName(), data);
+ LOG.debug("Created/updated UNASSIGNED zNode " + info.getRegionNameAsString() +
+ " in state " + HBaseEventType.M2ZK_REGION_OFFLINE);
+ }
s = new RegionState(info, RegionState.State.UNASSIGNED);
regionsInTransition.put(info.getRegionNameAsString(), s);
}
@@ -1444,7 +1482,7 @@ public class RegionManager {
// marked offline so that it opens on the preferred server.
this.assignmentManager.executeAssignmentPlan(regionInfo);
}
- }
+ }
}
/**
@@ -1492,35 +1530,17 @@ public class RegionManager {
}
}
}
- /**
- * Add a meta region to the scan queue
- * @param m MetaRegion that needs to get scanned
- */
- public void addMetaRegionToScan(MetaRegion m) {
- metaScannerThread.addMetaRegionToScan(m);
- }
-
- /**
- * Check if the initial root scan has been completed.
- * @return true if scan completed, false otherwise
- */
- public boolean isInitialRootScanComplete() {
- return rootScannerThread.isInitialScanComplete();
- }
-
- /**
- * Check if the initial meta scan has been completed.
- * @return true if meta completed, false otherwise
- */
- public boolean isInitialMetaScanComplete() {
- return metaScannerThread.isInitialScanComplete();
- }
/**
* Get the root region location.
* @return HServerAddress describing root region server.
*/
public HServerAddress getRootRegionLocation() {
+ return HServerInfo.getAddress(rootRegionLocation.get());
+ }
+
+ /** Returns root region location as a server info object (with a start code) */
+ public HServerInfo getRootServerInfo() {
return rootRegionLocation.get();
}
@@ -1538,7 +1558,8 @@ public class RegionManager {
// Cycle rather than hold here in case master is closed meantime.
rootRegionLocation.wait(this.master.getThreadWakeFrequency());
} catch (InterruptedException e) {
- // continue
+ LOG.error("Interrupted when waiting for root region location");
+ continue;
}
}
}
@@ -1575,9 +1596,9 @@ public class RegionManager {
}
}
- private void writeRootRegionLocationToZooKeeper(HServerAddress address) {
+ private void writeRootRegionLocationToZooKeeper(HServerInfo hsi) {
for (int attempt = 0; attempt < zooKeeperNumRetries; ++attempt) {
- if (master.getZooKeeperWrapper().writeRootRegionLocation(address)) {
+ if (master.getZooKeeperWrapper().writeRootRegionLocation(hsi)) {
return;
}
@@ -1585,21 +1606,21 @@ public class RegionManager {
}
LOG.error("Failed to write root region location to ZooKeeper after " +
- zooKeeperNumRetries + " retries, shutting down");
+ zooKeeperNumRetries + " retries, shutting down the cluster");
- this.master.shutdown();
+ this.master.requestClusterShutdown();
}
/**
* Set the root region location.
* @param address Address of the region server where the root lives
*/
- public void setRootRegionLocation(HServerAddress address) {
- writeRootRegionLocationToZooKeeper(address);
+ public void setRootRegionLocation(HServerInfo hsi) {
+ writeRootRegionLocationToZooKeeper(hsi);
synchronized (rootRegionLocation) {
// the root region has been assigned, remove it from transition in ZK
zkWrapper.deleteUnassignedRegion(HRegionInfo.ROOT_REGIONINFO.getEncodedName());
- rootRegionLocation.set(new HServerAddress(address));
+ rootRegionLocation.set(new HServerInfo(hsi));
rootRegionLocation.notifyAll();
}
}
@@ -1996,7 +2017,7 @@ public class RegionManager {
// Only move the region if the other server is under-loaded and the
// current server is overloaded.
- if (serverLoad - regionsUnassigned > avgLoadPlusSlop &&
+ if (serverLoad - regionsUnassigned > avgLoadPlusSlop &&
otherLoad.getNumberOfRegions() < avgLoadMinusSlop) {
if (unassignRegion(info, region, returnMsgs)) {
// Need to override transient assignment that may have been added
@@ -2032,7 +2053,7 @@ public class RegionManager {
HServerInfo other =
master.getServerManager().getHServerInfo(server);
if (other == null ||
- master.getServerManager().isDead(other.getServerName())) {
+ master.getServerManager().isDeadProcessingPending(other.getServerName())) {
return null;
}
return master.getServerManager().getServersToLoad()
@@ -2085,7 +2106,7 @@ public class RegionManager {
DefaultLoadBalancer() {
super();
}
-
+
/**
* Balance server load by unassigning some regions.
*
@@ -2167,10 +2188,10 @@ public class RegionManager {
int lowSrvCount = serverLoadMap.numServersByLoad(lowestServerLoad);
int numSrvRegs = srvLoad.getNumberOfRegions();
int numMoveToLowLoaded = (avgLoadMinusSlop - lowestLoad) * lowSrvCount;
-
- int numRegionsToClose = numSrvRegs - (int)Math.floor(avgLoad);
+
+ int numRegionsToClose = numSrvRegs - (int)Math.floor(avgLoad);
numRegionsToClose = Math.min(numRegionsToClose, numMoveToLowLoaded);
-
+
if (LOG.isDebugEnabled()) {
LOG.debug("Server(s) are carrying only " + lowestLoad + " regions. " +
"Server " + srvName + " is most loaded (" + numSrvRegs +
@@ -2393,7 +2414,7 @@ public class RegionManager {
/**
* Method used to do housekeeping for holding regions for a RegionServer going
* down for a restart
- *
+ *
* @param regionServer
* the RegionServer going down for a restart
* @param regions
@@ -2401,7 +2422,7 @@ public class RegionManager {
*/
public void addRegionServerForRestart(final HServerInfo regionServer,
Set<HRegionInfo> regions) {
- LOG.debug("Holding regions of restartng server: " +
+ LOG.debug("Holding regions of restartng server: " +
regionServer.getServerName());
HServerAddress addr = regionServer.getServerAddress();
for (HRegionInfo region : regions) {
@@ -2410,7 +2431,7 @@ public class RegionManager {
}
/**
- * Create a reopener for this table, if one exists, return the existing throttler.
+ * Create a reopener for this table, if one exists, return the existing throttler.
* @param tableName
* @return
*/
@@ -2418,10 +2439,10 @@ public class RegionManager {
if (!tablesReopeningRegions.containsKey(tableName)) {
ThrottledRegionReopener throttledReopener = new ThrottledRegionReopener(tableName, this.master, this);
tablesReopeningRegions.put(tableName, throttledReopener);
- }
+ }
return tablesReopeningRegions.get(tableName);
}
-
+
/**
* Return the throttler for this table
* @param tableName
@@ -2430,7 +2451,7 @@ public class RegionManager {
public ThrottledRegionReopener getThrottledReopener(String tableName) {
return tablesReopeningRegions.get(tableName);
}
-
+
/**
* Delete the throttler when the operation is complete
* @param tableName
@@ -2444,10 +2465,10 @@ public class RegionManager {
LOG.debug("Tried to delete a throttled reopener, but it does not exist.");
}
}
-
+
/**
- * When the region is opened, check if it is reopening and notify the throttler
- * for further processing.
+ * When the region is opened, check if it is reopening and notify the throttler
+ * for further processing.
* @param region
*/
public void notifyRegionReopened(HRegionInfo region) {
@@ -2456,4 +2477,78 @@ public class RegionManager {
tablesReopeningRegions.get(tableName).notifyRegionOpened(region);
}
}
+
+ MetaScanner getMetaScanner() {
+ return metaScannerThread;
+ }
+
+ /**
+ * Composes a map of .META. region locations for both online .META. regions and regions that
+ * we know are assigned to regionservers, but have not been scanned yet. This is used on master
+ * startup to write pending region location changes from the ZK unassigned directory to .META.
+ */
+ NavigableMap<byte[], MetaRegion> getAllMetaRegionLocations() {
+ NavigableMap<byte[], MetaRegion> m =
+ new TreeMap<byte[], MetaRegion>(Bytes.BYTES_COMPARATOR);
+ m.putAll(metaRegionLocationsBeforeScan);
+ m.putAll(onlineMetaRegions);
+ return m;
+ }
+
+ /**
+ * Modifies region state in regionsInTransition based on the initial scan of the ZK unassigned
+ * directory.
+ * @param event event type written by the regionserver to the znode
+ * @param regionInfo region info
+ * @param serverName regionserver name
+ */
+ void setRegionStateOnRecovery(HBaseEventType event, HRegionInfo regionInfo, String serverName) {
+ String regionName = regionInfo.getRegionNameAsString();
+ String stateStr = null;
+ if (event == HBaseEventType.RS2ZK_REGION_CLOSING ||
+ event == HBaseEventType.RS2ZK_REGION_CLOSED) {
+ synchronized (regionsInTransition) {
+ RegionState s = regionsInTransition.get(regionName);
+ if (s == null) {
+ s = new RegionState(regionInfo, RegionState.State.PENDING_CLOSE);
+ regionsInTransition.put(regionName, s);
+ } else {
+ s.setClosing(serverName, s.isOfflined());
+ s.setPendingClose();
+ }
+ stateStr = s.toString();
+ }
+ }
+
+ if (event == HBaseEventType.RS2ZK_REGION_OPENED ||
+ event == HBaseEventType.RS2ZK_REGION_OPENING) {
+ synchronized (regionsInTransition) {
+ RegionState s = regionsInTransition.get(regionName);
+ if (s == null) {
+ s = new RegionState(regionInfo, RegionState.State.PENDING_OPEN);
+ regionsInTransition.put(regionName, s);
+ } else {
+ s.setUnassigned();
+ s.setPendingOpen(serverName);
+ }
+ stateStr = s.toString();
+ }
+ }
+
+ if (stateStr != null) {
+ LOG.info("Set state in regionsInTransition: " + stateStr);
+ }
+ }
+
+ /** Recovers root region location from ZK. Should only be called on master startup. */
+ void recoverRootRegionLocationFromZK() {
+ HServerInfo rootLocationInZK = zkWrapper.readRootRegionServerInfo();
+ if (rootLocationInZK != null) {
+ synchronized (rootRegionLocation) {
+ rootRegionLocation.set(rootLocationInZK);
+ rootRegionLocation.notifyAll();
+ }
+ }
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionServerOperation.java Sun Aug 5 19:16:11 2012
@@ -81,23 +81,16 @@ abstract class RegionServerOperation imp
- o.getDelay(TimeUnit.MILLISECONDS)).intValue();
}
- private long whenToExpire() {
- return System.currentTimeMillis() + this.delay;
- }
-
protected boolean rootAvailable() {
- boolean available = true;
- if (this.master.getRegionManager().getRootRegionLocation() == null) {
- available = false;
- }
- return available;
+ return master.getRegionManager().getRootRegionLocation() != null;
}
protected boolean metaTableAvailable() {
boolean available = true;
- if ((master.getRegionManager().numMetaRegions() !=
- master.getRegionManager().numOnlineMetaRegions()) ||
- master.getRegionManager().metaRegionsInTransition()) {
+ int numMetaRegions = master.getRegionManager().numMetaRegions();
+ if (numMetaRegions == 0 ||
+ numMetaRegions > master.getRegionManager().numOnlineMetaRegions() ||
+ master.getRegionManager().metaRegionsInTransition()) {
// We can't proceed because not all of the meta regions are online.
// We can't block either because that would prevent the meta region
// online message from being processed. In order to prevent spinning
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RootScanner.java Sun Aug 5 19:16:11 2012
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.master;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import java.io.IOException;
@@ -36,7 +37,6 @@ class RootScanner extends BaseScanner {
/**
* Don't retry if we get an error while scanning. Errors are most often
- *
* caused by the server going away. Wait until next rescan interval when
* things should be back to normal.
* @return True if successfully scanned.
@@ -50,9 +50,9 @@ class RootScanner extends BaseScanner {
try {
// Don't interrupt us while we're working
synchronized(scannerLock) {
- if (master.getRegionManager().getRootRegionLocation() != null) {
- scanRegion(new MetaRegion(master.getRegionManager().getRootRegionLocation(),
- HRegionInfo.ROOT_REGIONINFO));
+ HServerAddress rootRegionLocation = master.getRegionManager().getRootRegionLocation();
+ if (rootRegionLocation != null) {
+ scanRegion(new MetaRegion(rootRegionLocation, HRegionInfo.ROOT_REGIONINFO));
}
}
} catch (IOException e) {
@@ -76,12 +76,6 @@ class RootScanner extends BaseScanner {
}
@Override
- protected boolean initialScan() {
- this.initialScanComplete = scanRoot();
- return initialScanComplete;
- }
-
- @Override
protected void maintenanceScan() {
scanRoot();
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Sun Aug 5 19:16:11 2012
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -115,6 +114,13 @@ public class ServerManager {
private final OldLogsCleaner oldLogCleaner;
/**
+ * A lock that controls simultaneous changes and lookup to the dead server set and the server to
+ * server info map. Required so that we don't reassign the same region both in expireServer
+ * and in the base scanner.
+ */
+ final Object deadServerStatusLock = new Object();
+
+ /**
* A set of host:port pairs representing regionservers that are blacklisted
* from region assignment. Used for unit tests only. Please do not use this
* for production, because in a real situation a blacklisted server might
@@ -247,7 +253,7 @@ public class ServerManager {
*/
private void checkIsDead(final String serverName, final String what)
throws YouAreDeadException {
- if (!isDead(serverName)) return;
+ if (!isDeadProcessingPending(serverName)) return;
String message = "Server " + what + " rejected; currently processing " +
serverName + " as dead server";
LOG.debug(message);
@@ -300,6 +306,7 @@ public class ServerManager {
// Could not set a watch, undo the above changes and re-throw.
serversToLoad.updateServerLoad(serverName, oldServerLoad);
undoMapUpdate(serversToServerInfo, serverName, oldServerInfo);
+ LOG.error("Could not set watch on regionserver znode for " + serverName);
throw ex;
}
}
@@ -577,20 +584,18 @@ public class ServerManager {
// currently opening regions, leave it alone till all are open.
if (openingCount < this.nobalancingCount) {
if (!blacklistedRSHostPortSetForTest.contains(
- serverInfo.getHostnamePort())) {
+ serverInfo.getHostnamePort()) || serversToServerInfo.size() <= 1) {
// Production code path.
master.getRegionManager().assignRegions(serverInfo,
mostLoadedRegions, returnMsgs);
- } else if (mostLoadedRegions.length > 0) {
+ } else {
// UNIT TESTS ONLY.
// We just don't assign anything to "blacklisted" regionservers as
// required by a unit test (for determinism). This is OK because
// another regionserver will get these regions in response to a
// heartbeat.
- LOG.debug("[UNIT TEST ONLY] Not assigning regions "
- + Arrays.toString(mostLoadedRegions) + " to regionserver "
- + serverInfo.getHostnamePort()
- + " because it is blacklisted.");
+ LOG.debug("[UNIT TEST ONLY] Not assigning regions to blacklisted regionserver "
+ + serverInfo.getHostnamePort());
}
}
@@ -621,8 +626,13 @@ public class ServerManager {
ArrayList<HMsg> msgsForServer = pendingMsgsToSvrsMap.get(serverInfo);
if (msgsForServer == null) {
- msgsForServer = pendingMsgsToSvrsMap.putIfAbsent(serverInfo,
- new ArrayList<HMsg>());
+ msgsForServer = new ArrayList<HMsg>();
+ ArrayList<HMsg> newMsgsForServer =
+ pendingMsgsToSvrsMap.putIfAbsent(serverInfo, msgsForServer);
+ if (newMsgsForServer != null) {
+ // There is already a list of messages for this server, use it.
+ msgsForServer = newMsgsForServer;
+ }
}
synchronized(msgsForServer) {
@@ -708,13 +718,14 @@ public class ServerManager {
public void processRegionOpen(HServerInfo serverInfo,
HRegionInfo region, ArrayList<HMsg> returnMsgs) {
boolean duplicateAssignment = false;
- synchronized (master.getRegionManager()) {
- if (!this.master.getRegionManager().isUnassigned(region) &&
- !this.master.getRegionManager().isPendingOpen(region.getRegionNameAsString())) {
+ RegionManager regionManager = master.getRegionManager();
+ synchronized (regionManager) {
+ if (!regionManager.isUnassigned(region) &&
+ !regionManager.isPendingOpen(region.getRegionNameAsString())) {
if (region.isRootRegion()) {
// Root region
HServerAddress rootServer =
- this.master.getRegionManager().getRootRegionLocation();
+ regionManager.getRootRegionLocation();
if (rootServer != null) {
if (rootServer.compareTo(serverInfo.getServerAddress()) == 0) {
// A duplicate open report from the correct server
@@ -728,7 +739,7 @@ public class ServerManager {
// Not root region. If it is not a pending region, then we are
// going to treat it as a duplicate assignment, although we can't
// tell for certain that's the case.
- if (this.master.getRegionManager().isPendingOpen(
+ if (regionManager.isPendingOpen(
region.getRegionNameAsString())) {
// A duplicate report from the correct server
return;
@@ -751,25 +762,25 @@ public class ServerManager {
if (region.isRootRegion()) {
// it was assigned, and it's not a duplicate assignment, so take it out
// of the unassigned list.
- this.master.getRegionManager().removeRegion(region);
+ regionManager.removeRegion(region);
// Store the Root Region location (in memory)
HServerAddress rootServer = serverInfo.getServerAddress();
this.master.getServerConnection().setRootRegionLocation(
new HRegionLocation(region, rootServer));
- this.master.getRegionManager().setRootRegionLocation(rootServer);
+ regionManager.setRootRegionLocation(serverInfo);
// Increase the region opened counter
this.master.getMetrics().incRegionsOpened();
} else {
// Note that the table has been assigned and is waiting for the
// meta table to be updated.
- this.master.getRegionManager().setOpen(region.getRegionNameAsString());
+ regionManager.setOpen(region.getRegionNameAsString());
RegionServerOperation op =
- new ProcessRegionOpen(master, serverInfo, region);
- this.master.getRegionServerOperationQueue().put(op);
+ new ProcessRegionOpen(master, serverInfo, region);
+ master.getRegionServerOperationQueue().put(op);
}
}
- this.master.getRegionManager().notifyRegionReopened(region);
+ regionManager.notifyRegionReopened(region);
}
}
@@ -980,9 +991,11 @@ public class ServerManager {
LOG.warn("Already processing shutdown of " + serverName);
return;
}
- // Remove the server from the known servers lists and update load info
- this.serversToServerInfo.remove(serverName);
- serversToLoad.removeServerLoad(serverName);
+ synchronized (deadServerStatusLock) {
+ // Remove the server from the known servers lists and update load info
+ this.serversToServerInfo.remove(serverName);
+ serversToLoad.removeServerLoad(serverName);
+ }
// Add to dead servers and queue a shutdown processing.
LOG.debug("Added=" + serverName +
" to dead servers, added shutdown processing operation");
@@ -1006,7 +1019,7 @@ public class ServerManager {
* @param serverName
* @return true if server is dead
*/
- public boolean isDead(final String serverName) {
+ public boolean isDeadProcessingPending(final String serverName) {
return isDead(serverName, false);
}
@@ -1099,7 +1112,7 @@ public class ServerManager {
public void run() {
try {
while (true) {
- boolean waitingForMoreServersInRackToTimeOut =
+ boolean waitingForMoreServersInRackToTimeOut =
expireTimedOutServers(timeout, maxServersToExpirePerRack);
if (waitingForMoreServersInRackToTimeOut) {
sleep(shortTimeout/2);
@@ -1292,7 +1305,7 @@ public class ServerManager {
continue; //server vanished
}
// re-check - just in case the server reported
- if (curTime > load.expireAfter) {
+ if (curTime > load.expireAfter) { // debug
LOG.info("Expiring server " + si.getServerName() +
" no report for last " + (curTime - load.lastLoadRefreshTime));
this.expireServer(si);
@@ -1302,4 +1315,11 @@ public class ServerManager {
return waitingForMoreServersInRackToTimeOut;
}
+ boolean hasBlacklistedServersInTest() {
+ return !blacklistedRSHostPortSetForTest.isEmpty();
+ }
+
+ public static void clearRSBlacklistInTest() {
+ blacklistedRSHostPortSetForTest.clear();
+ }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKClusterStateRecovery.java Sun Aug 5 19:16:11 2012
@@ -18,16 +18,23 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
/**
@@ -49,11 +56,20 @@ public class ZKClusterStateRecovery {
private Set<String> liveRSNamesAtStartupUnmodifiable;
private final HMaster master;
+ private final RegionManager regionManager;
+ private final ServerManager serverManager;
private final ZooKeeperWrapper zkw;
+ private final String unassignedZNode;
+
+ /** A snapshot of the list of unassigned znodes */
+ private List<String> unassignedNodes;
public ZKClusterStateRecovery(HMaster master, ServerConnection connection) {
this.master = master;
zkw = master.getZooKeeperWrapper();
+ regionManager = master.getRegionManager();
+ serverManager = master.getServerManager();
+ this.unassignedZNode = zkw.getRegionInTransitionZNode();
}
/**
@@ -94,7 +110,7 @@ public class ZKClusterStateRecovery {
}
try {
- master.getServerManager().recordNewServer(serverInfo);
+ serverManager.recordNewServer(serverInfo);
} catch (IOException ex) {
if (ex.getCause() instanceof NoNodeException) {
// This regionserver has disappeared, don't try to register it. This will also ensure
@@ -118,4 +134,200 @@ public class ZKClusterStateRecovery {
return liveRSNamesAtStartupUnmodifiable;
}
+ private HRegionInfo parseUnassignedZNode(String regionName, byte[] nodeData,
+ RegionTransitionEventData hbEventData) throws IOException {
+ String znodePath = zkw.getZNode(unassignedZNode, regionName);
+ if (nodeData == null) {
+ // This znode does not seem to exist anymore.
+ LOG.error("znode for region " + regionName + " disappeared while scanning unassigned " +
+ "directory, skipping");
+ return null;
+ }
+
+ Writables.getWritable(nodeData, hbEventData);
+
+ HMsg msg = hbEventData.getHmsg();
+ if (msg == null) {
+ LOG.warn("HMsg is not present in unassigned znode data, skipping: " + znodePath);
+ return null;
+ }
+
+ HRegionInfo hri = msg.getRegionInfo();
+ if (hri == null) {
+ LOG.warn("Region info read from znode is null, skipping: " + znodePath);
+ return null;
+ }
+
+ if (!hri.getEncodedName().equals(regionName)) {
+ LOG.warn("Region name read from znode data (" + hri.getEncodedName() + ") " +
+ "must be the same as znode name: " + regionName + ". Skipping.");
+ return null;
+ }
+ return hri;
+ }
+
+ /**
+ * Reads an unassigned znode via readUnassignedZNodeAndSetWatch while scanning the unassigned dir.
+ * @param regionName znode child name under the unassigned directory; joined with it via getZNode
+ * @return znode data or null if the znode no longer exists (NONODE is logged and swallowed)
+ * @throws IOException in case of any ZK error other than a missing node
+ */
+ private byte[] getUnassignedZNodeAndSetWatch(String regionName)
+ throws IOException {
+ final String znodePath = zkw.getZNode(unassignedZNode, regionName);
+ try {
+ return zkw.readUnassignedZNodeAndSetWatch(znodePath);
+ } catch (IOException ex) {
+ if (ex.getCause() instanceof KeeperException) {
+ KeeperException ke = (KeeperException) ex.getCause();
+ if (ke.code() == KeeperException.Code.NONODE) {
+ LOG.warn("Unassigned node is missing: " + znodePath + ", ignoring");
+ return null;
+ }
+ }
+ throw ex;
+ }
+ }
+
+ /**
+ * Processes each znode in the startup snapshot of the ZK unassigned directory (unassignedNodes).
+ */
+ private void processUnassignedNodes() throws IOException {
+ LOG.info("Processing unassigned znode directory on master startup");
+ for (String unassignedRegion : unassignedNodes) {
+ if (master.isStopped()) {
+ break;
+ }
+
+ final String znodePath = zkw.getZNode(unassignedZNode, unassignedRegion);
+ // Get znode and set watch. The helper takes the child name and builds the path itself.
+ byte[] nodeData = getUnassignedZNodeAndSetWatch(unassignedRegion);
+ if (nodeData == null || nodeData.length == 0) {
+ // The node disappeared, or has no data to read the event type from (nodeData[0] below).
+ continue;
+ }
+
+ HBaseEventType rsState = HBaseEventType.fromByte(nodeData[0]);
+ RegionTransitionEventData hbEventData = new RegionTransitionEventData();
+ HRegionInfo hri = parseUnassignedZNode(unassignedRegion, nodeData, hbEventData);
+ if (hri == null) {
+ // Could not parse the znode. Error message already logged.
+ continue;
+ }
+
+ LOG.info("Found unassigned znode: state=" + rsState + ", region=" +
+ hri.getRegionNameAsString() + ", rs=" + hbEventData.getRsName());
+ boolean openedOrClosed = rsState == HBaseEventType.RS2ZK_REGION_OPENED ||
+ rsState == HBaseEventType.RS2ZK_REGION_CLOSED;
+
+ if (rsState == HBaseEventType.RS2ZK_REGION_CLOSING ||
+ rsState == HBaseEventType.RS2ZK_REGION_OPENING ||
+ openedOrClosed) {
+ regionManager.setRegionStateOnRecovery(rsState, hri, hbEventData.getRsName());
+ if (openedOrClosed) {
+ master.getUnassignedWatcher().handleRegionStateInZK(znodePath, nodeData, false);
+ }
+ } else if (rsState == HBaseEventType.M2ZK_REGION_OFFLINE) {
+ // Write to ZK = false; override previous state ("force") = true.
+ regionManager.setUnassignedGeneral(false, hri, true);
+ } else {
+ LOG.warn("Invalid unassigned znode state: " + rsState + " for region " + unassignedRegion);
+ }
+ }
+ }
+
+ /**
+ * Ensures that -ROOT- and .META. are assigned and persists region locations from OPENED and
+ * CLOSED nodes in the ZK unassigned directory in respectively -ROOT- (for .META. regions) and
+ * .META. (for user regions).
+ */
+ private void recoverRegionStateFromZK() throws IOException {
+ if (!isStopped()) {
+ regionManager.recoverRootRegionLocationFromZK();
+ }
+
+ if (!isStopped()) {
+ unassignedNodes = master.getUnassignedWatcher().getUnassignedDirSnapshot();
+ }
+
+ if (!isStopped()) {
+ processUnassignedNodes();
+ }
+
+ if (!isStopped()) {
+ master.getUnassignedWatcher().drainZKEventQueue();
+ }
+
+ if (!isStopped()) {
+ ensureRootAssigned();
+ }
+ }
+
+ private void ensureRootAssigned() {
+ HServerInfo rootServerInfo = regionManager.getRootServerInfo();
+ boolean reassignRoot = true;
+ if (rootServerInfo != null) {
+ // Root appears assigned. Check if it is assigned to an unknown server that we are not
+ // processing as dead. In that case we do need to reassign. This logic is similar to
+ // what is done in BaseScanner.checkAssigned.
+ String serverName = rootServerInfo.getServerName();
+ if (regionManager.regionIsInTransition(
+ HRegionInfo.ROOT_REGIONINFO.getRegionNameAsString())) {
+ // Already in transition, we will wait until it is assigned.
+ reassignRoot = false;
+ LOG.info("Not assigning root because it is already in transition");
+ } else {
+ boolean processingRootServerAsDead;
+ HServerInfo rootRSInfo;
+ synchronized (serverManager.deadServerStatusLock) {
+ // Synchronizing to avoid a race condition with ServerManager.expireServer.
+ processingRootServerAsDead =
+ serverManager.isDeadProcessingPending(serverName);
+ rootRSInfo = serverManager.getServerInfo(serverName);
+ }
+ reassignRoot = !processingRootServerAsDead && rootRSInfo == null;
+ LOG.info("reassignRoot=" + reassignRoot +
+ ", processingRootServerAsDead=" + processingRootServerAsDead +
+ ", rootRSInfo=" + rootRSInfo);
+ }
+ }
+
+ if (reassignRoot) {
+ regionManager.reassignRootRegion();
+ }
+ }
+
+ public void backgroundRecoverRegionStateFromZK() {
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ recoverRegionStateFromZK();
+ } catch (Throwable ex) { // any failure leaves region state unrecovered; stop the master
+ LOG.error("Failed to recover region assignment from ZK", ex);
+ master.stop("Failed to recover region assignment from ZK");
+ }
+ }
+ }, "backgroundRecoverRegionStateFromZK");
+ t.setDaemon(true); // daemon: this thread alone must not keep the JVM alive
+ t.start();
+ }
+
+ /**
+ * Return true if there are no live regionservers. Assumes that
+ * {@link #registerLiveRegionServers} has been called. Only used for testing. No decisions are
+ * made based on the boolean "is cluster startup" flag.
+ */
+ boolean isClusterStartup() throws IOException {
+ return liveRSNamesAtStartup.isEmpty();
+ }
+
+ private boolean isStopped() {
+ return master.isStopped();
+ }
+
+ public boolean wasLiveRegionServerAtStartup(String serverName) {
+ return liveRSNamesAtStartup.contains(serverName);
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ZKUnassignedWatcher.java Sun Aug 5 19:16:11 2012
@@ -20,16 +20,20 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
+import org.apache.hadoop.hbase.util.DrainableQueue;
+import org.apache.hadoop.hbase.util.ParamCallable;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.hbase.zookeeper.ZNodePathAndData;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper.ZNodePathAndData;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
@@ -42,51 +46,47 @@ import org.apache.zookeeper.Watcher.Even
public class ZKUnassignedWatcher implements Watcher {
private static final Log LOG = LogFactory.getLog(ZKUnassignedWatcher.class);
- private ZooKeeperWrapper zkWrapper;
- String serverName;
- ServerManager serverManager;
-
- public static void start(Configuration conf, HMaster master)
- throws IOException {
- new ZKUnassignedWatcher(conf, master);
+ private final ZooKeeperWrapper zkWrapper;
+ private final String serverName;
+ private final ServerManager serverManager;
+ private final String unassignedZNode;
+
+ private DrainableQueue<ZNodePathAndData> delayedZKEvents =
+ new DrainableQueue<ZNodePathAndData>("delayedZKEvents");
+
+ private List<String> unassignedDirSnapshot = new ArrayList<String>();
+
+ private ParamCallable<ZNodePathAndData> processEvent = new ParamCallable<ZNodePathAndData>() {
+ @Override
+ public void call(ZNodePathAndData pathAndData) {
+ try {
+ handleRegionStateInZK(pathAndData.getzNodePath(), pathAndData.getData(), false);
+ } catch (IOException e) {
+ LOG.error("Could not process event from ZooKeeper", e);
+ }
+ }
+ };
+
+ ZKUnassignedWatcher(HMaster master) throws IOException {
LOG.debug("Started ZKUnassigned watcher");
- }
-
- public ZKUnassignedWatcher(Configuration conf, HMaster master)
- throws IOException {
this.serverName = master.getHServerAddress().toString();
this.serverManager = master.getServerManager();
- zkWrapper = ZooKeeperWrapper.getInstance(conf, master.getZKWrapperName());
- String unassignedZNode = zkWrapper.getRegionInTransitionZNode();
+ zkWrapper = ZooKeeperWrapper.getInstance(master.getConfiguration(), master.getZKWrapperName());
+ unassignedZNode = zkWrapper.getRegionInTransitionZNode();
- // If the UNASSIGNED ZNode exists and this is a fresh cluster start, then
- // delete it.
- final boolean unassignedNodeExists =
- zkWrapper.exists(unassignedZNode, false);
- LOG.debug(getClass().getSimpleName() + " constructor: " +
- "unassignedNodeExists=" + unassignedNodeExists + ", " +
- "isClusterStartup=" + master.isClusterStartup());
+ // Set a watch on Zookeeper's UNASSIGNED node if it exists.
+ zkWrapper.registerListener(this);
- if (master.isClusterStartup() && unassignedNodeExists) {
- LOG.info("Cluster start, but found " + unassignedZNode + ", deleting it.");
+ if (zkWrapper.exists(unassignedZNode, false)) {
+ // The unassigned directory already exists in ZK. Take a snapshot of unassigned regions.
try {
- zkWrapper.deleteZNode(unassignedZNode, true);
- } catch (KeeperException e) {
- LOG.error("Could not delete znode " + unassignedZNode, e);
- throw new IOException(e);
- } catch (InterruptedException e) {
- LOG.error("Could not delete znode " + unassignedZNode, e);
- throw new IOException(e);
+ unassignedDirSnapshot = zkWrapper.listChildrenAndWatchForNewChildren(unassignedZNode);
+ } catch (KeeperException ke) {
+ throw new IOException(ke);
}
+ } else {
+ zkWrapper.createZNodeIfNotExists(unassignedZNode); // create and watch
}
-
- // If the UNASSIGNED ZNode does not exist, create it.
- zkWrapper.createZNodeIfNotExists(unassignedZNode);
-
- // TODO: get the outstanding changes in UNASSIGNED
-
- // Set a watch on Zookeeper's UNASSIGNED node if it exists.
- zkWrapper.registerListener(this);
}
/**
@@ -105,9 +105,7 @@ public class ZKUnassignedWatcher impleme
}
// check if the path is for the UNASSIGNED directory we care about
- if(event.getPath() == null ||
- !event.getPath().startsWith(zkWrapper.getZNodePathForHBase(
- zkWrapper.getRegionInTransitionZNode()))) {
+ if (event.getPath() == null || !event.getPath().startsWith(unassignedZNode)) {
return;
}
@@ -142,7 +140,7 @@ public class ZKUnassignedWatcher impleme
for(ZNodePathAndData zNodePathAndData : newZNodes) {
LOG.debug("Handling updates for znode: " + zNodePathAndData.getzNodePath());
handleRegionStateInZK(zNodePathAndData.getzNodePath(),
- zNodePathAndData.getData());
+ zNodePathAndData.getData(), true);
}
}
}
@@ -162,20 +160,34 @@ public class ZKUnassignedWatcher impleme
*/
private void handleRegionStateInZK(String zNodePath) throws IOException {
byte[] data = zkWrapper.readZNode(zNodePath, null);
- handleRegionStateInZK(zNodePath, data);
+ handleRegionStateInZK(zNodePath, data, true);
}
- private void handleRegionStateInZK(String zNodePath, byte[] data) {
+ void handleRegionStateInZK(String zNodePath, byte[] data, boolean canDefer) throws IOException {
// a null value is set when a node is created, we don't need to handle this
if(data == null) {
return;
}
- String rgnInTransitNode = zkWrapper.getRegionInTransitionZNode();
+
String region = zNodePath.substring(
- zNodePath.indexOf(rgnInTransitNode) + rgnInTransitNode.length() + 1);
+ zNodePath.indexOf(unassignedZNode) + unassignedZNode.length() + 1);
+
HBaseEventType rsEvent = HBaseEventType.fromByte(data[0]);
LOG.debug("Got event type [ " + rsEvent + " ] for region " + region);
+ RegionTransitionEventData rt = new RegionTransitionEventData();
+ Writables.getWritable(data, rt);
+
+ if (canDefer) {
+ ZNodePathAndData pathAndData = new ZNodePathAndData(zNodePath, data);
+ if (delayedZKEvents.enqueue(pathAndData)) {
+ // We will process this event after the initial scan of the unassigned directory is done.
+ LOG.debug("ZK-EVENT-PROCESS: deferring processing of event " + rsEvent + ", path "
+ + zNodePath + " until master startup is complete");
+ return;
+ }
+ }
+
// if the node was CLOSED then handle it
if(rsEvent == HBaseEventType.RS2ZK_REGION_CLOSED) {
new MasterCloseRegionHandler(rsEvent, serverManager, serverName, region, data).submit();
@@ -187,4 +199,16 @@ public class ZKUnassignedWatcher impleme
data).submit();
}
}
+
+ void drainZKEventQueue() {
+ LOG.info("Draining ZK unassigned event queue");
+ delayedZKEvents.drain(processEvent);
+ LOG.info("Finished draining ZK unassigned event queue");
+ }
+
+ /** @return a snapshot of the ZK unassigned directory taken when we set the watch */
+ List<String> getUnassignedDirSnapshot() {
+ return unassignedDirSnapshot;
+ }
+
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterCloseRegionHandler.java Sun Aug 5 19:16:11 2012
@@ -19,18 +19,11 @@
*/
package org.apache.hadoop.hbase.master.handler;
-import java.io.IOException;
-import java.util.ArrayList;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.util.Writables;
/**
* This is the event handler for all events relating to closing regions on the
@@ -41,31 +34,23 @@ import org.apache.hadoop.hbase.util.Writ
public class MasterCloseRegionHandler extends HBaseEventHandler
{
private static final Log LOG = LogFactory.getLog(MasterCloseRegionHandler.class);
-
- private String regionName;
- protected byte[] serializedData;
- RegionTransitionEventData hbEventData;
- ServerManager serverManager;
-
- public MasterCloseRegionHandler(HBaseEventType eventType,
- ServerManager serverManager,
- String serverName,
- String regionName,
+
+ public MasterCloseRegionHandler(HBaseEventType eventType,
+ ServerManager serverManager,
+ String serverName,
+ String regionName,
byte[] serializedData) {
- super(false, serverName, eventType);
- this.regionName = regionName;
- this.serializedData = serializedData;
- this.serverManager = serverManager;
+ super(false, serverName, eventType, regionName, serializedData, serverManager);
}
/**
- * Handle the various events relating to closing regions. We can get the
+ * Handle the various events relating to closing regions. We can get the
* following events here:
* - RS_REGION_CLOSING : No-op
- * - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown
- * state, find the RS to open this region. This could
- * be a part of a region move, or just that the RS has
- * died. Should result in a M_REQUEST_OPENREGION event
+ * - RS_REGION_CLOSED : The region is closed. If we are not in a shutdown
+ * state, find the RS to open this region. This could
+ * be a part of a region move, or just that the RS has
+ * died. Should result in a M_REQUEST_OPENREGION event
* getting created.
*/
@Override
@@ -75,31 +60,19 @@ public class MasterCloseRegionHandler ex
// handle RS_REGION_CLOSED events
handleRegionClosedEvent();
}
-
+
private void handleRegionClosedEvent() {
- try {
- if(hbEventData == null) {
- hbEventData = new RegionTransitionEventData();
- Writables.getWritable(serializedData, hbEventData);
- }
- } catch (IOException e) {
- LOG.error("Could not deserialize additional args for Close region", e);
- }
-
+ ensureEventDataAvailable();
String serverName = hbEventData.getRsName();
HServerInfo serverInfo = serverManager.getServerInfo(serverName);
-
- // process the region close - this will cause the reopening of the
+
+ // process the region close - this will cause the reopening of the
// region as a part of the heartbeat of some RS
- serverManager.processRegionClose(serverInfo,
+ serverManager.processRegionClose(serverInfo,
hbEventData.getHmsg().getRegionInfo());
-
- LOG.info("Processed close of region " +
+
+ LOG.info("Processed close of region " +
hbEventData.getHmsg().getRegionInfo().getRegionNameAsString() +
" by region server: " + serverName);
}
-
- public String getRegionName() {
- return regionName;
- }
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/handler/MasterOpenRegionHandler.java Sun Aug 5 19:16:11 2012
@@ -19,18 +19,14 @@
*/
package org.apache.hadoop.hbase.master.handler;
-import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.executor.HBaseEventHandler;
-import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.ServerManager;
-import org.apache.hadoop.hbase.util.Writables;
/**
* This is the event handler for all events relating to opening regions on the
@@ -43,33 +39,24 @@ import org.apache.hadoop.hbase.util.Writ
*/
public class MasterOpenRegionHandler extends HBaseEventHandler {
private static final Log LOG = LogFactory.getLog(MasterOpenRegionHandler.class);
- // other args passed in a byte array form
- protected byte[] serializedData;
- private String regionName;
- private RegionTransitionEventData hbEventData;
- ServerManager serverManager;
-
- public MasterOpenRegionHandler(HBaseEventType eventType,
- ServerManager serverManager,
- String serverName,
- String regionName,
- byte[] serData) {
- super(false, serverName, eventType);
- this.regionName = regionName;
- this.serializedData = serData;
- this.serverManager = serverManager;
+ public MasterOpenRegionHandler(HBaseEventType eventType,
+ ServerManager serverManager,
+ String serverName,
+ String regionName,
+ byte[] serData) {
+ super(false, serverName, eventType, regionName, serData, serverManager);
}
/**
- * Handle the various events relating to opening regions. We can get the
+ * Handle the various events relating to opening regions. We can get the
* following events here:
- * - RS_REGION_OPENING : Keep track to see how long the region open takes.
- * If the RS is taking too long, then revert the
- * region back to closed state so that it can be
+ * - RS_REGION_OPENING : Keep track to see how long the region open takes.
+ * If the RS is taking too long, then revert the
+ * region back to closed state so that it can be
* re-assigned.
- * - RS_REGION_OPENED : The region is opened. Add an entry into META for
- * the RS having opened this region. Then delete this
+ * - RS_REGION_OPENED : The region is opened. Add an entry into META for
+ * the RS having opened this region. Then delete this
* entry in ZK.
*/
@Override
@@ -83,12 +70,12 @@ public class MasterOpenRegionHandler ext
handleRegionOpenedEvent();
}
}
-
+
private void handleRegionOpeningEvent() {
- // TODO: not implemented.
+ // TODO: not implemented.
LOG.debug("NO-OP call to handling region opening event");
- // Keep track to see how long the region open takes. If the RS is taking too
- // long, then revert the region back to closed state so that it can be
+ // Keep track to see how long the region open takes. If the RS is taking too
+ // long, then revert the region back to closed state so that it can be
// re-assigned.
}
@@ -105,27 +92,4 @@ public class MasterOpenRegionHandler ext
}
}
- private void ensureEventDataAvailable() {
- if (hbEventData != null) {
- return;
- }
-
- try {
- hbEventData = new RegionTransitionEventData();
- Writables.getWritable(serializedData, hbEventData);
- } catch (IOException e) {
- LOG.error("Could not deserialize additional args for Open region", e);
- throw new RuntimeException(e);
- }
- }
-
- public String getRegionName() {
- return regionName;
- }
-
- public String getRegionServerName() {
- ensureEventDataAvailable();
- return hbEventData.getRsName();
- }
-
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sun Aug 5 19:16:11 2012
@@ -1617,7 +1617,9 @@ public class HRegionServer implements HR
Threads.shutdown(this.cacheFlusher);
Threads.shutdown(this.hlogRoller);
this.compactSplitThread.join();
- this.replicationHandler.join();
+ if (replicationHandler != null) {
+ this.replicationHandler.join();
+ }
}
private boolean getMaster() {
@@ -1655,8 +1657,8 @@ public class HRegionServer implements HR
private HServerAddress readMasterAddressFromZK() {
HServerAddress masterAddress = null;
try {
- masterAddress = zooKeeperWrapper.readAddressOrThrow(
- zooKeeperWrapper.masterElectionZNode, zooKeeperWrapper);
+ masterAddress = HServerInfo.getAddress(zooKeeperWrapper.readAddressOrThrow(
+ zooKeeperWrapper.masterElectionZNode, zooKeeperWrapper));
} catch (KeeperException e) {
LOG.fatal(UNABLE_TO_READ_MASTER_ADDRESS_ERR_MSG, e);
forceAbort();
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceFileLogWriter.java Sun Aug 5 19:16:11 2012
@@ -56,11 +56,11 @@ public class SequenceFileLogWriter imple
}
@Override
- public void init(FileSystem fs, Path path, Configuration conf)
+ public void init(FileSystem fs, Path path, Configuration conf)
throws IOException {
// Create a SF.Writer instance.
try {
- this.generateWriter(fs,path,conf);
+ this.generateWriter(fs,path,conf);
} catch (InvocationTargetException ite) {
// function was properly called, but threw it's own exception
throw new IOException(ite.getCause());
@@ -139,9 +139,13 @@ public class SequenceFileLogWriter imple
this.writer.sync();
if (this.syncFs != null) {
try {
- this.syncFs.invoke(this.writer, HLog.NO_ARGS);
+ this.syncFs.invoke(this.writer, HLog.NO_ARGS);
} catch (Exception e) {
- throw new IOException("Reflection", e);
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ }
+ throw new IOException("Reflection: could not call method " + syncFs.getName()
+ + " with no arguments on " + writer.getClass().getName(), e);
}
}
}
@@ -158,20 +162,20 @@ public class SequenceFileLogWriter imple
public OutputStream getDFSCOutputStream() {
return this.dfsClient_out;
}
-
- // To be backward compatible; we still need to call the old sequence file
- // interface.
- private void generateWriter(FileSystem fs, Path path, Configuration conf)
+
+ // To be backward compatible; we still need to call the old sequence file
+ // interface.
+ private void generateWriter(FileSystem fs, Path path, Configuration conf)
throws InvocationTargetException, Exception {
- boolean forceSync =
- conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
- if (forceSync) {
+ boolean forceSync =
+ conf.getBoolean("hbase.regionserver.hlog.writer.forceSync", false);
+ if (forceSync) {
// call the new create api with force sync flag
this.writer = (SequenceFile.Writer) SequenceFile.class
.getMethod("createWriter", new Class[] {FileSystem.class,
Configuration.class, Path.class, Class.class, Class.class,
Integer.TYPE, Short.TYPE, Long.TYPE, Boolean.TYPE,
- CompressionType.class, CompressionCodec.class, Metadata.class,
+ CompressionType.class, CompressionCodec.class, Metadata.class,
Boolean.TYPE})
.invoke(null, new Object[] {fs, conf, path, HLog.getKeyClass(conf),
WALEdit.class,
@@ -187,8 +191,8 @@ public class SequenceFileLogWriter imple
forceSync
});
- } else {
- // still need to keep old interface to be backward compatible
+ } else {
+ // still need to keep old interface to be backward compatible
// reflection for a version of SequenceFile.createWriter that doesn't
// automatically create the parent directory (see HBASE-2312)
this.writer = (SequenceFile.Writer) SequenceFile.class
@@ -208,6 +212,6 @@ public class SequenceFileLogWriter imple
SequenceFile.CompressionType.NONE, new DefaultCodec(),
new Metadata()
});
- }
+ }
}
}
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java?rev=1369645&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/DrainableQueue.java Sun Aug 5 19:16:11 2012
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.InterruptedIOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.StopStatus;
+
+/**
+ * Suppose we are responding to events and have the option to process them in real time or to
+ * defer (buffer) them. Also suppose that we have an initial period when we have to buffer all the
+ * events, after which we have to process all the buffered events and switch to real-time
+ * processing, while events are still arriving. This class provides support for that use case. We
+ * are assuming that any number of threads can add elements to the queue, but only one thread can
+ * drain the queue and make the switch to real-time processing, and there are no other ways to take
+ * elements out of the queue.
+ *
+ * @param <T> queue element type
+ */
+public class DrainableQueue<T> {
+  private static final Log LOG = LogFactory.getLog(DrainableQueue.class);
+
+  private final BlockingQueue<T> queue = new LinkedBlockingQueue<T>();
+  private final String name;
+
+  /** True if the queue has been completely drained. Guarded by {@code this}. */
+  private boolean drained = false;
+
+  /** Making sure that only one thread can drain the queue at a time. */
+  private final Object drainLock = new Object();
+
+  /**
+   * We will stop draining the queue if this stop status is set to true. Volatile because it is
+   * written by stopDrainIfStopped() without holding drainLock and read by the draining thread.
+   */
+  private volatile StopStatus drainStop;
+
+  /**
+   * @param name human-readable queue name, used in log and exception messages
+   */
+  public DrainableQueue(String name) {
+    this.name = name;
+  }
+
+  /**
+   * Enqueue an event if we are still in the "deferred processing" mode for this queue. Even if
+   * we have already started draining the queue, we still enqueue events if the draining process
+   * has not completed.
+   *
+   * @param event the event to buffer
+   * @return true if the event was buffered; false if the queue has already been fully drained,
+   *         in which case the caller must process the event in real time
+   * @throws InterruptedIOException if interrupted while adding the event to the queue
+   */
+  public synchronized boolean enqueue(T event) throws InterruptedIOException {
+    if (drained) {
+      // Event not accepted, tell the caller to process it in real time.
+      return false;
+    }
+    try {
+      queue.put(event);
+    } catch (InterruptedException ex) {
+      String msg = "Could not add event to queue " + name;
+      LOG.error(msg, ex);
+      InterruptedIOException iioe = new InterruptedIOException(msg + ": " + ex.getMessage());
+      // InterruptedIOException has no cause-taking constructor; keep the original stack trace.
+      iioe.initCause(ex);
+      throw iioe;
+    }
+    // Enqueued the event.
+    return true;
+  }
+
+  /**
+   * Find the top element in the queue if it is present. Used while draining the queue. If there
+   * are no elements left in the queue, the "drained" status is set. Does not remove the element
+   * from the queue.
+   *
+   * @return the head of the queue or null if the queue has been drained.
+   */
+  private T peek() {
+    synchronized (this) {
+      if (queue.isEmpty()) {
+        // enqueue() holds the same monitor, so no event can be added between the emptiness
+        // check and setting "drained"; every later event goes to real-time processing.
+        drained = true;
+        return null;
+      }
+    }
+    // We are assuming that no elements could be taken out of the queue between the isEmpty check
+    // above and here, because the only thread that can do that is this thread, which is draining
+    // the queue.
+    return queue.peek();
+  }
+
+  /**
+   * Process all buffered events in FIFO order with the given processor, then switch the queue to
+   * real-time mode so that subsequent {@link #enqueue} calls return false. The stop status
+   * registered via {@link #stopDrainIfStopped} is checked before each event; if it reports that
+   * we are stopped, draining ends early and the remaining events stay buffered.
+   *
+   * @param processor invoked once per buffered event; if it throws, the exception propagates and
+   *          the event remains at the head of the queue
+   */
+  public void drain(ParamCallable<T> processor) {
+    synchronized (drainLock) {
+      while (!isDrainStopped()) {
+        T event = peek();
+        if (event == null) {
+          // Fully drained; peek() has switched the queue to real-time mode.
+          return;
+        }
+        processor.call(event);
+        // Assuming that this will always succeed as we are the only thread allowed to take
+        // elements out of the queue, and we verified above that queue was not empty.
+        // We cannot switch to real-time event processing before we finish processing the event
+        // that came out of the queue because new events may arrive while the event is still
+        // being processed, resulting in event re-ordering.
+        queue.remove();
+      }
+    }
+  }
+
+  /**
+   * @return true, after logging the reason, if draining must stop because we are shutting down
+   */
+  private boolean isDrainStopped() {
+    // Read the volatile field once so a concurrent update cannot null it between the two checks.
+    StopStatus stopStatus = drainStop;
+    if (stopStatus != null && stopStatus.isStopped()) {
+      LOG.error("Stopping draining event queue " + name + " because we are shutting down");
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Register a stop status that {@link #drain} checks before processing each buffered event.
+   *
+   * @param drainStop draining stops early once this reports that we are stopped
+   */
+  public void stopDrainIfStopped(StopStatus drainStop) {
+    this.drainStop = drainStop;
+  }
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/JVMClusterUtil.java Sun Aug 5 19:16:11 2012
@@ -98,7 +98,7 @@ public class JVMClusterUtil {
/**
* Creates a {@link HMaster}. Call 'start' on the returned thread to make it
* run. Modifies the passed configuration -- the caller is responsible for
- * defensive copying.
+ * defensive copying.
* @param masterConf configuration to use
* @param hmc class to create an instance of
* @param masterId a unique identifier of a master within a mini-cluster
@@ -171,8 +171,7 @@ public class JVMClusterUtil {
if (masters != null) {
for (HMaster t : masters) {
if (t.isActiveMaster()) {
- // This will trigger cluster shutdown.
- t.shutdown();
+ t.requestClusterShutdown();
} else {
// This will only stop this particular master.
t.stop("normal shutdown");
@@ -193,7 +192,7 @@ public class JVMClusterUtil {
}
}
}
-
+
if (masters != null) {
for (HMaster t : masters) {
while (t.isAlive()) {
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java?rev=1369645&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/ParamCallable.java Sun Aug 5 19:16:11 2012
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+/**
+ * A callable object that takes an argument.
+ *
+ * @param <T> type of the argument passed to {@link #call}
+ */
+public interface ParamCallable<T> {
+
+  /**
+   * Invoke this callable with the given argument. No checked exceptions are declared, so
+   * implementations can only signal failure by throwing unchecked exceptions.
+   *
+   * @param arg the argument to operate on
+   */
+  void call(T arg);
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java?rev=1369645&r1=1369644&r2=1369645&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/util/Threads.java Sun Aug 5 19:16:11 2012
@@ -64,7 +64,9 @@ public class Threads {
*/
public static Thread setDaemonThreadRunning(final Thread t,
final String name, final UncaughtExceptionHandler handler) {
- t.setName(name);
+ if (name != null) {
+ t.setName(name);
+ }
if (handler != null) {
t.setUncaughtExceptionHandler(handler);
}
@@ -181,15 +183,15 @@ public class Threads {
}
/**
- * Create a new CachedThreadPool with a bounded number as the maximum
+ * Create a new CachedThreadPool with a bounded number as the maximum
* thread size in the pool.
- *
+ *
* @param maxCachedThread the maximum thread could be created in the pool
* @param timeout the maximum time to wait
* @param unit the time unit of the timeout argument
* @param threadFactory the factory to use when creating new threads
- * @return threadPoolExecutor the cachedThreadPool with a bounded number
- * as the maximum thread size in the pool.
+ * @return threadPoolExecutor the cachedThreadPool with a bounded number
+ * as the maximum thread size in the pool.
*/
public static ThreadPoolExecutor getBoundedCachedThreadPool(
int maxCachedThread, long timeout, TimeUnit unit,