You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jg...@apache.org on 2010/08/05 09:35:02 UTC
svn commit: r982489 [2/7] - in /hbase/branches/0.90_master_rewrite: ./
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/catalog/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/executor/ s...
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/HBaseExecutorService.java Thu Aug 5 07:35:00 2010
@@ -24,7 +24,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -43,88 +43,77 @@ import org.apache.commons.logging.LogFac
* HBEventHandler class and create an event type that submits to this service.
*
*/
-public class HBaseExecutorService
-{
+public class HBaseExecutorService {
private static final Log LOG = LogFactory.getLog(HBaseExecutorService.class);
// default number of threads in the pool
private int corePoolSize = 1;
- // max number of threads - maximum concurrency
- private int maximumPoolSize = 5;
// how long to retain excess threads
private long keepAliveTimeInMillis = 1000;
// the thread pool executor that services the requests
ThreadPoolExecutor threadPoolExecutor;
// work queue to use - unbounded queue
- BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
+ BlockingQueue<Runnable> workQueue = new PriorityBlockingQueue<Runnable>();
// name for this executor service
String name;
// hold the all the executors created in a map addressable by their names
static Map<String, HBaseExecutorService> executorServicesMap =
Collections.synchronizedMap(new HashMap<String, HBaseExecutorService>());
-
/**
- * The following is a list of names for the various executor services in both
+ * The following is a list of names for the various executor services in both
* the master and the region server.
*/
public enum HBaseExecutorServiceType {
- NONE (-1),
- MASTER_CLOSEREGION (1),
- MASTER_OPENREGION (2);
-
- private final int value;
-
- HBaseExecutorServiceType(int intValue) {
- this.value = intValue;
- }
-
- public void startExecutorService(String serverName) {
- // if this is NONE then there is no executor to start
- if(value == NONE.value) {
- throw new RuntimeException("Cannot start NONE executor type.");
- }
+
+ // Master executor services
+ MASTER_CLOSE_REGION (1),
+ MASTER_OPEN_REGION (2),
+ MASTER_SERVER_OPERATIONS (3),
+ MASTER_TABLE_OPERATIONS (4),
+
+ // RegionServer executor services
+ RS_OPEN_REGION (20),
+ RS_OPEN_ROOT (21),
+ RS_OPEN_META (22),
+ RS_CLOSE_REGION (23),
+ RS_CLOSE_ROOT (24),
+ RS_CLOSE_META (25);
+
+ HBaseExecutorServiceType(int value) {}
+
+ public void startExecutorService(String serverName, int maxThreads) {
String name = getExecutorName(serverName);
if(HBaseExecutorService.isExecutorServiceRunning(name)) {
LOG.debug("Executor service " + toString() + " already running on " + serverName);
return;
}
LOG.debug("Starting executor service [" + name + "]");
- HBaseExecutorService.startExecutorService(name);
+ HBaseExecutorService.startExecutorService(name, maxThreads);
}
-
+
public HBaseExecutorService getExecutor(String serverName) {
- // if this is NONE then there is no executor
- if(value == NONE.value) {
- return null;
- }
return HBaseExecutorService.getExecutorService(getExecutorName(serverName));
}
-
+
public String getExecutorName(String serverName) {
- // if this is NONE then there is no executor
- if(value == NONE.value) {
- return null;
- }
return (this.toString() + "-" + serverName);
}
}
-
-
/**
* Start an executor service with a given name. If there was a service already
* started with the same name, this throws a RuntimeException.
* @param name Name of the service to start.
*/
- public static void startExecutorService(String name) {
+ public static void startExecutorService(String name, int maxThreads) {
if(executorServicesMap.get(name) != null) {
throw new RuntimeException("An executor service with the name " + name + " is already running!");
}
- HBaseExecutorService hbes = new HBaseExecutorService(name);
+ HBaseExecutorService hbes = new HBaseExecutorService(name, maxThreads);
executorServicesMap.put(name, hbes);
LOG.debug("Starting executor service: " + name);
}
-
+
public static boolean isExecutorServiceRunning(String name) {
return (executorServicesMap.containsKey(name));
}
@@ -140,7 +129,7 @@ public class HBaseExecutorService
}
return executor;
}
-
+
public static void shutdown() {
for(Entry<String, HBaseExecutorService> entry : executorServicesMap.entrySet()) {
entry.getValue().threadPoolExecutor.shutdown();
@@ -148,16 +137,11 @@ public class HBaseExecutorService
executorServicesMap.clear();
}
- protected HBaseExecutorService(String name) {
+ protected HBaseExecutorService(String name, int maxThreads) {
this.name = name;
// create the thread pool executor
- threadPoolExecutor = new ThreadPoolExecutor(
- corePoolSize,
- maximumPoolSize,
- keepAliveTimeInMillis,
- TimeUnit.MILLISECONDS,
- workQueue
- );
+ threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxThreads,
+ keepAliveTimeInMillis, TimeUnit.MILLISECONDS, workQueue);
// name the threads for this threadpool
threadPoolExecutor.setThreadFactory(new NamedThreadFactory(name));
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/executor/RegionTransitionData.java Thu Aug 5 07:35:00 2010
@@ -23,8 +23,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import org.apache.hadoop.hbase.HMsg;
-import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.EventHandler.EventType;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
@@ -36,19 +36,16 @@ public class RegionTransitionData implem
* Type of transition event (offline, opening, opened, closing, closed).
* Required.
*/
- private HBaseEventType eventType;
+ private EventType eventType;
/** Region being transitioned. Required. */
- private String regionName;
+ private byte [] regionName;
/** Server event originated from. Optional. */
private String serverName;
/** Time the event was created. Required but automatically set. */
- private long timeStamp;
-
- /** Temporary. Holds payload used doing transitions via heartbeats. */
- private HMsg hmsg; // to be removed shortly once we stop using heartbeats
+ private long stamp;
/**
* Writable constructor. Do not use directly.
@@ -68,12 +65,12 @@ public class RegionTransitionData implem
* assignment.
*
* <p>Since only the master uses this constructor, the type should always be
- * {@link HBaseEventType#M2ZK_REGION_OFFLINE}.
+ * {@link EventType#M2ZK_REGION_OFFLINE}.
*
* @param eventType type of event
* @param regionName name of region
*/
- public RegionTransitionData(HBaseEventType eventType, String regionName) {
+ public RegionTransitionData(EventType eventType, byte [] regionName) {
this(eventType, regionName, null);
}
@@ -83,37 +80,20 @@ public class RegionTransitionData implem
*
* <p>Used when the server name is known (a regionserver is setting it).
*
- * <p>Valid types for this constructor are {@link HBaseEventType#RS2ZK_REGION_CLOSING},
- * {@link HBaseEventType#RS2ZK_REGION_CLOSED}, {@link HBaseEventType#RS2ZK_REGION_OPENING},
- * and {@link HBaseEventType#RS2ZK_REGION_OPENED}.
+ * <p>Valid types for this constructor are {@link EventType#RS2ZK_REGION_CLOSING},
+ * {@link EventType#RS2ZK_REGION_CLOSED}, {@link EventType#RS2ZK_REGION_OPENING},
+ * and {@link EventType#RS2ZK_REGION_OPENED}.
*
* @param eventType type of event
* @param regionName name of region
* @param serverName name of server setting data
*/
- public RegionTransitionData(HBaseEventType eventType, String regionName,
+ public RegionTransitionData(EventType eventType, byte [] regionName,
String serverName) {
- this(eventType, regionName, serverName, null);
- }
-
- /**
- * Construct data for a fully-specified, old-format region transition event
- * which uses HMsg/heartbeats.
- *
- * TODO: Remove this constructor once we stop using heartbeats.
- *
- * @param eventType
- * @param regionName
- * @param serverName
- * @param hmsg
- */
- public RegionTransitionData(HBaseEventType eventType, String regionName,
- String serverName, HMsg hmsg) {
this.eventType = eventType;
- this.timeStamp = System.currentTimeMillis();
+ this.stamp = System.currentTimeMillis();
this.regionName = regionName;
this.serverName = serverName;
- this.hmsg = hmsg;
}
/**
@@ -121,25 +101,25 @@ public class RegionTransitionData implem
*
* <p>One of:
* <ul>
- * <li>{@link HBaseEventType#M2ZK_REGION_OFFLINE}
- * <li>{@link HBaseEventType#RS2ZK_REGION_CLOSING}
- * <li>{@link HBaseEventType#RS2ZK_REGION_CLOSED}
- * <li>{@link HBaseEventType#RS2ZK_REGION_OPENING}
- * <li>{@link HBaseEventType#RS2ZK_REGION_OPENED}
+ * <li>{@link EventType#M2ZK_REGION_OFFLINE}
+ * <li>{@link EventType#RS2ZK_REGION_CLOSING}
+ * <li>{@link EventType#RS2ZK_REGION_CLOSED}
+ * <li>{@link EventType#RS2ZK_REGION_OPENING}
+ * <li>{@link EventType#RS2ZK_REGION_OPENED}
* </ul>
* @return type of region transition event
*/
- public HBaseEventType getEventType() {
+ public EventType getEventType() {
return eventType;
}
/**
- * Gets the encoded name of the region being transitioned.
+ * Gets the name of the region being transitioned.
*
* <p>Region name is required so this never returns null.
* @return region name
*/
- public String getRegionName() {
+ public byte [] getRegionName() {
return regionName;
}
@@ -156,54 +136,39 @@ public class RegionTransitionData implem
/**
* Gets the timestamp when this event was created.
*
- * @return time event was created
+ * @return stamp event was created
*/
- public long getTimeStamp() {
- return timeStamp;
- }
-
- /**
- * Gets the {@link HMsg} payload of this region transition event.
- * @return heartbeat payload
- */
- public HMsg getHmsg() {
- return hmsg;
+ public long getStamp() {
+ return stamp;
}
@Override
public void readFields(DataInput in) throws IOException {
// the event type byte
- eventType = HBaseEventType.fromByte(in.readByte());
+ eventType = EventType.values()[in.readShort()];
// the timestamp
- timeStamp = in.readLong();
+ stamp = in.readLong();
// the encoded name of the region being transitioned
- regionName = in.readUTF();
+ regionName = Bytes.readByteArray(in);
// remaining fields are optional so prefixed with boolean
// the name of the regionserver sending the data
if(in.readBoolean()) {
serverName = in.readUTF();
- }
- // hmsg
- if(in.readBoolean()) {
- hmsg = new HMsg();
- hmsg.readFields(in);
+ } else {
+ serverName = null;
}
}
@Override
public void write(DataOutput out) throws IOException {
- out.writeByte(eventType.getByteValue());
+ out.writeShort(eventType.ordinal());
out.writeLong(System.currentTimeMillis());
- out.writeUTF(regionName);
+ Bytes.writeByteArray(out, regionName);
// remaining fields are optional so prefixed with boolean
out.writeBoolean(serverName != null);
if(serverName != null) {
out.writeUTF(serverName);
}
- out.writeBoolean(hmsg != null);
- if(hmsg != null) {
- hmsg.write(out);
- }
}
/**
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java Thu Aug 5 07:35:00 2010
@@ -19,13 +19,11 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.IOException;
+
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.io.Writable;
-
-import java.io.IOException;
/**
* Clients interact with the HMasterInterface to gain access to meta-level
@@ -110,11 +108,10 @@ public interface HMasterInterface extend
* Modify a table's metadata
*
* @param tableName table to modify
- * @param op the operation to do
- * @param args arguments for operation
+ * @param htd new descriptor for table
* @throws IOException e
*/
- public void modifyTable(byte[] tableName, HConstants.Modify op, Writable[] args)
+ public void modifyTable(byte[] tableName, HTableDescriptor htd)
throws IOException;
/**
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Aug 5 07:35:00 2010
@@ -19,6 +19,9 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -31,9 +34,6 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import java.io.IOException;
-import java.util.List;
-
/**
* Clients interact with HRegionServers using a handle to the HRegionInterface.
*
@@ -279,10 +279,64 @@ public interface HRegionInterface extend
* @throws IOException e
*/
public MultiPutResponse multiPut(MultiPut puts) throws IOException;
-
+
/**
* Bulk load an HFile into an open region
*/
public void bulkLoadHFile(String hfilePath,
byte[] regionName, byte[] familyName) throws IOException;
+
+ // Master methods
+
+ /**
+ * Opens the specified region.
+ * @param region region to open
+ */
+ public void openRegion(final HRegionInfo region);
+
+ /**
+ * Closes the specified region.
+ * @param region region to close
+ * @return true if closing region, false if not
+ */
+ public boolean closeRegion(final HRegionInfo region)
+ throws NotServingRegionException;
+
+ // Region administrative methods
+
+ /**
+ * Flushes the MemStore of the specified region.
+ * <p>
+ * This method is synchronous.
+ * @param regionInfo region to flush
+ * @throws NotServingRegionException
+ * @throws IOException
+ */
+ void flushRegion(HRegionInfo regionInfo)
+ throws NotServingRegionException, IOException;
+
+ /**
+ * Splits the specified region.
+ * <p>
+ * This method currently flushes the region and then forces a compaction which
+ * will then trigger a split. The flush is done synchronously but the
+ * compaction is asynchronous.
+ * @param regionInfo region to split
+ * @throws NotServingRegionException
+ * @throws IOException
+ */
+ void splitRegion(HRegionInfo regionInfo)
+ throws NotServingRegionException, IOException;
+
+ /**
+ * Compacts the specified region. Performs a major compaction if specified.
+ * <p>
+ * This method is asynchronous.
+ * @param regionInfo region to compact
+ * @param major true to force major compaction
+ * @throws NotServingRegionException
+ * @throws IOException
+ */
+ void compactRegion(HRegionInfo regionInfo, boolean major)
+ throws NotServingRegionException, IOException;
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ActiveMasterManager.java Thu Aug 5 07:35:00 2010
@@ -48,10 +48,10 @@ public class ActiveMasterManager extends
final AtomicBoolean clusterHasActiveMaster = new AtomicBoolean(false);
private final HServerAddress address;
- private final MasterStatus status;
+ private final MasterController status;
ActiveMasterManager(ZooKeeperWatcher watcher, HServerAddress address,
- MasterStatus status) {
+ MasterController status) {
super(watcher);
this.address = address;
this.status = status;
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Thu Aug 5 07:35:00 2010
@@ -19,41 +19,91 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
-import org.apache.hadoop.hbase.master.handler.MasterCloseRegionHandler;
-import org.apache.hadoop.hbase.master.handler.MasterOpenRegionHandler;
+import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
+import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
+import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
+import org.apache.hadoop.io.Writable;
import org.apache.zookeeper.KeeperException;
/**
- * Manages region assignment.
- *
- * <p>Monitors ZooKeeper for events related to regions in transition.
- *
- * <p>Handles existing regions in transition during master failover.
+ * Manages and performs region assignment.
+ * <p>
+ * Monitors ZooKeeper for events related to regions in transition.
+ * <p>
+ * Handles existing regions in transition during master failover.
*/
public class AssignmentManager extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
- private MasterStatus status;
+ protected MasterController master;
private ServerManager serverManager;
- private RegionManager regionManager;
+ private CatalogTracker catalogTracker;
- private String serverName;
+ private TimeoutMonitor timeoutMonitor;
-// TODO: Eventually RIT will move here?
-// private final Map<String,RegionState> regionsInTransition =
-// new TreeMap<String,RegionState>();
+ /** Regions currently in transition. */
+ private final Map<String,RegionState> regionsInTransition =
+ new TreeMap<String,RegionState>();
+
+ /** Plans for region movement. */
+ private final Map<String,RegionPlan> regionPlans =
+ new TreeMap<String,RegionPlan>();
+
+ /** Set of tables that have been disabled. */
+ private final Set<String> disabledTables =
+ Collections.synchronizedSet(new HashSet<String>());
+
+ /**
+ * Server to regions assignment map.
+ * Contains the set of regions currently assigned to a given server.
+ */
+ private final SortedMap<HServerInfo,Set<HRegionInfo>> servers =
+ new TreeMap<HServerInfo,Set<HRegionInfo>>();
+
+ /**
+ * Region to server assignment map.
+ * Contains the server a given region is currently assigned to.
+ * This object should be used for all synchronization around servers/regions.
+ */
+ private final SortedMap<HRegionInfo,HServerInfo> regions =
+ new TreeMap<HRegionInfo,HServerInfo>();
/**
* Constructs a new assignment manager.
@@ -63,85 +113,211 @@ public class AssignmentManager extends Z
* @param watcher zookeeper watcher
* @param status master status
*/
- public AssignmentManager(ZooKeeperWatcher watcher, MasterStatus status,
- ServerManager serverManager, RegionManager regionManager) {
+ public AssignmentManager(ZooKeeperWatcher watcher, MasterController master,
+ ServerManager serverManager, CatalogTracker catalogTracker) {
super(watcher);
- this.status = status;
+ this.master = master;
this.serverManager = serverManager;
- this.regionManager = regionManager;
- serverName = status.getHServerAddress().toString();
+ this.catalogTracker = catalogTracker;
+ Configuration conf = master.getConfiguration();
+ this.timeoutMonitor = new TimeoutMonitor(
+ conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
+ master.getClosed(),
+ conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000));
+ Threads.setDaemonThreadRunning(timeoutMonitor,
+ master.getServerName() + ".timeoutMonitor");
}
/**
- * Starts the assignment manager.
- *
- * <p>This includes registering itself with ZooKeeper and handling
- * the initial state of whatever unassigned nodes already exist.
+ * Cluster startup. Reset all unassigned nodes and assign all user regions.
+ * @throws IOException
* @throws KeeperException
*/
- public void start() throws KeeperException {
- watcher.registerListener(this);
- if(status.isClusterStartup()) {
- processStartup();
- } else {
- processFailover();
- }
- }
-
- public synchronized void processStartup() throws KeeperException {
+ void processStartup() throws IOException, KeeperException {
+ // Cleanup any existing ZK nodes and start watching
ZKAssign.deleteAllNodes(watcher);
- ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode);
+ ZKUtil.listChildrenAndWatchForNewChildren(watcher,
+ watcher.assignmentZNode);
+ // Assign all existing user regions out
+ assignAllUserRegions();
}
/**
- * Handle failover.
+ * Handle failover. Restore state from META and ZK. Handle any regions in
+ * transition.
* @throws KeeperException
+ * @throws IOException
*/
- public synchronized void processFailover() throws KeeperException {
+ void processFailover() throws KeeperException, IOException {
+ // Scan META to build list of existing regions, servers, and assignment
+ rebuildUserRegions();
+ // Pickup any disabled tables
+ rebuildDisabledTables();
+ // Check existing regions in transition
List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.assignmentZNode);
if(nodes.isEmpty()) {
- LOG.info("No regions in transition in ZK, nothing to do for failover");
+ LOG.info("No regions in transition in ZK to process on failover");
return;
}
LOG.info("Failed-over master needs to process " + nodes.size() +
" regions in transition");
for(String regionName : nodes) {
RegionTransitionData data = ZKAssign.getData(watcher, regionName);
+ HRegionInfo regionInfo =
+ MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst();
+ String encodedName = regionInfo.getEncodedName();
switch(data.getEventType()) {
- case M2ZK_REGION_OFFLINE:
- // TODO: Generate new assignment and send OPEN RPC
- break;
case RS2ZK_REGION_CLOSING:
- // TODO: Only need to deal with timeouts.
+ // Just insert region into RIT
+ // If this never updates the timeout will trigger new assignment
+ regionsInTransition.put(encodedName,
+ new RegionState(regionInfo, RegionState.State.CLOSING,
+ data.getStamp()));
break;
+
case RS2ZK_REGION_CLOSED:
- // TODO: Generate new assignment and send OPEN RPC
+ // Region is closed, insert into RIT and handle it
+ regionsInTransition.put(encodedName,
+ new RegionState(regionInfo, RegionState.State.CLOSED,
+ data.getStamp()));
+ new ClosedRegionHandler(master, this, data, regionInfo).execute();
break;
+
case RS2ZK_REGION_OPENING:
- // TODO: Only need to deal with timeouts.
+ // Just insert region into RIT
+ // If this never updates the timeout will trigger new assignment
+ regionsInTransition.put(encodedName,
+ new RegionState(regionInfo, RegionState.State.OPENING,
+ data.getStamp()));
break;
+
case RS2ZK_REGION_OPENED:
- // TODO: Delete the node from ZK. Region successfully opened but not
- // acknowledged.
+ // Region is opened, insert into RIT and handle it
+ regionsInTransition.put(encodedName,
+ new RegionState(regionInfo, RegionState.State.OPENING,
+ data.getStamp()));
+ new OpenedRegionHandler(master, this, data, regionInfo,
+ serverManager.getServerInfo(data.getServerName())).execute();
break;
}
}
}
- private synchronized void handleRegion(RegionTransitionData data) {
- switch(data.getEventType()) {
- case RS2ZK_REGION_CLOSED:
- new MasterCloseRegionHandler(data.getEventType(), serverManager,
- serverName, data.getRegionName(), data.getBytes())
- .submit();
- break;
- case RS2ZK_REGION_OPENED:
- case RS2ZK_REGION_OPENING:
- new MasterOpenRegionHandler(data.getEventType(), serverManager,
- serverName, data.getRegionName(), data.getBytes())
- .submit();
- break;
+ /**
+ * Gets the region info for the region with the specified encoded name.
+ * <p>
+ * Currently this does a full scan of the regions map looking for a region
+ * with the specified encoded name.
+ * <p>
+ * Returns null if none found.
+ * @param regionName
+ * @return
+ * @deprecated should be able to remove this now?
+ */
+ @Deprecated
+ private HRegionInfo getRegionInfoFromEncoded(String encodedName) {
+ for(HRegionInfo regionInfo : regions.keySet()) {
+ if(regionInfo.getEncodedName().equals(encodedName)) {
+ return regionInfo;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Handles various states an unassigned node can be in.
+ * <p>
+ * Method is called when a state change is suspected for an unassigned node.
+ * <p>
+ * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
+ * yet).
+ * @param data
+ */
+ private void handleRegion(RegionTransitionData data) {
+ synchronized(regionsInTransition) {
+ // Verify this is a known server
+ if(!serverManager.isServerOnline(data.getServerName())) {
+ LOG.warn("Attempted to handle region transition for server " +
+ data.getServerName() + " but server is not online");
+ return;
+ }
+ String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
+ LOG.debug("Attempting to handle region transition for server " +
+ data.getServerName() + " and region " + encodedName);
+ RegionState regionState = regionsInTransition.get(encodedName);
+ switch(data.getEventType()) {
+
+ case RS2ZK_REGION_CLOSING:
+ // Should see CLOSING after we have asked it to CLOSE or additional
+ // times after already being in state of CLOSING
+ if(regionState == null ||
+ (!regionState.isPendingClose() && !regionState.isClosing())) {
+ LOG.warn("Received CLOSING for region " + encodedName +
+ " from server " + data.getServerName() + " but region was in " +
+ " the state " + regionState + " and not " +
+ "in expected PENDING_CLOSE or CLOSING states");
+ return;
+ }
+ // Transition to CLOSING (or update stamp if already CLOSING)
+ regionState.update(RegionState.State.CLOSING, data.getStamp());
+ break;
+
+ case RS2ZK_REGION_CLOSED:
+ // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
+ if(regionState == null ||
+ (!regionState.isPendingClose() && !regionState.isClosing())) {
+ LOG.warn("Received CLOSED for region " + encodedName +
+ " from server " + data.getServerName() + " but region was in " +
+ " the state " + regionState + " and not " +
+ "in expected PENDING_CLOSE or CLOSING states");
+ return;
+ }
+ // Handle CLOSED by assigning elsewhere or stopping if a disable
+ new ClosedRegionHandler(master, this, data, regionState.getRegion())
+ .submit();
+ break;
+
+ case RS2ZK_REGION_OPENING:
+ // Should see OPENING after we have asked it to OPEN or additional
+ // times after already being in state of OPENING
+ if(regionState == null ||
+ (!regionState.isPendingOpen() && !regionState.isOpening())) {
+ LOG.warn("Received OPENING for region " + encodedName +
+ " from server " + data.getServerName() + " but region was in " +
+ " the state " + regionState + " and not " +
+ "in expected PENDING_OPEN or OPENING states");
+ return;
+ }
+ // Transition to OPENING (or update stamp if already OPENING)
+ regionState.update(RegionState.State.OPENING, data.getStamp());
+ break;
+
+ case RS2ZK_REGION_OPENED:
+ // Should see OPENED after OPENING but possible after PENDING_OPEN
+ if(regionState == null ||
+ (!regionState.isPendingOpen() && !regionState.isOpening())) {
+ LOG.warn("Received OPENED for region " + encodedName +
+ " from server " + data.getServerName() + " but region was in " +
+ " the state " + regionState + " and not " +
+ "in expected PENDING_OPEN or OPENING states");
+ return;
+ }
+ // If this is a catalog table, update catalog manager accordingly
+ // Moving root and meta editing over to RS who does the opening
+ LOG.debug("Processing OPENED for region " + regionState.getRegion() +
+ " which isMeta[" + regionState.getRegion().isMetaRegion() + "] " +
+ " isRoot[" + regionState.getRegion().isRootRegion() + "]");
+
+ // Used to have updating of root/meta locations here but it's
+ // automatic in CatalogTracker now
+
+ // Handle OPENED by removing from transition and deleted zk node
+ new OpenedRegionHandler(master, this, data, regionState.getRegion(),
+ serverManager.getServerInfo(data.getServerName()))
+ .submit();
+ break;
+ }
}
}
@@ -160,17 +336,19 @@ public class AssignmentManager extends Z
* </ol>
*/
@Override
- public synchronized void nodeCreated(String path) {
+ public void nodeCreated(String path) {
if(path.startsWith(watcher.assignmentZNode)) {
- try {
- RegionTransitionData data = ZKAssign.getData(watcher, path);
- if(data == null) {
- return;
+ synchronized(regionsInTransition) {
+ try {
+ RegionTransitionData data = ZKAssign.getData(watcher, path);
+ if(data == null) {
+ return;
+ }
+ handleRegion(data);
+ } catch (KeeperException e) {
+ LOG.error("Unexpected ZK exception reading unassigned node data", e);
+ master.abort();
}
- handleRegion(data);
- } catch (KeeperException e) {
- LOG.error("Unexpected ZK exception reading unassigned node data", e);
- status.abort();
}
}
}
@@ -188,17 +366,19 @@ public class AssignmentManager extends Z
* </ol>
*/
@Override
- public synchronized void nodeDataChanged(String path) {
+ public void nodeDataChanged(String path) {
if(path.startsWith(watcher.assignmentZNode)) {
- try {
- RegionTransitionData data = ZKAssign.getData(watcher, path);
- if(data == null) {
- return;
+ synchronized(regionsInTransition) {
+ try {
+ RegionTransitionData data = ZKAssign.getData(watcher, path);
+ if(data == null) {
+ return;
+ }
+ handleRegion(data);
+ } catch (KeeperException e) {
+ LOG.error("Unexpected ZK exception reading unassigned node data", e);
+ master.abort();
}
- handleRegion(data);
- } catch (KeeperException e) {
- LOG.error("Unexpected ZK exception reading unassigned node data", e);
- status.abort();
}
}
}
@@ -217,19 +397,589 @@ public class AssignmentManager extends Z
* </ol>
*/
@Override
- public synchronized void nodeChildrenChanged(String path) {
+ public void nodeChildrenChanged(String path) {
if(path.equals(watcher.assignmentZNode)) {
+ synchronized(regionsInTransition) {
+ try {
+ List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
+ watcher.assignmentZNode);
+ for(NodeAndData newNode : newNodes) {
+ LOG.debug("Handling new unassigned node: " + newNode);
+ handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
+ }
+ } catch(KeeperException e) {
+ LOG.error("Unexpected ZK exception reading unassigned children", e);
+ master.abort();
+ }
+ }
+ }
+ }
+
+ /**
+ * Marks the region as online. Removes it from regions in transition and
+ * updates the in-memory assignment information.
+ * <p>
+ * Used when a region has been successfully opened on a region server.
+ * @param regionInfo
+ * @param serverInfo
+ */
+ public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) {
+ synchronized(regionsInTransition) {
+ regionsInTransition.remove(regionInfo.getEncodedName());
+ regionsInTransition.notifyAll();
+ }
+ synchronized(regions) {
+ regions.put(regionInfo, serverInfo);
+ Set<HRegionInfo> regionSet = servers.get(serverInfo);
+ if(regionSet == null) {
+ regionSet = new TreeSet<HRegionInfo>();
+ servers.put(serverInfo, regionSet);
+ }
+ regionSet.add(regionInfo);
+ }
+ }
+
+ /**
+ * Marks the region as offline. Removes it from regions in transition and
+ * removes in-memory assignment information.
+ * <p>
+ * Used when a region has been closed and should remain closed.
+ * @param regionInfo
+ * @param serverInfo
+ */
+ public void regionOffline(HRegionInfo regionInfo) {
+ synchronized(regionsInTransition) {
+ regionsInTransition.remove(regionInfo.getEncodedName());
+ regionsInTransition.notifyAll();
+ }
+ synchronized(regions) {
+ HServerInfo serverInfo = regions.remove(regionInfo);
+ Set<HRegionInfo> serverRegions = servers.get(serverInfo);
+ serverRegions.remove(regionInfo);
+ }
+ }
+
+ /**
+ * Sets the region as offline by removing in-memory assignment information but
+ * retaining transition information.
+ * <p>
+ * Used when a region has been closed but should be reassigned.
+ * @param regionInfo
+ */
+ public void setOffline(HRegionInfo regionInfo) {
+ synchronized(regions) {
+ HServerInfo serverInfo = regions.remove(regionInfo);
+ Set<HRegionInfo> serverRegions = servers.get(serverInfo);
+ serverRegions.remove(regionInfo);
+ }
+ }
+
+ // Assignment methods
+
+ /**
+ * Assigns the specified region.
+ * <p>
+ * If a RegionPlan is available with a valid destination then it will be used
+ * to determine what server region is assigned to. If no RegionPlan is
+ * available, region will be assigned to a random available server.
+ * <p>
+ * Updates the RegionState and sends the OPEN RPC.
+ * <p>
+ * This will only succeed if the region is in transition and in a CLOSED or
+ * OFFLINE state or not in transition (in-memory not zk). If the in-memory
+ * checks pass, the zk node is forced to OFFLINE before assigning.
+ *
+ * @param regionName server to be assigned
+ */
+ public void assign(HRegionInfo region) {
+ LOG.debug("Starting assignment for region " + region);
+ // Grab the state of this region and synchronize on it
+ String regionName = region.getEncodedName();
+ RegionState state;
+ synchronized(regionsInTransition) {
+ state = regionsInTransition.get(regionName);
+ if(state == null) {
+ state = new RegionState(region, RegionState.State.OFFLINE);
+ regionsInTransition.put(regionName, state);
+ }
+ }
+ synchronized(state) {
+ if(!state.isClosed() && !state.isOffline()) {
+ LOG.info("Attempting to assign region but it is in transition and in " +
+ "an unexpected state:" + state);
+ return;
+ } else {
+ state.update(RegionState.State.OFFLINE);
+ }
+ try {
+ if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(), region,
+ master.getServerName())) {
+ LOG.warn("Attempted to create/force node into OFFLINE state before " +
+ "completing assignment but failed to do so");
+ return;
+ }
+ } catch (KeeperException e) {
+ LOG.error("Unexpected ZK exception creating/setting node OFFLINE", e);
+ master.abort();
+ return;
+ }
+ // Pickup existing plan or make a new one
+ RegionPlan plan;
+ synchronized(regionPlans) {
+ plan = regionPlans.get(regionName);
+ if(plan == null) {
+ LOG.debug("No previous transition plan for " + regionName +
+ " so generating a random one from " + serverManager.numServers() +
+ " ( " + serverManager.getOnlineServers().size() + ") available servers");
+ plan = new RegionPlan(regionName, null,
+ LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
+ regionPlans.put(regionName, plan);
+ }
+ }
+ // Transition RegionState to PENDING_OPEN and send OPEN RPC
+ state.update(RegionState.State.PENDING_OPEN);
+ serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
+ }
+ }
+
+ /**
+ * Unassigns the specified region.
+ * <p>
+ * Updates the RegionState and sends the OPEN RPC.
+ * <p>
+ * If a RegionPlan is already set, it will remain. If this is being used
+ * to disable a table, be sure to use {@link #disableTable(String)} to ensure
+ * regions are not onlined after being closed.
+ *
+ * @param regionName server to be unassigned
+ */
+ public void unassign(HRegionInfo region) {
+ LOG.debug("Starting unassignment of region " + region + " (offlining)");
+ // Check if this region is currently assigned
+ if (!regions.containsKey(region)) {
+ LOG.debug("Attempted to unassign region " + region + " but it is not " +
+ "currently assigned anywhere");
+ return;
+ }
+ String regionName = region.getEncodedName();
+ // Grab the state of this region and synchronize on it
+ RegionState state;
+ synchronized(regionsInTransition) {
+ state = regionsInTransition.get(regionName);
+ if(state == null) {
+ state = new RegionState(region, RegionState.State.PENDING_CLOSE);
+ regionsInTransition.put(regionName, state);
+ } else {
+ LOG.debug("Attempting to unassign region " + region + " but it is " +
+ "already in transition (" + state.getState() + ")");
+ return;
+ }
+ }
+ // Send OPEN RPC
+ try {
+ serverManager.sendRegionClose(regions.get(region), state.getRegion());
+ } catch (NotServingRegionException e) {
+ LOG.warn("Attempted to close region " + region + " but got an NSRE", e);
+ }
+ }
+
+ /**
+ * Waits until the specified region has completed assignment.
+ * <p>
+ * If the region is already assigned, returns immediately. Otherwise, method
+ * blocks until the region is assigned.
+ * @param regionInfo region to wait on assignment for
+ * @throws InterruptedException
+ */
+ public void waitForAssignment(HRegionInfo regionInfo)
+ throws InterruptedException {
+ synchronized(regions) {
+ while(!regions.containsKey(regionInfo)) {
+ regions.wait();
+ }
+ }
+ }
+
+ /**
+ * Assigns the ROOT region.
+ * <p>
+ * Assumes that ROOT is currently closed and is not being actively served by
+ * any RegionServer.
+ * <p>
+ * Forcibly unsets the current root region location in ZooKeeper and assigns
+ * ROOT to a random RegionServer.
+ */
+ public void assignRoot() {
+ // Force assignment to a random server
+ assign(HRegionInfo.ROOT_REGIONINFO);
+ }
+
+ /**
+ * Assigns the META region.
+ * <p>
+ * Assumes that META is currently closed and is not being actively served by
+ * any RegionServer.
+ * <p>
+ * Forcibly assigns META to a random RegionServer.
+ */
+ public void assignMeta() {
+ // Force assignment to a random server
+ assign(HRegionInfo.FIRST_META_REGIONINFO);
+ }
+
+ /**
+ * Assigns all user regions, if any exist. Used during cluster startup.
+ * <p>
+ * This is a synchronous call and will return once every region has been
+ * assigned. If anything fails, an exception is thrown and the cluster
+ * should be shutdown.
+ */
+ public void assignAllUserRegions() throws IOException {
+ // First experiment at synchronous assignment
+ // Simpler because just wait for no regions in transition
+
+ // Scan META for all user regions
+ List<HRegionInfo> allRegions = MetaScanner.listAllRegions(
+ master.getConfiguration());
+ if(allRegions == null || allRegions.isEmpty()) {
+ return;
+ }
+
+ // Get all available servers
+ List<HServerInfo> servers = serverManager.getOnlineServersList();
+
+ LOG.info("Assigning " + allRegions.size() + " across " + servers.size() +
+ " servers");
+
+ // Generate a cluster startup region placement plan
+ Map<HServerInfo,List<HRegionInfo>> bulkPlan =
+ LoadBalancer.bulkAssignment(allRegions, servers);
+
+ // For each server, create OFFLINE nodes and send OPEN RPCs
+ for(Map.Entry<HServerInfo,List<HRegionInfo>> entry : bulkPlan.entrySet()) {
+ HServerInfo server = entry.getKey();
+ List<HRegionInfo> regions = entry.getValue();
+ LOG.debug("Assigning " + regions.size() + " regions to " + server);
+ for(HRegionInfo region : regions) {
+ LOG.debug("Assigning " + region + " to " + server);
+ String regionName = region.getEncodedName();
+ RegionPlan plan = new RegionPlan(regionName, null,server);
+ regionPlans.put(regionName, plan);
+ assign(region);
+ }
+ }
+
+ // Wait for no regions to be in transition
+ try {
+ waitUntilNoRegionsInTransition();
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted waiting for regions to be assigned", e);
+ throw new IOException(e);
+ }
+
+ LOG.info("\n\nAll user regions have been assigned");
+ }
+
+ private void rebuildUserRegions() throws IOException {
+ Map<HRegionInfo,HServerAddress> allRegions =
+ MetaReader.fullScan(catalogTracker);
+ for(Map.Entry<HRegionInfo,HServerAddress> region : allRegions.entrySet()) {
+ HServerAddress regionLocation = region.getValue();
+ HRegionInfo regionInfo = region.getKey();
+ if(regionLocation == null) {
+ regions.put(regionInfo, null);
+ continue;
+ }
+ HServerInfo serverInfo = serverManager.getHServerInfo(regionLocation);
+ regions.put(regionInfo, serverInfo);
+ Set<HRegionInfo> regionSet = servers.get(serverInfo);
+ if(regionSet == null) {
+ regionSet = new TreeSet<HRegionInfo>();
+ servers.put(serverInfo, regionSet);
+ }
+ regionSet.add(regionInfo);
+ }
+ }
+
+ /**
+ * Blocks until there are no regions in transition. It is possible that there
+ * are regions in transition immediately after this returns but guarantees
+ * that if it returns without an exception that there was a period of time
+ * with no regions in transition from the point-of-view of the in-memory
+ * state of the Master.
+ * @throws InterruptedException
+ */
+ public void waitUntilNoRegionsInTransition() throws InterruptedException {
+ synchronized(regionsInTransition) {
+ while(regionsInTransition.size() > 0) {
+ regionsInTransition.wait();
+ }
+ }
+ }
+
+ /**
+ * Gets the map of regions currently in transition.
+ * @return
+ */
+ public Map<String, RegionState> getRegionsInTransition() {
+ return regionsInTransition;
+ }
+
+ /**
+ * Checks if the specified table has been disabled by the user.
+ * @param tableName
+ * @return
+ */
+ public boolean isTableDisabled(String tableName) {
+ synchronized(disabledTables) {
+ return disabledTables.contains(tableName);
+ }
+ }
+
+ /**
+ * Checks if the table of the specified region has been disabled by the user.
+ * @param regionName
+ * @return
+ */
+ public boolean isTableOfRegionDisabled(byte [] regionName) {
+ return isTableDisabled(Bytes.toString(
+ HRegionInfo.getTableName(regionName)));
+ }
+
+ /**
+ * Sets the specified table to be disabled.
+ * @param tableName table to be disabled
+ */
+ public void disableTable(String tableName) {
+ synchronized(disabledTables) {
+ if(!isTableDisabled(tableName)) {
+ disabledTables.add(tableName);
+ try {
+ ZKTableDisable.disableTable(master.getZooKeeper(), tableName);
+ } catch (KeeperException e) {
+ LOG.warn("ZK error setting table as disabled", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Unsets the specified table from being disabled.
+ * <p>
+ * This operation only acts on the in-memory
+ * @param tableName table to be undisabled
+ */
+ public void undisableTable(String tableName) {
+ synchronized(disabledTables) {
+ if(isTableDisabled(tableName)) {
+ disabledTables.remove(tableName);
+ try {
+ ZKTableDisable.undisableTable(master.getZooKeeper(), tableName);
+ } catch (KeeperException e) {
+ LOG.warn("ZK error setting table as disabled", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Rebuild the set of disabled tables from zookeeper. Used during master
+ * failover.
+ */
+ private void rebuildDisabledTables() {
+ synchronized(disabledTables) {
+ List<String> disabledTables;
try {
- List<NodeAndData> newNodes = ZKUtil.watchAndGetNewChildren(watcher,
- watcher.assignmentZNode);
- for(NodeAndData newNode : newNodes) {
- LOG.debug("Handling new unassigned node: " + newNode);
- handleRegion(RegionTransitionData.fromBytes(newNode.getData()));
- }
- } catch(KeeperException e) {
- LOG.error("Unexpected ZK exception reading unassigned children", e);
- status.abort();
+ disabledTables = ZKTableDisable.getDisabledTables(master.getZooKeeper());
+ } catch (KeeperException e) {
+ LOG.warn("ZK error getting list of disabled tables", e);
+ return;
}
+ if(!disabledTables.isEmpty()) {
+ LOG.info("Rebuilt list of " + disabledTables.size() + " disabled " +
+ "tables from zookeeper");
+ disabledTables.addAll(disabledTables);
+ }
+ }
+ }
+
+ /**
+ * Gets the online regions of the specified table.
+ * @param tableName
+ * @return
+ */
+ public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
+ List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
+ for(HRegionInfo regionInfo : regions.tailMap(new HRegionInfo(
+ new HTableDescriptor(tableName), null, null)).keySet()) {
+ if(Bytes.equals(regionInfo.getTableDesc().getName(), tableName)) {
+ tableRegions.add(regionInfo);
+ } else {
+ break;
+ }
+ }
+ return tableRegions;
+ }
+
+ /**
+ * Unsets the specified table as disabled (enables it).
+ */
+ public class TimeoutMonitor extends Chore {
+
+ private final int timeout;
+
+ /**
+ * Creates a periodic monitor to check for time outs on region transition
+ * operations. This will deal with retries if for some reason something
+ * doesn't happen within the specified timeout.
+ * @param period
+ * @param stop
+ * @param timeout
+ */
+ public TimeoutMonitor(final int period, final AtomicBoolean stop,
+ final int timeout) {
+ super("AssignmentTimeoutMonitor", period, stop);
+ this.timeout = timeout;
+ }
+
+ @Override
+ protected void chore() {
+ synchronized(regionsInTransition) {
+ // Iterate all regions in transition checking for time outs
+ long now = System.currentTimeMillis();
+ for(RegionState regionState : regionsInTransition.values()) {
+ if(regionState.getStamp() + timeout <= now) {
+ HRegionInfo regionInfo = regionState.getRegion();
+ String regionName = regionInfo.getEncodedName();
+ LOG.info("Region transition timed out for region " + regionName);
+ // Expired! Do a retry.
+ switch(regionState.getState()) {
+ case OFFLINE:
+ case CLOSED:
+ LOG.info("Region has been OFFLINE or CLOSED for too long, " +
+ "reassigning " + regionInfo.getRegionNameAsString());
+ assign(regionState.getRegion());
+ break;
+ case PENDING_OPEN:
+ case OPENING:
+ LOG.info("Region has been PENDING_OPEN or OPENING for too " +
+ "long, reassigning " + regionInfo.getRegionNameAsString());
+ assign(regionState.getRegion());
+ break;
+ case OPEN:
+ LOG.warn("Long-running region in OPEN state? This should " +
+ "not happen");
+ break;
+ case PENDING_CLOSE:
+ case CLOSING:
+ LOG.info("Region has been PENDING_CLOSE or CLOSING for too " +
+ "long, resending close rpc");
+ unassign(regionInfo);
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+
+ public static class RegionState implements Writable {
+ private HRegionInfo region;
+
+ public enum State {
+ OFFLINE, // region is in an offline state
+ PENDING_OPEN, // sent rpc to server to open but has not begun
+ OPENING, // server has begun to open but not yet done
+ OPEN, // server opened region and updated meta
+ PENDING_CLOSE, // sent rpc to server to close but has not begun
+ CLOSING, // server has begun to close but not yet done
+ CLOSED // server closed region and updated meta
+ }
+
+ private State state;
+ private long stamp;
+
+ public RegionState() {}
+
+ RegionState(HRegionInfo region, State state) {
+ this(region, state, System.currentTimeMillis());
+ }
+
+ RegionState(HRegionInfo region, State state, long stamp) {
+ this.region = region;
+ this.state = state;
+ this.stamp = stamp;
+ }
+
+ public void update(State state, long stamp) {
+ this.state = state;
+ this.stamp = stamp;
+ }
+
+ public void update(State state) {
+ this.state = state;
+ this.stamp = System.currentTimeMillis();
+ }
+
+ public State getState() {
+ return state;
+ }
+
+ public long getStamp() {
+ return stamp;
+ }
+
+ public HRegionInfo getRegion() {
+ return region;
+ }
+
+ public boolean isClosing() {
+ return state == State.CLOSING;
+ }
+
+ public boolean isClosed() {
+ return state == State.CLOSED;
+ }
+
+ public boolean isPendingClose() {
+ return state == State.PENDING_CLOSE;
+ }
+
+ public boolean isOpening() {
+ return state == State.OPENING;
+ }
+
+ public boolean isOpened() {
+ return state == State.OPEN;
+ }
+
+ public boolean isPendingOpen() {
+ return state == State.PENDING_OPEN;
+ }
+
+ public boolean isOffline() {
+ return state == State.OFFLINE;
+ }
+
+ @Override
+ public String toString() {
+ return "RegionState (" + region.getRegionNameAsString() + ") " + state +
+ " at time " + stamp;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ region = new HRegionInfo();
+ region.readFields(in);
+ state = State.valueOf(in.readUTF());
+ stamp = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ region.write(out);
+ out.writeUTF(state.name());
+ out.writeLong(stamp);
}
}
}
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/FileSystemManager.java Thu Aug 5 07:35:00 2010
@@ -34,13 +34,14 @@ import org.apache.hadoop.hbase.HConstant
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
/**
- * This class abstract a bunch of operations the HMaster needs to interact with
- * the underlying file system, including splitting log files, checking file
+ * This class abstract a bunch of operations the HMaster needs to interact with
+ * the underlying file system, including splitting log files, checking file
* system status, etc.
*/
public class FileSystemManager {
@@ -48,7 +49,7 @@ public class FileSystemManager {
// HBase configuration
Configuration conf;
// master status
- MasterStatus masterStatus;
+ MasterController masterStatus;
// Keep around for convenience.
private final FileSystem fs;
// Is the fileystem ok?
@@ -59,8 +60,8 @@ public class FileSystemManager {
private final Path rootdir;
// create the split log lock
final Lock splitLogLock = new ReentrantLock();
-
- public FileSystemManager(Configuration conf, MasterStatus masterStatus) throws IOException {
+
+ public FileSystemManager(Configuration conf, MasterController masterStatus) throws IOException {
this.conf = conf;
this.masterStatus = masterStatus;
// Set filesystem to be that of this.rootdir else we get complaints about
@@ -105,7 +106,7 @@ public class FileSystemManager {
public Path getOldLogDir() {
return this.oldLogDir;
}
-
+
/**
* Checks to see if the file system is still accessible.
* If not, sets closed
@@ -123,7 +124,7 @@ public class FileSystemManager {
}
return this.fsOk;
}
-
+
/**
* @return HBase root dir.
* @throws IOException
@@ -131,20 +132,22 @@ public class FileSystemManager {
public Path getRootDir() {
return this.rootdir;
}
-
+
public Lock getSplitLogLock() {
return splitLogLock;
}
-
- /*
+
+ /**
* Inspect the log directory to recover any log file without
- * ad active region server.
+ * an active region server.
*/
public void splitLogAfterStartup() {
Path logsDirPath =
new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
try {
- if (!this.fs.exists(logsDirPath)) return;
+ if (!this.fs.exists(logsDirPath)) {
+ return;
+ }
} catch (IOException e) {
throw new RuntimeException("Could exists for " + logsDirPath, e);
}
@@ -179,8 +182,8 @@ public class FileSystemManager {
}
}
}
-
- /*
+
+ /**
* Get the rootdir. Make sure its wholesome and exists before returning.
* @param rd
* @param conf
@@ -238,7 +241,7 @@ public class FileSystemManager {
}
}
- /*
+ /**
* @param hri Set all family block caching to <code>b</code>
* @param b
*/
@@ -250,4 +253,29 @@ public class FileSystemManager {
}
}
}
+
+ public void deleteRegion(HRegionInfo region) throws IOException {
+ fs.delete(HRegion.getRegionDir(rootdir, region), true);
+ }
+
+ public void deleteTable(byte[] tableName) throws IOException {
+ fs.delete(new Path(rootdir, Bytes.toString(tableName)), true);
+ }
+
+ public void updateRegionInfo(HRegionInfo region) {
+ // TODO implement this. i think this is currently broken in trunk i don't
+ // see this getting updated.
+ // @see HRegion.checkRegioninfoOnFilesystem()
+ }
+
+ public void addFamily(HRegionInfo region, byte[] familyName) {
+ // TODO Looks like the family directory is just created on the first flush?
+ }
+
+ public void deleteFamily(HRegionInfo region, byte[] familyName)
+ throws IOException {
+ fs.delete(Store.getStoreHomedir(
+ new Path(rootdir, region.getTableDesc().getNameAsString()),
+ region.getEncodedName(), familyName), true);
+ }
}