You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2008/02/24 00:53:31 UTC
svn commit: r630545 [2/2] - in /hadoop/hbase/trunk/src:
java/org/apache/hadoop/hbase/client/ java/org/apache/hadoop/hbase/master/
java/org/apache/hadoop/hbase/util/ test/org/apache/hadoop/hbase/
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=630545&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionManager.java Sat Feb 23 15:53:21 2008
@@ -0,0 +1,630 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Collections;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.io.Text;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegion;
+import org.apache.hadoop.hbase.HRegionInterface;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.util.Writables;
+
+/**
+ * Class to manage assigning regions to servers, state of root and meta, etc.
+ */
+class RegionManager implements HConstants {
+ protected static final Log LOG = LogFactory.getLog(RegionManager.class.getName());
+
+ private volatile AtomicReference<HServerAddress> rootRegionLocation =
+ new AtomicReference<HServerAddress>(null);
+
+ final Lock splitLogLock = new ReentrantLock();
+
+ private final RootScanner rootScannerThread;
+ final MetaScanner metaScannerThread;
+
+ /** Set by root scanner to indicate the number of meta regions */
+ private final AtomicInteger numberOfMetaRegions = new AtomicInteger();
+
+ /** These are the online meta regions */
+ private final SortedMap<Text, MetaRegion> onlineMetaRegions =
+ Collections.synchronizedSortedMap(new TreeMap<Text, MetaRegion>());
+
+ /**
+ * The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that
+ * indicates the last time we *tried* to assign the region to a RegionServer.
+ * If the timestamp is out of date, then we can try to reassign it.
+ *
+ * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
+ * set of all known valid regions.
+ *
+ * <p>Items are removed from this list when a region server reports in that
+ * the region has been deployed.
+ */
+ private final SortedMap<HRegionInfo, Long> unassignedRegions =
+ Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());
+
+ /**
+ * Regions that have been assigned, and the server has reported that it has
+ * started serving it, but that we have not yet recorded in the meta table.
+ */
+ private final Set<Text> pendingRegions =
+ Collections.synchronizedSet(new HashSet<Text>());
+
+ /**
+ * The 'killList' is a list of regions that are going to be closed, but not
+ * reopened.
+ */
+ private final Map<String, Map<Text, HRegionInfo>> killList =
+ new ConcurrentHashMap<String, Map<Text, HRegionInfo>>();
+
+ /** 'killedRegions' contains regions that are in the process of being closed */
+ private final Set<Text> killedRegions =
+ Collections.synchronizedSet(new HashSet<Text>());
+
+ /**
+ * 'regionsToDelete' contains regions that need to be deleted, but cannot be
+ * until the region server closes it
+ */
+ private final Set<Text> regionsToDelete =
+ Collections.synchronizedSet(new HashSet<Text>());
+
+ private HMaster master;
+
+ RegionManager(HMaster master) {
+ this.master = master;
+
+ // The root region
+ rootScannerThread = new RootScanner(master, this);
+
+ // Scans the meta table
+ metaScannerThread = new MetaScanner(master, this);
+
+ unassignRootRegion();
+ }
+
+ void start() {
+ Threads.setDaemonThreadRunning(rootScannerThread,
+ "RegionManager.rootScanner");
+ Threads.setDaemonThreadRunning(metaScannerThread,
+ "RegionManager.metaScanner");
+ }
+
+ /*
+ * Unassign the root region.
+ * This method would be used in case where root region server had died
+ * without reporting in. Currently, we just flounder and never recover. We
+ * could 'notice' dead region server in root scanner -- if we failed access
+ * multiple times -- but reassigning root is catastrophic.
+ *
+ */
+ void unassignRootRegion() {
+ rootRegionLocation.set(null);
+ if (!master.shutdownRequested) {
+ unassignedRegions.put(HRegionInfo.rootRegionInfo, ZERO_L);
+ }
+ }
+
+ /*
+ * Assigns regions to region servers attempting to balance the load across
+ * all region servers
+ *
+ * @param info
+ * @param serverName
+ * @param returnMsgs
+ */
+ void assignRegions(HServerInfo info, String serverName,
+ ArrayList<HMsg> returnMsgs) {
+
+ synchronized (unassignedRegions) {
+
+ // We need to hold a lock on assign attempts while we figure out what to
+ // do so that multiple threads do not execute this method in parallel
+ // resulting in assigning the same region to multiple servers.
+
+ long now = System.currentTimeMillis();
+ Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
+ for (Map.Entry<HRegionInfo, Long> e: unassignedRegions.entrySet()) {
+ HRegionInfo i = e.getKey();
+ if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
+ !i.isMetaRegion()) {
+ // Can't assign user regions until all meta regions have been assigned
+ // and are on-line
+ continue;
+ }
+ long diff = now - e.getValue().longValue();
+ if (diff > master.maxRegionOpenTime) {
+ regionsToAssign.add(e.getKey());
+ }
+ }
+ int nRegionsToAssign = regionsToAssign.size();
+ if (nRegionsToAssign <= 0) {
+ // No regions to assign. Return.
+ return;
+ }
+
+ if (master.serverManager.numServers() == 1) {
+ assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
+ // Finished. Return.
+ return;
+ }
+
+ // Multiple servers in play.
+ // We need to allocate regions only to most lightly loaded servers.
+ HServerLoad thisServersLoad = info.getLoad();
+ int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
+ nRegionsToAssign -= nregions;
+ if (nRegionsToAssign > 0) {
+ // We still have more regions to assign. See how many we can assign
+ // before this server becomes more heavily loaded than the next
+ // most heavily loaded server.
+ SortedMap<HServerLoad, Set<String>> heavyServers =
+ new TreeMap<HServerLoad, Set<String>>();
+ synchronized (master.serverManager.loadToServers) {
+ heavyServers.putAll(
+ master.serverManager.loadToServers.tailMap(thisServersLoad));
+ }
+ int nservers = 0;
+ HServerLoad heavierLoad = null;
+ for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
+ Set<String> servers = e.getValue();
+ nservers += servers.size();
+ if (e.getKey().compareTo(thisServersLoad) == 0) {
+ // This is the load factor of the server we are considering
+ nservers -= 1;
+ continue;
+ }
+
+ // If we get here, we are at the first load entry that is a
+ // heavier load than the server we are considering
+ heavierLoad = e.getKey();
+ break;
+ }
+
+ nregions = 0;
+ if (heavierLoad != null) {
+ // There is a more heavily loaded server
+ for (HServerLoad load =
+ new HServerLoad(thisServersLoad.getNumberOfRequests(),
+ thisServersLoad.getNumberOfRegions());
+ load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
+ load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
+ // continue;
+ }
+ }
+
+ if (nregions < nRegionsToAssign) {
+ // There are some more heavily loaded servers
+ // but we can't assign all the regions to this server.
+ if (nservers > 0) {
+ // There are other servers that can share the load.
+ // Split regions that need assignment across the servers.
+ nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
+ / (1.0 * nservers));
+ } else {
+ // No other servers with same load.
+ // Split regions over all available servers
+ nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
+ / (1.0 * master.serverManager.numServers()));
+ }
+ } else {
+ // Assign all regions to this server
+ nregions = nRegionsToAssign;
+ }
+
+ now = System.currentTimeMillis();
+ for (HRegionInfo regionInfo: regionsToAssign) {
+ LOG.info("assigning region " + regionInfo.getRegionName() +
+ " to server " + serverName);
+ unassignedRegions.put(regionInfo, Long.valueOf(now));
+ returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+ if (--nregions <= 0) {
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ /*
+ * @param nRegionsToAssign
+ * @param thisServersLoad
+ * @return How many regions we can assign to more lightly loaded servers
+ */
+ private int regionsPerServer(final int nRegionsToAssign,
+ final HServerLoad thisServersLoad) {
+
+ SortedMap<HServerLoad, Set<String>> lightServers =
+ new TreeMap<HServerLoad, Set<String>>();
+
+ synchronized (master.serverManager.loadToServers) {
+ lightServers.putAll(master.serverManager.loadToServers.headMap(thisServersLoad));
+ }
+
+ int nRegions = 0;
+ for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
+ HServerLoad lightLoad = new HServerLoad(e.getKey().getNumberOfRequests(),
+ e.getKey().getNumberOfRegions());
+ do {
+ lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
+ nRegions += 1;
+ } while (lightLoad.compareTo(thisServersLoad) <= 0
+ && nRegions < nRegionsToAssign);
+
+ nRegions *= e.getValue().size();
+ if (nRegions >= nRegionsToAssign) {
+ break;
+ }
+ }
+ return nRegions;
+ }
+
+ /*
+ * Assign all to the only server. An unlikely case but still possible.
+ * @param regionsToAssign
+ * @param serverName
+ * @param returnMsgs
+ */
+ private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign,
+ final String serverName, final ArrayList<HMsg> returnMsgs) {
+ long now = System.currentTimeMillis();
+ for (HRegionInfo regionInfo: regionsToAssign) {
+ LOG.info("assigning region " + regionInfo.getRegionName() +
+ " to the only server " + serverName);
+ unassignedRegions.put(regionInfo, Long.valueOf(now));
+ returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+ }
+ }
+
+ /**
+ * @return Read-only map of online regions.
+ */
+ public Map<Text, MetaRegion> getOnlineMetaRegions() {
+ return Collections.unmodifiableSortedMap(onlineMetaRegions);
+ }
+
+ /*
+ * Stop the root and meta scanners so that the region servers serving meta
+ * regions can shut down.
+ */
+ public void stopScanners() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("telling root scanner to stop");
+ }
+ rootScannerThread.interruptIfAlive();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("telling meta scanner to stop");
+ }
+ metaScannerThread.interruptIfAlive();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("meta and root scanners notified");
+ }
+ }
+
+ /** Stop the region assigner */
+ public void stop() {
+ try {
+ if (rootScannerThread.isAlive()) {
+ rootScannerThread.join(); // Wait for the root scanner to finish.
+ }
+ } catch (Exception iex) {
+ LOG.warn("root scanner", iex);
+ }
+ try {
+ if (metaScannerThread.isAlive()) {
+ metaScannerThread.join(); // Wait for meta scanner to finish.
+ }
+ } catch(Exception iex) {
+ LOG.warn("meta scanner", iex);
+ }
+ }
+
+ public boolean waitForMetaRegionsOrClose() throws IOException {
+ return metaScannerThread.waitForMetaRegionsOrClose();
+ }
+
+ /**
+ * Search our map of online meta regions to find the first meta region that
+ * should contain a pointer to <i>newRegion</i>.
+ */
+ public MetaRegion getFirstMetaRegionForRegion(HRegionInfo newRegion) {
+ synchronized (onlineMetaRegions) {
+ if (onlineMetaRegions.size() == 0) {
+ return null;
+ } else if (onlineMetaRegions.size() == 1) {
+ return onlineMetaRegions.get(onlineMetaRegions.firstKey());
+ } else {
+ if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
+ return onlineMetaRegions.get(newRegion.getRegionName());
+ } else {
+ return onlineMetaRegions.get(onlineMetaRegions.headMap(
+ newRegion.getTableDesc().getName()).lastKey());
+ }
+ }
+ }
+ }
+
+ /**
+ * Get a set of all the meta regions that contain info about a given table.
+ */
+ public Set<MetaRegion> getMetaRegionsForTable(Text tableName) {
+ Text firstMetaRegion = null;
+ Set<MetaRegion> metaRegions = new HashSet<MetaRegion>();
+
+ synchronized (onlineMetaRegions) {
+ if (onlineMetaRegions.size() == 1) {
+ firstMetaRegion = onlineMetaRegions.firstKey();
+ } else if (onlineMetaRegions.containsKey(tableName)) {
+ firstMetaRegion = tableName;
+ } else {
+ firstMetaRegion = onlineMetaRegions.headMap(tableName).lastKey();
+ }
+ metaRegions.addAll(onlineMetaRegions.tailMap(firstMetaRegion).values());
+ }
+ return metaRegions;
+ }
+
+ public void createRegion(HRegionInfo newRegion, HRegionInterface server,
+ Text metaRegionName)
+ throws IOException {
+ // 2. Create the HRegion
+ HRegion region =
+ HRegion.createHRegion(newRegion, master.rootdir, master.conf);
+
+ // 3. Insert into meta
+ HRegionInfo info = region.getRegionInfo();
+ Text regionName = region.getRegionName();
+ BatchUpdate b = new BatchUpdate(regionName);
+ b.put(COL_REGIONINFO, Writables.getBytes(info));
+ server.batchUpdate(metaRegionName, b);
+
+ // 4. Close the new region to flush it to disk. Close its log file too.
+ region.close();
+ region.getLog().closeAndDelete();
+
+ // 5. Get it assigned to a server
+ unassignedRegions.put(info, ZERO_L);
+ }
+
+ /** Set a MetaRegion as online. */
+ public void putMetaRegionOnline(MetaRegion metaRegion) {
+ onlineMetaRegions.put(metaRegion.getStartKey(), metaRegion);
+ }
+
+ /** Get a list of online MetaRegions */
+ public List<MetaRegion> getListOfOnlineMetaRegions() {
+ List<MetaRegion> regions = new ArrayList<MetaRegion>();
+ synchronized(onlineMetaRegions) {
+ regions.addAll(onlineMetaRegions.values());
+ }
+ return regions;
+ }
+
+ /** count of online meta regions */
+ public int numOnlineMetaRegions() {
+ return onlineMetaRegions.size();
+ }
+
+ /** Check if a meta region is online by its name */
+ public boolean isMetaRegionOnline(Text startKey) {
+ return onlineMetaRegions.containsKey(startKey);
+ }
+
+ /** Set an online MetaRegion offline - remove it from the map. **/
+ public void offlineMetaRegion(Text startKey) {
+ onlineMetaRegions.remove(startKey);
+ }
+
+ /** Check if a region is unassigned */
+ public boolean isUnassigned(HRegionInfo info) {
+ return unassignedRegions.containsKey(info);
+ }
+
+ /** Check if a region is pending */
+ public boolean isPending(Text regionName) {
+ return pendingRegions.contains(regionName);
+ }
+
+ /** Set a region to unassigned */
+ public void setUnassigned(HRegionInfo info) {
+ unassignedRegions.put(info, ZERO_L);
+ }
+
+ /** Set a region to pending assignment */
+ public void setPending(Text regionName) {
+ pendingRegions.add(regionName);
+ }
+
+ /** Unset region's pending status */
+ public void noLongerPending(Text regionName) {
+ pendingRegions.remove(regionName);
+ }
+
+ /** Update the deadline for a region assignment to be completed */
+ public void updateAssignmentDeadline(HRegionInfo info) {
+ synchronized (unassignedRegions) {
+ // Region server has acknowledged request to open region.
+ // Extend region open time by max region open time.
+ unassignedRegions.put(info,
+ System.currentTimeMillis() + master.maxRegionOpenTime);
+ }
+ }
+
+ /** Unset a region's unassigned status */
+ public void noLongerUnassigned(HRegionInfo info) {
+ unassignedRegions.remove(info);
+ }
+
+ /** Mark a region to be closed and not reopened */
+ public void markClosedNoReopen(String serverName, HRegionInfo info) {
+ synchronized (killList) {
+ Map<Text, HRegionInfo> serverKillList = killList.get(serverName);
+ if (serverKillList != null) {
+ serverKillList.put(info.getRegionName(), info);
+ }
+ }
+ }
+
+ /** Mark a bunch of regions as closed not reopen at once for a server */
+ public void markClosedNoReopenBulk(String serverName,
+ Map<Text, HRegionInfo> map) {
+ killList.put(serverName, map);
+ }
+
+ /**
+ * Get a map of region names to region infos waiting to be offlined for a
+ * given server
+ */
+ public Map<Text, HRegionInfo> getMarkedClosedNoReopen(String serverName) {
+ return killList.get(serverName);
+ }
+
+ /**
+ * Check if a region is marked as closed not reopen.
+ */
+ public boolean isMarkedClosedNoReopen(String serverName, Text regionName) {
+ synchronized (killList) {
+ Map<Text, HRegionInfo> regionsToKill =
+ killList.get(serverName);
+ return (regionsToKill != null && regionsToKill.containsKey(regionName));
+ }
+ }
+
+ /**
+ * Mark a region as no longer waiting to be closed and not reopened.
+ */
+ public void noLongerMarkedClosedNoReopen(String serverName, Text regionName) {
+ synchronized (killList) {
+ Map<Text, HRegionInfo> serverKillList = killList.get(serverName);
+ if (serverKillList != null) {
+ serverKillList.remove(regionName);
+ }
+ }
+ }
+
+ /** Check if a region is closing */
+ public boolean isClosing(Text regionName) {
+ return killedRegions.contains(regionName);
+ }
+
+ /** Set a region as no longer closing (closed?) */
+ public void noLongerClosing(Text regionName) {
+ killedRegions.remove(regionName);
+ }
+
+ /** mark a region as closing */
+ public void setClosing(Text regionName) {
+ killedRegions.add(regionName);
+ }
+
+ /**
+ * Add a meta region to the scan queue
+ */
+ public void addMetaRegionToScan(MetaRegion m) throws InterruptedException {
+ metaScannerThread.addMetaRegionToScan(m);
+ }
+
+ /** Mark a region as to be deleted */
+ public void markRegionForDeletion(Text regionName) {
+ regionsToDelete.add(regionName);
+ }
+
+ /** Note that a region to delete has been deleted */
+ public void regionDeleted(Text regionName) {
+ regionsToDelete.remove(regionName);
+ }
+
+ /** Check if a region is marked for deletion */
+ public boolean isMarkedForDeletion(Text regionName) {
+ return regionsToDelete.contains(regionName);
+ }
+
+ public boolean isInitialRootScanComplete() {
+ return rootScannerThread.isInitialScanComplete();
+ }
+
+ public boolean isInitialMetaScanComplete() {
+ return metaScannerThread.isInitialScanComplete();
+ }
+
+ public HServerAddress getRootRegionLocation() {
+ return rootRegionLocation.get();
+ }
+
+ public void waitForRootRegionLocation() {
+ synchronized (rootRegionLocation) {
+ while(!master.closed.get() && rootRegionLocation.get() == null) {
+ // rootRegionLocation will be filled in when we get an 'open region'
+ // regionServerReport message from the HRegionServer that has been
+ // allocated the ROOT region below.
+ try {
+ rootRegionLocation.wait();
+ } catch (InterruptedException e) {
+ // continue
+ }
+ }
+ }
+ }
+
+ public int numMetaRegions() {
+ return numberOfMetaRegions.get();
+ }
+
+ public void incrementNumMetaRegions() {
+ numberOfMetaRegions.incrementAndGet();
+ }
+
+ public void setRootRegionLocation(HServerAddress address) {
+ synchronized (rootRegionLocation) {
+ rootRegionLocation.set(new HServerAddress(address));
+ rootRegionLocation.notifyAll();
+ }
+ }
+
+ public void setNumMetaRegions(int num) {
+ numberOfMetaRegions.set(num);
+ }
+}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RegionServerOperation.java Sat Feb 23 15:53:21 2008
@@ -62,7 +62,7 @@
protected boolean rootAvailable() {
boolean available = true;
- if (master.rootRegionLocation.get() == null) {
+ if (master.getRootRegionLocation() == null) {
available = false;
requeue();
}
@@ -72,10 +72,13 @@
protected boolean metaTableAvailable() {
boolean available = true;
if (LOG.isDebugEnabled()) {
- LOG.debug("numberOfMetaRegions: " + master.numberOfMetaRegions.get() +
- ", onlineMetaRegions.size(): " + master.onlineMetaRegions.size());
+ LOG.debug("numberOfMetaRegions: " +
+ master.regionManager.numMetaRegions() +
+ ", onlineMetaRegions.size(): " +
+ master.regionManager.numOnlineMetaRegions());
}
- if (master.numberOfMetaRegions.get() != master.onlineMetaRegions.size()) {
+ if (master.regionManager.numMetaRegions() !=
+ master.regionManager.numOnlineMetaRegions()) {
// We can't proceed because not all of the meta regions are online.
// We can't block either because that would prevent the meta region
// online message from being processed. In order to prevent spinning
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RootScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RootScanner.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RootScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/RootScanner.java Sat Feb 23 15:53:21 2008
@@ -27,8 +27,8 @@
/** Scanner for the <code>ROOT</code> HRegion. */
class RootScanner extends BaseScanner {
/** Constructor */
- public RootScanner(HMaster master) {
- super(master, true, master.metaRescanInterval, master.closed);
+ public RootScanner(HMaster master, RegionManager regionManager) {
+ super(master, regionManager, true, master.metaRescanInterval, master.closed);
}
private boolean scanRoot() {
@@ -36,27 +36,16 @@
// caused by the server going away. Wait until next rescan interval when
// things should be back to normal
boolean scanSuccessful = false;
- synchronized (master.rootRegionLocation) {
- while(!master.closed.get() && master.rootRegionLocation.get() == null) {
- // rootRegionLocation will be filled in when we get an 'open region'
- // regionServerReport message from the HRegionServer that has been
- // allocated the ROOT region below.
- try {
- master.rootRegionLocation.wait();
- } catch (InterruptedException e) {
- // continue
- }
- }
- }
+ master.waitForRootRegionLocation();
if (master.closed.get()) {
return scanSuccessful;
}
try {
// Don't interrupt us while we're working
- synchronized(master.rootScannerLock) {
- scanRegion(new MetaRegion(master.rootRegionLocation.get(),
- HRegionInfo.rootRegionInfo.getRegionName(), null));
+ synchronized(scannerLock) {
+ scanRegion(new MetaRegion(master.getRootRegionLocation(),
+ HRegionInfo.rootRegionInfo.getRegionName(), null));
}
scanSuccessful = true;
} catch (IOException e) {
@@ -74,8 +63,8 @@
@Override
protected boolean initialScan() {
- master.rootScanned = scanRoot();
- return master.rootScanned;
+ initialScanComplete = scanRoot();
+ return initialScanComplete;
}
@Override
Added: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=630545&view=auto
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java (added)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/ServerManager.java Sat Feb 23 15:53:21 2008
@@ -0,0 +1,674 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.Collections;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.LeaseListener;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.io.Text;
+
+/**
+ * The ServerManager class manages info about region servers - HServerInfo,
+ * load numbers, dying servers, etc.
+ */
+class ServerManager implements HConstants {
+ /** Logger for this class. */
+ static final Log LOG = LogFactory.getLog(ServerManager.class.getName());
+
+ /**
+ * The map of known server names (the trimmed string form of the server
+ * address) to server info. Also used as a monitor: exit and race handling
+ * synchronize on it and call notifyAll().
+ */
+ private final Map<String, HServerInfo> serversToServerInfo =
+ new ConcurrentHashMap<String, HServerInfo>();
+
+ /** Set of known dead servers */
+ private final Set<String> deadServers =
+ Collections.synchronizedSet(new HashSet<String>());
+
+ /**
+ * SortedMap server load -> Set of server names reporting that load.
+ * Package-private so that region assignment can read it. The value sets are
+ * plain HashSets that are mutated in place, so compound updates and views
+ * should be accessed while holding this map's monitor.
+ */
+ final SortedMap<HServerLoad, Set<String>> loadToServers =
+ Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
+
+ /** Map of server names -> last recorded server load; kept in step with loadToServers */
+ private final Map<String, HServerLoad> serversToLoad =
+ new ConcurrentHashMap<String, HServerLoad>();
+
+ /** The master this manager belongs to. */
+ private HMaster master;
+
+ /** Region server leases: created on startup, renewed on each normal report. */
+ private final Leases serverLeases;
+
+ public ServerManager(HMaster master) {
+ this.master = master;
+ serverLeases = new Leases(master.leaseTimeout,
+ master.conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+ }
+
+  /**
+   * Let the server manager know a new regionserver has come online.
+   *
+   * <p>Any stale load record for the same server name is dropped. If a server
+   * with the same name was already registered (and the master is not closed),
+   * it is timed out immediately: root is unassigned if it lived there, and a
+   * shutdown-processing operation is queued for it. The new server is then
+   * registered with an empty load and, unless the master is closed, given a
+   * lease.
+   *
+   * @param serverInfo info sent by the starting region server
+   */
+  public void regionServerStartup(HServerInfo serverInfo) {
+    String s = serverInfo.getServerAddress().toString().trim();
+    LOG.info("received start message from: " + s);
+
+    HServerLoad load = serversToLoad.remove(s);
+    if (load != null) {
+      // The startup message was from a known server.
+      // Remove stale information about the server's load.
+      // The get/mutate/put must be atomic with respect to other updaters.
+      synchronized (loadToServers) {
+        Set<String> servers = loadToServers.get(load);
+        if (servers != null) {
+          servers.remove(s);
+          loadToServers.put(load, servers);
+        }
+      }
+    }
+
+    HServerInfo storedInfo = serversToServerInfo.remove(s);
+    if (storedInfo != null && !master.closed.get()) {
+      // The startup message was from a known server with the same name.
+      // Timeout the old one right away.
+      HServerAddress root = master.getRootRegionLocation();
+      if (root != null && root.equals(storedInfo.getServerAddress())) {
+        master.regionManager.unassignRootRegion();
+      }
+      master.delayedToDoQueue.put(new ProcessServerShutdown(master, storedInfo));
+    }
+
+    // record new server
+    load = new HServerLoad();
+    serverInfo.setLoad(load);
+    serversToServerInfo.put(s, serverInfo);
+    serversToLoad.put(s, load);
+    // Every new server starts with an equal (empty) load, so concurrent
+    // startups contend for the same entry; without the lock two servers could
+    // each create a fresh set and one would be lost.
+    synchronized (loadToServers) {
+      Set<String> servers = loadToServers.get(load);
+      if (servers == null) {
+        servers = new HashSet<String>();
+      }
+      servers.add(s);
+      loadToServers.put(load, servers);
+    }
+
+    if (!master.closed.get()) {
+      serverLeases.createLease(s, new ServerExpirer(s));
+    }
+  }
+
+  /**
+   * Processes a periodic report from a region server and decides what the
+   * server should do next.
+   *
+   * <p>The checks run in this order:
+   * <ul>
+   * <li>An exiting server is processed and its first message is echoed back.</li>
+   * <li>A quiesced server is counted.</li>
+   * <li>While a shutdown is requested, the master proceeds with shutdown once
+   * every known server has quiesced; otherwise it tells the server to
+   * quiesce.</li>
+   * <li>A closed master tells the server to stop.</li>
+   * <li>An unknown server is asked to call startup again.</li>
+   * <li>A server whose start code differs from the registered one loses its
+   * lease and is told to stop.</li>
+   * <li>Anything else is handled as a normal report.</li>
+   * </ul>
+   *
+   * @param serverInfo info sent by the reporting region server
+   * @param msgs messages from the server; the first may signal exit or quiesce
+   * @return messages to send back to the region server
+   * @throws IOException propagated from normal report processing
+   */
+  public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg[] msgs)
+  throws IOException {
+    String serverName = serverInfo.getServerAddress().toString().trim();
+
+    if (msgs.length > 0) {
+      if (msgs[0].getMsg() == HMsg.MSG_REPORT_EXITING) {
+        processRegionServerExit(serverName, msgs);
+        return new HMsg[]{msgs[0]};
+      } else if (msgs[0].getMsg() == HMsg.MSG_REPORT_QUIESCED) {
+        LOG.info("Region server " + serverName + " quiesced");
+        master.quiescedMetaServers.incrementAndGet();
+      }
+    }
+
+    if (master.shutdownRequested) {
+      // Only consider completing a shutdown once one has been requested.
+      // Unguarded, a report arriving while no servers are registered
+      // (0 >= 0) -- e.g. after a master restart -- would shut the cluster down.
+      if (master.quiescedMetaServers.get() >= serversToServerInfo.size()) {
+        // If the only servers we know about are meta servers, then we can
+        // proceed with shutdown
+        LOG.info("All user tables quiesced. Proceeding with shutdown");
+        master.startShutdown();
+      }
+
+      if (!master.closed.get()) {
+        // Tell the server to stop serving any user regions
+        return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_QUIESCE)};
+      }
+    }
+
+    if (master.closed.get()) {
+      // Tell server to shut down if we are shutting down. This should
+      // happen after check of MSG_REPORT_EXITING above, since region server
+      // will send us one of these messages after it gets MSG_REGIONSERVER_STOP
+      return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
+    }
+
+    HServerInfo storedInfo = serversToServerInfo.get(serverName);
+    if (storedInfo == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("received server report from unknown server: " + serverName);
+      }
+
+      // The HBaseMaster may have been restarted.
+      // Tell the RegionServer to start over and call regionServerStartup()
+      return new HMsg[]{new HMsg(HMsg.MSG_CALL_SERVER_STARTUP)};
+    } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
+      // This state is reachable if:
+      //
+      // 1) RegionServer A started
+      // 2) RegionServer B started on the same machine, then
+      //    clobbered A in regionServerStartup.
+      // 3) RegionServer A returns, expecting to work as usual.
+      //
+      // The answer is to ask A to shut down for good.
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("region server race condition detected: " + serverName);
+      }
+
+      synchronized (serversToServerInfo) {
+        cancelLease(serverName);
+        serversToServerInfo.notifyAll();
+      }
+
+      return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
+    } else {
+      return processRegionServerAllsWell(serverName, serverInfo, msgs);
+    }
+  }
+
+  /**
+   * Handles a region server's MSG_REPORT_EXITING report.
+   *
+   * <p>Cancels the server's lease. If a lease was actually cancelled (so this
+   * exit has not been processed before) and the master is not closed, every
+   * region listed after the first message is queued for reassignment. Root is
+   * unassigned and meta regions are taken offline first. Waiters on
+   * {@code serversToServerInfo} are always notified afterwards.
+   *
+   * @param serverName name of the exiting server
+   * @param msgs the exit message followed by the regions the server served
+   */
+  private void processRegionServerExit(String serverName, HMsg[] msgs) {
+    synchronized (serversToServerInfo) {
+      try {
+        // HRegionServer is shutting down. Cancel the server's lease; doing so
+        // takes care of updating serversToServerInfo, etc.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Region server " + serverName +
+            ": MSG_REPORT_EXITING -- cancelling lease");
+        }
+
+        boolean hadLease = cancelLease(serverName);
+        if (!hadLease) {
+          // No lease left: the exit was already handled; don't do it twice.
+          return;
+        }
+        LOG.info("Region server " + serverName +
+          ": MSG_REPORT_EXITING -- lease cancelled");
+
+        if (master.closed.get()) {
+          // Shutting down: nothing gets reassigned.
+          return;
+        }
+
+        // msgs[0] is the exit notice itself; the rest name served regions.
+        int idx = 1;
+        while (idx < msgs.length) {
+          HRegionInfo region = msgs[idx].getRegionInfo();
+          if (region.isRootRegion()) {
+            master.regionManager.unassignRootRegion();
+          } else if (region.isMetaTable()) {
+            master.regionManager.offlineMetaRegion(region.getStartKey());
+          }
+          master.regionManager.setUnassigned(region);
+          idx++;
+        }
+        // Nothing is returned to the server; it will do no more work.
+      } finally {
+        serversToServerInfo.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Handles a routine report from a known region server. Renews its lease,
+   * refreshes its server info and load bookkeeping, then processes the
+   * messages it sent.
+   *
+   * @param serverName name of the reporting region server
+   * @param serverInfo the server's current info, including its load
+   * @param msgs messages reported by the server
+   * @return messages to send back to the region server
+   * @throws IOException if message processing fails
+   */
+  private HMsg[] processRegionServerAllsWell(String serverName,
+      HServerInfo serverInfo, HMsg[] msgs)
+  throws IOException {
+    // All's well. Renew the server's lease.
+    // This will always succeed; otherwise, the fetch of serversToServerInfo
+    // would have failed above.
+    serverLeases.renewLease(serverName);
+
+    // Refresh the info object and the load information
+    serversToServerInfo.put(serverName, serverInfo);
+
+    HServerLoad load = serversToLoad.get(serverName);
+    if (load != null && !load.equals(serverInfo.getLoad())) {
+      // The load on this server has changed: detach it from its old bucket.
+      // loadToServers and serversToLoad are manipulated in pairs, so the set
+      // should exist; guard anyway rather than risk an NPE.
+      Set<String> oldServers = loadToServers.get(load);
+      if (oldServers != null) {
+        oldServers.remove(serverName);
+        if (oldServers.isEmpty()) {
+          // Don't leave stale load keys with no servers behind.
+          loadToServers.remove(load);
+        }
+      }
+    }
+
+    // Set the current load information
+    load = serverInfo.getLoad();
+    serversToLoad.put(serverName, load);
+    Set<String> servers = loadToServers.get(load);
+    if (servers == null) {
+      servers = new HashSet<String>();
+      loadToServers.put(load, servers);
+    }
+    servers.add(serverName);
+
+    // Next, process messages for this server
+    return processMsgs(serverName, serverInfo, msgs);
+  }
+
+  /**
+   * Process all the incoming messages from a server that's contacted us.
+   *
+   * Note that we never need to update the server's load information because
+   * that has already been done in regionServerReport.
+   *
+   * @param serverName name of the reporting region server
+   * @param serverInfo info for the reporting region server
+   * @param incomingMsgs messages reported by the server
+   * @return messages to send back. These are close requests for regions
+   *   marked closed-no-reopen on this server, plus whatever assignRegions adds.
+   * @throws IOException on an unrecognized message, or a split report that
+   *   is not followed by its two daughter-region messages
+   */
+  private HMsg[] processMsgs(String serverName, HServerInfo serverInfo,
+      HMsg incomingMsgs[])
+  throws IOException {
+    ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
+    Map<Text, HRegionInfo> regionsToKill =
+      master.regionManager.getMarkedClosedNoReopen(serverName);
+
+    // Get reports on what the RegionServer did.
+    for (int i = 0; i < incomingMsgs.length; i++) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Received " + incomingMsgs[i].toString() + " from " +
+          serverName);
+      }
+      HRegionInfo region = incomingMsgs[i].getRegionInfo();
+
+      switch (incomingMsgs[i].getMsg()) {
+        case HMsg.MSG_REPORT_PROCESS_OPEN:
+          master.regionManager.updateAssignmentDeadline(region);
+          break;
+
+        case HMsg.MSG_REPORT_OPEN:
+          processRegionOpen(serverName, serverInfo, region, returnMsgs);
+          break;
+
+        case HMsg.MSG_REPORT_CLOSE:
+          // Delegate to the single implementation of close handling so the
+          // two copies cannot drift apart.
+          processRegionClose(serverName, serverInfo, region);
+          break;
+
+        case HMsg.MSG_REPORT_SPLIT:
+          // A split report is followed by two messages describing the
+          // daughter regions; fail cleanly if they are missing.
+          if (i + 2 >= incomingMsgs.length) {
+            throw new IOException("MSG_REPORT_SPLIT for " +
+              region.getRegionName() + " from " + serverName +
+              " is missing its daughter region messages");
+          }
+          processSplitRegion(serverName, serverInfo, region, incomingMsgs[++i],
+            incomingMsgs[++i], returnMsgs);
+          break;
+
+        default:
+          throw new IOException(
+            "Impossible state during msg processing. Instruction: " +
+            incomingMsgs[i].getMsg());
+      }
+    }
+
+    // Process the kill list: ask the server to close each marked region and
+    // remember that the close was requested by us.
+    if (regionsToKill != null) {
+      for (HRegionInfo info : regionsToKill.values()) {
+        returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, info));
+        master.regionManager.setClosing(info.getRegionName());
+      }
+    }
+
+    // Figure out what the RegionServer ought to do, and write back.
+    master.regionManager.assignRegions(serverInfo, serverName, returnMsgs);
+    return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
+  }
+
+  /**
+   * Records that a region has split. Both daughter regions are marked
+   * unassigned. If the parent was a meta region, it is taken offline and the
+   * meta region count is incremented.
+   *
+   * serverName, serverInfo and returnMsgs are currently unused.
+   *
+   * @param region the parent region that split
+   * @param splitA message carrying the first daughter region
+   * @param splitB message carrying the second daughter region
+   */
+  private void processSplitRegion(String serverName, HServerInfo serverInfo,
+      HRegionInfo region, HMsg splitA, HMsg splitB, ArrayList<HMsg> returnMsgs) {
+    HRegionInfo daughterA = splitA.getRegionInfo();
+    master.regionManager.setUnassigned(daughterA);
+    HRegionInfo daughterB = splitB.getRegionInfo();
+    master.regionManager.setUnassigned(daughterB);
+
+    LOG.info("region " + region.getRegionName() + " split. New regions are: " +
+      daughterA.getRegionName() + ", " + daughterB.getRegionName());
+
+    if (!region.isMetaTable()) {
+      return;
+    }
+    // The parent was a meta region: drop it from the online set and account
+    // for the extra meta region the split produced.
+    master.regionManager.offlineMetaRegion(region.getStartKey());
+    master.regionManager.incrementNumMetaRegions();
+  }
+
+  /**
+   * Region server is reporting that a region is now opened.
+   *
+   * If the region is not in the unassigned set:
+   * - A repeat report from the server already known to host it is ignored.
+   *   For root, that server is the recorded root location; for other
+   *   regions, the region is pending.
+   * - Any other report is treated as a duplicate assignment, and the server
+   *   is told to close the region without reporting the close.
+   * - Exception: a root report with no recorded root location is accepted.
+   *
+   * An accepted report removes the region from the unassigned set. The root
+   * location is then stored in memory. Any other region is marked pending,
+   * and a ProcessRegionOpen is queued to note its location.
+   *
+   * @param serverName name of the reporting region server
+   * @param serverInfo info for the reporting region server
+   * @param region the region reported as opened
+   * @param returnMsgs messages to send back to the server; a close request is
+   *   appended for a duplicate assignment
+   */
+  private void processRegionOpen(String serverName, HServerInfo serverInfo,
+      HRegionInfo region, ArrayList<HMsg> returnMsgs)
+  throws IOException {
+    boolean duplicateAssignment = false;
+
+    if (!master.regionManager.isUnassigned(region)) {
+      if (region.isRootRegion()) {
+        // Root region
+        HServerAddress rootServer = master.getRootRegionLocation();
+        if (rootServer != null) {
+          if (rootServer.toString().equals(serverName)) {
+            // A duplicate open report from the correct server
+            return;
+          }
+          // We received an open report on the root region, but it is
+          // assigned to a different server
+          duplicateAssignment = true;
+        }
+      } else {
+        // Not root region. If it is not a pending region, then we are
+        // going to treat it as a duplicate assignment, although we can't
+        // tell for certain that's the case.
+        if (master.regionManager.isPending(region.getRegionName())) {
+          // A duplicate report from the correct server
+          return;
+        }
+        duplicateAssignment = true;
+      }
+    }
+
+    if (duplicateAssignment) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("region server " + serverInfo.getServerAddress().toString()
+          + " should not have opened region " + region.getRegionName());
+      }
+
+      // This Region should not have been opened.
+      // Ask the server to shut it down, but don't report it as closed.
+      // Otherwise the HMaster will think the Region was closed on purpose,
+      // and then try to reopen it elsewhere; that's not what we want.
+      returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region));
+    } else {
+      LOG.info(serverInfo.getServerAddress().toString() + " serving " +
+        region.getRegionName());
+
+      // it was assigned, and it's not a duplicate assignment, so take it out
+      // of the unassigned list.
+      master.regionManager.noLongerUnassigned(region);
+
+      if (region.isRootRegion()) {
+        // Store the Root Region location (in memory)
+        master.regionManager.setRootRegionLocation(serverInfo.getServerAddress());
+      } else {
+        // Note that the table has been assigned and is waiting for the
+        // meta table to be updated.
+        master.regionManager.setPending(region.getRegionName());
+
+        // Queue up an update to note the region location.
+        try {
+          master.toDoQueue.put(
+            new ProcessRegionOpen(master, serverInfo, region));
+        } catch (InterruptedException e) {
+          throw new RuntimeException(
+            "Putting into toDoQueue was interrupted.", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Handles a region server's report that it has closed a region.
+   *
+   * Root region:
+   * - If it is marked offline, a fatal message is logged and the master is
+   *   shut down.
+   * - In either case, the root region is then unassigned.
+   *
+   * Any other region: a ProcessRegionClose is queued. It is asked to reassign
+   * the region unless any of these hold:
+   * - the region is offline;
+   * - the master was closing it (isClosing);
+   * - it is marked for deletion, in which case deletion is requested instead.
+   *
+   * @param serverName name of the reporting region server (unused)
+   * @param info info for the reporting region server
+   * @param region the region reported as closed
+   */
+  private void processRegionClose(String serverName, HServerInfo info,
+      HRegionInfo region) {
+    LOG.info(info.getServerAddress().toString() + " no longer serving " +
+      region.getRegionName());
+
+    if (region.isRootRegion()) {
+      if (region.isOffline()) {
+        // Can't proceed without root region. Shutdown.
+        LOG.fatal("root region is marked offline");
+        master.shutdown();
+      }
+      master.regionManager.unassignRootRegion();
+      return;
+    }
+
+    boolean reassign = !region.isOffline();
+    boolean delete = false;
+
+    if (master.regionManager.isClosing(region.getRegionName())) {
+      master.regionManager.noLongerClosing(region.getRegionName());
+      reassign = false;
+    }
+    if (master.regionManager.isMarkedForDeletion(region.getRegionName())) {
+      master.regionManager.regionDeleted(region.getRegionName());
+      reassign = false;
+      delete = true;
+    }
+
+    if (region.isMetaTable()) {
+      // Region is part of the meta table. Remove it from onlineMetaRegions
+      master.regionManager.offlineMetaRegion(region.getStartKey());
+    }
+
+    // NOTE: we cannot put the region into unassignedRegions as that
+    // could create a race with the pending close if it gets
+    // reassigned before the close is processed.
+    master.regionManager.noLongerUnassigned(region);
+
+    try {
+      master.toDoQueue.put(
+        new ProcessRegionClose(master, region, reassign, delete));
+    } catch (InterruptedException e) {
+      throw new RuntimeException(
+        "Putting into toDoQueue was interrupted.", e);
+    }
+  }
+
+  /**
+   * Cancels a server's lease and removes the server from the server and load
+   * bookkeeping. If the server hosted the root region, the root region is
+   * unassigned.
+   *
+   * This can be called a couple of times for the same server during
+   * shutdown. Only the call that finds the server still registered has any
+   * effect.
+   *
+   * @param serverName name of the region server
+   * @return true if the server was registered and its lease was cancelled
+   */
+  private boolean cancelLease(final String serverName) {
+    HServerInfo info = serversToServerInfo.remove(serverName);
+    if (info == null) {
+      // Already cancelled (or never registered): nothing to do.
+      return false;
+    }
+
+    // Read the root location once so the null check and comparison agree.
+    HServerAddress rootServer = master.getRootRegionLocation();
+    if (rootServer != null && info.getServerAddress().equals(rootServer)) {
+      master.regionManager.unassignRootRegion();
+    }
+    LOG.info("Cancelling lease for " + serverName);
+    serverLeases.cancelLease(serverName);
+
+    // update load information
+    HServerLoad load = serversToLoad.remove(serverName);
+    if (load != null) {
+      Set<String> servers = loadToServers.get(load);
+      if (servers != null) {
+        servers.remove(serverName);
+        if (servers.isEmpty()) {
+          // Don't leave stale load keys with no servers behind.
+          loadToServers.remove(load);
+        }
+      }
+    }
+    return true;
+  }
+
+
+  /**
+   * Intended to compute the average load across all region servers.
+   *
+   * NOTE: not implemented yet; this currently always returns 0.
+   *
+   * @return 0 (placeholder value)
+   */
+  public int averageLoad() {
+    // TODO: implement. Until then, callers always see 0.
+    return 0;
+  }
+
+  /**
+   * @return the number of region servers currently present in
+   *   serversToServerInfo
+   */
+  public int numServers() {
+    final int known = serversToServerInfo.size();
+    return known;
+  }
+
+  /**
+   * Looks up the info recorded for a region server.
+   *
+   * @param address server name, as used for the serversToServerInfo keys
+   * @return the recorded HServerInfo, or null if none is registered
+   */
+  public HServerInfo getServerInfo(String address) {
+    final HServerInfo found = serversToServerInfo.get(address);
+    return found;
+  }
+
+  /**
+   * @return a read-only view of the server name to server info map. It is a
+   *   view, not a copy, so it reflects later changes to the map.
+   */
+  public Map<String, HServerInfo> getServersToServerInfo() {
+    final Map<String, HServerInfo> readOnly =
+      Collections.unmodifiableMap(serversToServerInfo);
+    return readOnly;
+  }
+
+  /**
+   * @return a read-only view of the server name to load map. It is a view,
+   *   not a copy, so it reflects later changes to the map.
+   */
+  public Map<String, HServerLoad> getServersToLoad() {
+    final Map<String, HServerLoad> readOnly =
+      Collections.unmodifiableMap(serversToLoad);
+    return readOnly;
+  }
+
+  /**
+   * @return a read-only view of the load to server-names map. Only the map
+   *   itself is wrapped: the Set values are the live internal sets, so
+   *   callers must not modify them.
+   */
+  public Map<HServerLoad, Set<String>> getLoadToServers() {
+    final Map<HServerLoad, Set<String>> readOnly =
+      Collections.unmodifiableMap(loadToServers);
+    return readOnly;
+  }
+
+  /**
+   * Wakes every thread waiting on the serversToServerInfo monitor, such as
+   * letRegionServersShutdown().
+   */
+  public void notifyServers() {
+    final Object monitor = serversToServerInfo;
+    synchronized (monitor) {
+      monitor.notifyAll();
+    }
+  }
+
+  /*
+   * Wait on regionservers to report in
+   * with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice
+   * the master is going down. Waits until all region servers come back with
+   * a MSG_REGIONSERVER_STOP which will cancel their lease or until leases held
+   * by remote region servers have expired.
+   *
+   * Returns immediately if the file system is not ok. An interrupt does not
+   * cut the wait short. The thread's interrupt status is restored before
+   * returning so callers can still observe it.
+   */
+  void letRegionServersShutdown() {
+    if (!master.fsOk) {
+      // Forget waiting for the region servers if the file system has gone
+      // away. Just exit as quickly as possible.
+      return;
+    }
+    boolean interrupted = false;
+    try {
+      synchronized (serversToServerInfo) {
+        while (serversToServerInfo.size() > 0) {
+          LOG.info("Waiting on following regionserver(s) to go down (or " +
+            "region server lease expiration, whichever happens first): " +
+            serversToServerInfo.values());
+          try {
+            serversToServerInfo.wait(master.threadWakeFrequency);
+          } catch (InterruptedException e) {
+            // Keep waiting for the region servers. Re-asserting the interrupt
+            // here would make every later wait() throw at once and spin, so
+            // remember it and restore it on the way out instead.
+            interrupted = true;
+          }
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /**
+   * Lease listener for a single region server. When the server's lease
+   * expires, this listener:
+   * - removes the server from the server and load bookkeeping;
+   * - unassigns the root region if the server hosted it;
+   * - records the server as dead;
+   * - queues a ProcessServerShutdown for it.
+   */
+  private class ServerExpirer implements LeaseListener {
+    @SuppressWarnings("hiding")
+    private final String server;
+
+    ServerExpirer(String server) {
+      this.server = server;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info(server + " lease expired");
+      // Remove the server from the known servers list and update load info
+      HServerInfo info = serversToServerInfo.remove(server);
+      if (info != null) {
+        HServerAddress root = master.getRootRegionLocation();
+        if (root != null && root.equals(info.getServerAddress())) {
+          master.regionManager.unassignRootRegion();
+        }
+        String serverName = info.getServerAddress().toString();
+        HServerLoad load = serversToLoad.remove(serverName);
+        if (load != null) {
+          Set<String> servers = loadToServers.get(load);
+          if (servers != null) {
+            servers.remove(serverName);
+            if (servers.isEmpty()) {
+              // Don't leave stale load keys with no servers behind.
+              loadToServers.remove(load);
+            }
+          }
+        }
+        deadServers.add(server);
+      }
+      synchronized (serversToServerInfo) {
+        // Wake waiters such as letRegionServersShutdown().
+        serversToServerInfo.notifyAll();
+      }
+
+      // NOTE: If the server was serving the root region, we cannot reassign it
+      // here because the new server will start serving the root region before
+      // the ProcessServerShutdown operation has a chance to split the log file.
+      if (info != null) {
+        master.delayedToDoQueue.put(new ProcessServerShutdown(master, info));
+      }
+    }
+  }
+
+  /**
+   * Starts the server manager by naming and starting its lease checker
+   * thread.
+   */
+  public void start() {
+    // Leases are not the same as Chore threads. Set name differently.
+    serverLeases.setName("ServerManager.leaseChecker");
+    serverLeases.start();
+  }
+
+  /**
+   * Shuts down the server manager by closing its lease monitor.
+   */
+  public void stop() {
+    this.serverLeases.close();
+  }
+
+  /**
+   * Removes a server from the dead-server set.
+   *
+   * @param serverName name of the server
+   */
+  public void removeDeadServer(String serverName) {
+    this.deadServers.remove(serverName);
+  }
+
+  /**
+   * Tells whether a server is in the dead-server set. Servers are added to
+   * that set when their lease expires.
+   *
+   * @param serverName name of the server
+   * @return true if the server is currently recorded as dead
+   */
+  public boolean isDead(String serverName) {
+    final boolean dead = this.deadServers.contains(serverName);
+    return dead;
+  }
+}
\ No newline at end of file
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableDelete.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableDelete.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableDelete.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableDelete.java Sat Feb 23 15:53:21 2008
@@ -47,7 +47,7 @@
// For regions that are being served, mark them for deletion
for (HashSet<HRegionInfo> s: servedRegions.values()) {
for (HRegionInfo i: s) {
- this.master.regionsToDelete.add(i.getRegionName());
+ master.regionManager.markRegionForDeletion(i.getRegionName());
}
}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableOperation.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableOperation.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableOperation.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/master/TableOperation.java Sat Feb 23 15:53:21 2008
@@ -36,6 +36,11 @@
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Text;
+/**
+ * Abstract base class for operations that need to examine all HRegionInfo
+ * objects that make up a table. (For a table, operate on each of its rows
+ * in .META.)
+ */
abstract class TableOperation implements HConstants {
static final Long ZERO_L = Long.valueOf(0L);
@@ -58,31 +63,18 @@
throw new MasterNotRunningException();
}
- this.metaRegions = new HashSet<MetaRegion>();
this.tableName = tableName;
this.unservedRegions = new HashSet<HRegionInfo>();
// We can not access any meta region if they have not already been
// assigned and scanned.
- if (this.master.metaScannerThread.waitForMetaRegionsOrClose()) {
- throw new MasterNotRunningException(); // We're shutting down. Forget it.
+ if (master.regionManager.metaScannerThread.waitForMetaRegionsOrClose()) {
+ // We're shutting down. Forget it.
+ throw new MasterNotRunningException();
}
- Text firstMetaRegion = null;
- synchronized (this.master.onlineMetaRegions) {
- if (this.master.onlineMetaRegions.size() == 1) {
- firstMetaRegion = this.master.onlineMetaRegions.firstKey();
-
- } else if (this.master.onlineMetaRegions.containsKey(tableName)) {
- firstMetaRegion = tableName;
-
- } else {
- firstMetaRegion = this.master.onlineMetaRegions.headMap(tableName).lastKey();
- }
- this.metaRegions.addAll(this.master.onlineMetaRegions.tailMap(
- firstMetaRegion).values());
- }
+ this.metaRegions = master.regionManager.getMetaRegionsForTable(tableName);
}
void process() throws IOException {
@@ -90,19 +82,16 @@
boolean tableExists = false;
try {
// Prevent meta scanner from running
- synchronized(this.master.metaScannerLock) {
+ synchronized(master.regionManager.metaScannerThread.scannerLock) {
for (MetaRegion m: metaRegions) {
-
// Get a connection to a meta server
-
HRegionInterface server =
- this.master.connection.getHRegionConnection(m.getServer());
+ master.connection.getHRegionConnection(m.getServer());
// Open a scanner on the meta region
-
long scannerId =
server.openScanner(m.getRegionName(), COLUMN_FAMILY_ARRAY,
- tableName, System.currentTimeMillis(), null);
+ tableName, System.currentTimeMillis(), null);
try {
while (true) {
@@ -166,7 +155,7 @@
protected boolean isBeingServed(String serverName, long startCode) {
boolean result = false;
if (serverName != null && serverName.length() > 0 && startCode != -1L) {
- HServerInfo s = this.master.serversToServerInfo.get(serverName);
+ HServerInfo s = master.serverManager.getServerInfo(serverName);
result = s != null && s.getStartCode() == startCode;
}
return result;
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/SoftSortedMap.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/SoftSortedMap.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/SoftSortedMap.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/SoftSortedMap.java Sat Feb 23 15:53:21 2008
@@ -68,7 +68,17 @@
public V get(Object key) {
checkReferences();
SoftValue<K,V> value = internalMap.get(key);
- return value == null ? null : value.get();
+
+ if (value == null) {
+ return null;
+ } else {
+ if (value.get() == null) {
+ internalMap.remove(key);
+ return null;
+ } else {
+ return value.get();
+ }
+ }
}
public V remove(Object key) {
@@ -83,8 +93,9 @@
}
public boolean containsValue(Object value) {
- checkReferences();
- return internalMap.containsValue(value);
+    // internalMap stores SoftValue wrappers rather than the cached values, so
+    // delegating to internalMap.containsValue() would compare the wrong objects.
+    throw new UnsupportedOperationException("Don't support containsValue!");
}
public K firstKey() {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java Sat Feb 23 15:53:21 2008
@@ -35,20 +35,20 @@
* @return Returns the passed Thread <code>t</code>.
*/
public static Thread setDaemonThreadRunning(final Thread t,
- final String name) {
+ final String name) {
return setDaemonThreadRunning(t, name, null);
}
- /**
- * Utility method that sets name, daemon status and starts passed thread.
- * @param t
- * @param name
- * @param handler A handler to set on the thread. Pass null if want to
- * use default handler.
- * @return Returns the passed Thread <code>t</code>.
- */
- public static Thread setDaemonThreadRunning(final Thread t,
- final String name, final UncaughtExceptionHandler handler) {
+ /**
+ * Utility method that sets name, daemon status and starts passed thread.
+ * @param t
+ * @param name
+ * @param handler A handler to set on the thread. Pass null if want to
+ * use default handler.
+ * @return Returns the passed Thread <code>t</code>.
+ */
+ public static Thread setDaemonThreadRunning(final Thread t,
+ final String name, final UncaughtExceptionHandler handler) {
t.setName(name);
if (handler != null) {
t.setUncaughtExceptionHandler(handler);
Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java?rev=630545&r1=630544&r2=630545&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/MultiRegionTable.java Sat Feb 23 15:53:21 2008
@@ -147,7 +147,7 @@
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITA));
HRegionInfo splitB =
Writables.getHRegionInfoOrNull(data.get(HConstants.COL_SPLITB));
- assertTrue(fs.exists(parentDir));
+ assertTrue("parentDir should exist", fs.exists(parentDir));
LOG.info("Split happened. Parent is " + parent.getRegionName());
// Recalibrate will cause us to wait on new regions' deployment