You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/09/24 22:33:20 UTC
svn commit: r1389561 [1/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/master/handler/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/zookeeper/ test...
Author: jxiang
Date: Mon Sep 24 20:33:19 2012
New Revision: 1389561
URL: http://svn.apache.org/viewvc?rev=1389561&view=rev
Log:
HBASE-6381 AssignmentManager should use the same logic for clean startup and failover
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (with props)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Mon Sep 24 20:33:19 2012
@@ -19,12 +19,10 @@
package org.apache.hadoop.hbase.master;
import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -33,6 +31,7 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -62,13 +61,11 @@ import org.apache.hadoop.hbase.master.ha
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
-import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@@ -99,13 +96,13 @@ public class AssignmentManager extends Z
public static final ServerName HBCK_CODE_SERVERNAME = new ServerName(HConstants.HBCK_CODE_NAME,
-1, -1L);
- protected Server master;
+ protected Server server;
private ServerManager serverManager;
private CatalogTracker catalogTracker;
- private TimeoutMonitor timeoutMonitor;
+ final TimeoutMonitor timeoutMonitor;
private TimerUpdater timerUpdater;
@@ -114,11 +111,6 @@ public class AssignmentManager extends Z
final private KeyLocker<String> locker = new KeyLocker<String>();
/**
- * Used for assignment only. TODO: revisit the assign lock scheme
- */
- final private KeyLocker<String> assignLocker = new KeyLocker<String>();
-
- /**
* Map of regions to reopen after the schema of a table is changed. Key -
* encoded region name, value - HRegionInfo
*/
@@ -138,11 +130,6 @@ public class AssignmentManager extends Z
private final ZKTable zkTable;
- // store all the table names in disabling state
- Set<String> disablingTables = new HashSet<String>();
- // store all the enabling state tablenames.
- Set<String> enablingTables = new HashSet<String>();
-
/**
* Contains the server which need to update timer, these servers will be
* handled by {@link TimerUpdater}
@@ -158,62 +145,59 @@ public class AssignmentManager extends Z
private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
- /**
- * Set when we are doing master failover processing; cleared when failover
- * completes.
- */
- private volatile boolean failover = false;
-
- // Set holding all the regions which got processed while RIT was not
- // populated during master failover.
- private Map<String, HRegionInfo> failoverProcessedRegions =
- new HashMap<String, HRegionInfo>();
+ // metrics instance to send metrics for RITs
+ MasterMetrics masterMetrics;
- // metrics instance to send metrics for RITs
- MasterMetrics masterMetrics;
+ private final RegionStates regionStates;
- private final RegionStates regionStates;
+ /**
+ * Indicator that AssignmentManager has recovered the region states so
+ * that ServerShutdownHandler can be fully enabled and re-assign regions
+ * of dead servers. This ensures that when re-assignment happens, AssignmentManager
+ * has proper region states.
+ */
+ final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
/**
* Constructs a new assignment manager.
*
- * @param master
+ * @param server
* @param serverManager
* @param catalogTracker
* @param service
* @throws KeeperException
* @throws IOException
*/
- public AssignmentManager(Server master, ServerManager serverManager,
+ public AssignmentManager(Server server, ServerManager serverManager,
CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service, MasterMetrics metrics) throws KeeperException, IOException {
- super(master.getZooKeeper());
- this.master = master;
+ super(server.getZooKeeper());
+ this.server = server;
this.serverManager = serverManager;
this.catalogTracker = catalogTracker;
this.executorService = service;
this.regionsToReopen = Collections.synchronizedMap
(new HashMap<String, HRegionInfo> ());
- Configuration conf = master.getConfiguration();
+ Configuration conf = server.getConfiguration();
this.timeoutMonitor = new TimeoutMonitor(
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
- master, serverManager,
+ server, serverManager,
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
this.timerUpdater = new TimerUpdater(conf.getInt(
- "hbase.master.assignment.timerupdater.period", 10000), master);
+ "hbase.master.assignment.timerupdater.period", 10000), server);
Threads.setDaemonThreadRunning(timerUpdater.getThread(),
- master.getServerName() + ".timerUpdater");
- this.zkTable = new ZKTable(this.master.getZooKeeper());
+ server.getServerName() + ".timerUpdater");
+ this.zkTable = new ZKTable(this.watcher);
this.maximumAssignmentAttempts =
- this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
+ this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = balancer;
this.threadPoolExecutorService = Executors.newCachedThreadPool();
this.masterMetrics = metrics;// can be null only with tests.
- this.regionStates = new RegionStates(master, serverManager);
+ this.regionStates = new RegionStates(server, serverManager);
}
void startTimeOutMonitor() {
- Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName()
+ Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), server.getServerName()
+ ".timeoutMonitor");
}
@@ -283,7 +267,7 @@ public class AssignmentManager extends Z
public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
throws IOException {
List <HRegionInfo> hris =
- MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
+ MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName);
Integer pending = 0;
for (HRegionInfo hri : hris) {
String name = hri.getEncodedName();
@@ -295,17 +279,23 @@ public class AssignmentManager extends Z
}
return new Pair<Integer, Integer>(pending, hris.size());
}
+
/**
- * Reset all unassigned znodes. Called on startup of master.
- * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
- * @throws IOException
- * @throws KeeperException
+ * Used by ServerShutdownHandler to make sure AssignmentManager has completed
+ * the failover cleanup before re-assigning regions of dead servers, so that
+ * when re-assignment happens, AssignmentManager has proper region states.
*/
- void cleanoutUnassigned() throws IOException, KeeperException {
- // Cleanup any existing ZK nodes and start watching
- ZKAssign.deleteAllNodes(watcher);
- ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
- this.watcher.assignmentZNode);
+ public boolean isFailoverCleanupDone() {
+ return failoverCleanupDone.get();
+ }
+
+ /**
+ * Marks the failover cleanup as completed and notifies the server manager
+ * to process any queued-up dead servers.
+ */
+ void failoverCleanupDone() {
+ failoverCleanupDone.set(true);
+ serverManager.processQueuedDeadServers();
}
/**
@@ -327,17 +317,15 @@ public class AssignmentManager extends Z
// Scan META to build list of existing regions, servers, and assignment
// Returns servers who have not checked in (assumed dead) and their regions
- Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions();
+ Map<ServerName, List<HRegionInfo>> deadServers = rebuildUserRegions();
// This method will assign all user regions if a clean server startup or
- // it will reconstitute master state and cleanup any leftovers from
+ // it will reconstruct master state and cleanup any leftovers from
// previous master process.
processDeadServersAndRegionsInTransition(deadServers);
- // Recover the tables that were not fully moved to DISABLED state.
- // These tables are in DISABLING state when the master restarted/switched.
- boolean isWatcherCreated = recoverTableInDisablingState(this.disablingTables);
- recoverTableInEnablingState(this.enablingTables, isWatcherCreated);
+ recoverTableInDisablingState();
+ recoverTableInEnablingState();
}
/**
@@ -352,58 +340,47 @@ public class AssignmentManager extends Z
* @throws InterruptedException
*/
void processDeadServersAndRegionsInTransition(
- final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
- throws KeeperException, IOException, InterruptedException {
- List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
+ final Map<ServerName, List<HRegionInfo>> deadServers)
+ throws KeeperException, IOException, InterruptedException {
+ List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
watcher.assignmentZNode);
if (nodes == null) {
String errorMessage = "Failed to get the children from ZK";
- master.abort(errorMessage, new IOException(errorMessage));
+ server.abort(errorMessage, new IOException(errorMessage));
return;
}
- // Run through all regions. If they are not assigned and not in RIT, then
- // its a clean cluster startup, else its a failover.
- Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
- for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
- if (!e.getKey().isMetaTable() && e.getValue() != null) {
- LOG.debug("Found " + e + " out on cluster");
- this.failover = true;
- break;
- }
- if (nodes.contains(e.getKey().getEncodedName())) {
- LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
- // Could be a meta region.
- this.failover = true;
- break;
- }
- }
+ boolean failover = !serverManager.getDeadServers().isEmpty();
- // Remove regions in RIT, they are possibly being processed by
- // ServerShutdownHandler.
- // no lock concurrent access ok: some threads may be adding/removing items but its java-valid
- nodes.removeAll(regionStates.getRegionsInTransition().keySet());
-
- // If some dead servers are processed by ServerShutdownHandler, we shouldn't
- // assign all user regions( some would be assigned by
- // ServerShutdownHandler), consider it as a failover
- if (!this.serverManager.getDeadServers().isEmpty()) {
- this.failover = true;
+ if (!failover) {
+ // Run through all regions. If they are not assigned and not in RIT, then
+ // it's a clean cluster startup; otherwise it's a failover.
+ Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
+ for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
+ if (!e.getKey().isMetaTable() && e.getValue() != null) {
+ LOG.debug("Found " + e + " out on cluster");
+ failover = true;
+ break;
+ }
+ if (nodes.contains(e.getKey().getEncodedName())) {
+ LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
+ // Could be a meta region.
+ failover = true;
+ break;
+ }
+ }
}
// If we found user regions out on cluster, its a failover.
- if (this.failover) {
+ if (failover) {
LOG.info("Found regions out on cluster or in RIT; failover");
// Process list of dead servers and regions in RIT.
// See HBASE-4580 for more information.
processDeadServersAndRecoverLostRegions(deadServers, nodes);
- this.failover = false;
- failoverProcessedRegions.clear();
} else {
// Fresh cluster startup.
LOG.info("Clean cluster startup. Assigning userregions");
- cleanoutUnassigned();
assignAllUserRegions();
}
}
@@ -425,7 +402,7 @@ public class AssignmentManager extends Z
processRegionInTransition(hri.getEncodedName(), hri, null);
if (!intransistion) return intransistion;
LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
- while (!this.master.isStopped() &&
+ while (!this.server.isStopped() &&
this.regionStates.isRegionInTransition(hri.getEncodedName())) {
// We put a timeout because we may have the region getting in just between the test
// and the waitForUpdate
@@ -445,139 +422,143 @@ public class AssignmentManager extends Z
* @throws IOException
*/
boolean processRegionInTransition(final String encodedRegionName,
- final HRegionInfo regionInfo, final Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers)
- throws KeeperException, IOException {
- Stat stat = new Stat();
- byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
- if (data == null) return false;
- RegionTransition rt;
+ final HRegionInfo regionInfo, final Map<ServerName, List<HRegionInfo>> deadServers)
+ throws KeeperException, IOException {
+ // We need a lock here to ensure that we will not put the same region twice
+ // It has no reason to be a lock shared with the other operations.
+ // We can do the lock on the region only, instead of a global lock: what we want to ensure
+ // is that we don't have two threads working on the same region.
+ Lock lock = locker.acquireLock(encodedRegionName);
try {
- rt = RegionTransition.parseFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse znode data", e);
- return false;
- }
- HRegionInfo hri = regionInfo;
- if (hri == null) {
- if ((hri = getHRegionInfo(rt.getRegionName())) == null) return false;
+ Stat stat = new Stat();
+ byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
+ if (data == null) return false;
+ RegionTransition rt;
+ try {
+ rt = RegionTransition.parseFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed parse znode data", e);
+ return false;
+ }
+ HRegionInfo hri = regionInfo;
+ if (hri == null) {
+ hri = regionStates.getRegionInfo(rt.getRegionName());
+ if (hri == null) return false;
+ }
+ processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
+ return true;
+ } finally {
+ lock.unlock();
}
- processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
- return true;
}
+ /**
+ * This method is invoked only in failover mode, while processing zk assignment nodes.
+ * The locker is set in the caller.
+ *
+ * It should be private, but it is also used by some tests.
+ */
void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo,
- final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers, int expectedVersion)
- throws KeeperException {
+ final Map<ServerName, List<HRegionInfo>> deadServers, int expectedVersion)
+ throws KeeperException {
EventType et = rt.getEventType();
// Get ServerName. Could be null.
ServerName sn = rt.getServerName();
String encodedRegionName = regionInfo.getEncodedName();
LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
- // We need a lock here to ensure that we will not put the same region twice
- // It has no reason to be a lock shared with the other operations.
- // We can do the lock on the region only, instead of a global lock: what we want to ensure
- // is that we don't have two threads working on the same region.
- Lock lock = locker.acquireLock(encodedRegionName);
- try {
- if (regionStates.isRegionInTransition(encodedRegionName)
- || failoverProcessedRegions.containsKey(encodedRegionName)) {
- // Just return
- return;
+ if (regionStates.isRegionInTransition(encodedRegionName)) {
+ // Just return
+ return;
+ }
+ switch (et) {
+ case M_ZK_REGION_CLOSING:
+ // If zk node of the region was updated by a live server skip this
+ // region and just add it into RIT.
+ if (isOnDeadServer(regionInfo, deadServers)
+ && !serverManager.isServerOnline(sn)) {
+ // If was on dead server, its closed now. Force to OFFLINE and this
+ // will get it reassigned if appropriate
+ forceOffline(regionInfo, rt);
+ } else {
+ // Just insert region into RIT.
+ // If this never updates the timeout will trigger new assignment
+ regionStates.updateRegionState(rt, RegionState.State.CLOSING);
}
- switch (et) {
- case M_ZK_REGION_CLOSING:
- // If zk node of the region was updated by a live server skip this
- // region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers) && !isServerOnline(sn)) {
- // If was on dead server, its closed now. Force to OFFLINE and this
- // will get it reassigned if appropriate
- forceOffline(regionInfo, rt);
- } else {
- // Just insert region into RIT.
- // If this never updates the timeout will trigger new assignment
- regionStates.updateRegionState(rt, RegionState.State.CLOSING);
- }
- failoverProcessedRegions.put(encodedRegionName, regionInfo);
- break;
+ break;
- case RS_ZK_REGION_CLOSED:
- case RS_ZK_REGION_FAILED_OPEN:
- // Region is closed, insert into RIT and handle it
- addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
- failoverProcessedRegions.put(encodedRegionName, regionInfo);
- break;
+ case RS_ZK_REGION_CLOSED:
+ case RS_ZK_REGION_FAILED_OPEN:
+ // Region is closed, insert into RIT and handle it
+ addToRITandCallClose(regionInfo, RegionState.State.CLOSED, rt);
+ break;
- case M_ZK_REGION_OFFLINE:
- // If zk node of the region was updated by a live server skip this
- // region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) {
- // Region is offline, insert into RIT and handle it like a closed
- addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
- } else if (sn != null && !isServerOnline(sn)) {
- // to handle cases where offline node is created but sendRegionOpen
- // RPC is not yet sent
- addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
- } else {
- regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
- }
- failoverProcessedRegions.put(encodedRegionName, regionInfo);
- break;
+ case M_ZK_REGION_OFFLINE:
+ // If zk node of the region was updated by a live server skip this
+ // region and just add it into RIT.
+ if (isOnDeadServer(regionInfo, deadServers)
+ && (sn == null || !serverManager.isServerOnline(sn))) {
+ // Region is offline, insert into RIT and handle it like a closed
+ addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+ } else if (sn != null && !serverManager.isServerOnline(sn)) {
+ // to handle cases where offline node is created but sendRegionOpen
+ // RPC is not yet sent
+ addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
+ } else {
+ regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
+ }
+ break;
- case RS_ZK_REGION_OPENING:
- // TODO: Could check if it was on deadServers. If it was, then we could
- // do what happens in TimeoutMonitor when it sees this condition.
- // Just insert region into RIT
- // If this never updates the timeout will trigger new assignment
- if (regionInfo.isMetaTable()) {
- regionStates.updateRegionState(rt, RegionState.State.OPENING);
- // If ROOT or .META. table is waiting for timeout monitor to assign
- // it may take lot of time when the assignment.timeout.period is
- // the default value which may be very long. We will not be able
- // to serve any request during this time.
- // So we will assign the ROOT and .META. region immediately.
- processOpeningState(regionInfo);
- break;
- } else if (deadServers.keySet().contains(sn)) {
- // if the region is found on a dead server, we can assign
- // it to a new RS. (HBASE-5882)
- processOpeningState(regionInfo);
- break;
- }
+ case RS_ZK_REGION_OPENING:
+ // TODO: Could check if it was on deadServers. If it was, then we could
+ // do what happens in TimeoutMonitor when it sees this condition.
+ // Just insert region into RIT
+ // If this never updates the timeout will trigger new assignment
+ if (regionInfo.isMetaTable()) {
regionStates.updateRegionState(rt, RegionState.State.OPENING);
- failoverProcessedRegions.put(encodedRegionName, regionInfo);
- break;
-
- case RS_ZK_REGION_OPENED:
- // Region is opened, insert into RIT and handle it
- regionStates.updateRegionState(rt, RegionState.State.OPEN);
- // sn could be null if this server is no longer online. If
- // that is the case, just let this RIT timeout; it'll be assigned
- // to new server then.
- if (sn == null) {
- LOG.warn("Region in transition " + regionInfo.getEncodedName() +
- " references a null server; letting RIT timeout so will be " +
- "assigned elsewhere");
- } else if (!serverManager.isServerOnline(sn)
- && (isOnDeadServer(regionInfo, deadServers)
- || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
- forceOffline(regionInfo, rt);
- } else {
- new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion).process();
- }
- failoverProcessedRegions.put(encodedRegionName, regionInfo);
- break;
- case RS_ZK_REGION_SPLITTING:
- LOG.debug("Processed region in state : " + et);
+ // If ROOT or .META. table is waiting for timeout monitor to assign
+ // it may take lot of time when the assignment.timeout.period is
+ // the default value which may be very long. We will not be able
+ // to serve any request during this time.
+ // So we will assign the ROOT and .META. region immediately.
+ processOpeningState(regionInfo);
break;
- case RS_ZK_REGION_SPLIT:
- LOG.debug("Processed region in state : " + et);
+ } else if (deadServers != null
+ && deadServers.keySet().contains(sn)) {
+ // if the region is found on a dead server, we can assign
+ // it to a new RS. (HBASE-5882)
+ processOpeningState(regionInfo);
break;
- default:
- throw new IllegalStateException("Received region in state :" + et + " is not valid");
}
- } finally {
- lock.unlock();
+ regionStates.updateRegionState(rt, RegionState.State.OPENING);
+ break;
+
+ case RS_ZK_REGION_OPENED:
+ // Region is opened, insert into RIT and handle it
+ regionStates.updateRegionState(rt, RegionState.State.OPEN);
+ // sn could be null if this server is no longer online. If
+ // that is the case, just let this RIT timeout; it'll be assigned
+ // to new server then.
+ if (sn == null) {
+ LOG.warn("Region in transition " + regionInfo.getEncodedName() +
+ " references a null server; letting RIT timeout so will be " +
+ "assigned elsewhere");
+ } else if (!serverManager.isServerOnline(sn)
+ && (isOnDeadServer(regionInfo, deadServers)
+ || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
+ forceOffline(regionInfo, rt);
+ } else {
+ new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
+ }
+ break;
+ case RS_ZK_REGION_SPLITTING:
+ LOG.debug("Processed region in state : " + et);
+ break;
+ case RS_ZK_REGION_SPLIT:
+ LOG.debug("Processed region in state : " + et);
+ break;
+ default:
+ throw new IllegalStateException("Received region in state :" + et + " is not valid");
}
}
@@ -598,7 +579,7 @@ public class AssignmentManager extends Z
LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
" was on deadserver; forcing offline");
ZKAssign.createOrForceNodeOffline(this.watcher, hri,
- this.master.getServerName());
+ this.server.getServerName());
addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
}
@@ -612,7 +593,7 @@ public class AssignmentManager extends Z
private void addToRITandCallClose(final HRegionInfo hri,
final RegionState.State state, final RegionTransition oldData) {
regionStates.updateRegionState(oldData, state);
- new ClosedRegionHandler(this.master, this, hri).process();
+ new ClosedRegionHandler(this.server, this, hri).process();
}
/**
@@ -632,12 +613,11 @@ public class AssignmentManager extends Z
* @return True if the passed regionInfo in the passed map of deadServers?
*/
private boolean isOnDeadServer(final HRegionInfo regionInfo,
- final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers) {
+ final Map<ServerName, List<HRegionInfo>> deadServers) {
if (deadServers == null) return false;
- for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer:
- deadServers.entrySet()) {
- for (Pair<HRegionInfo, Result> e: deadServer.getValue()) {
- if (e.getFirst().equals(regionInfo)) return true;
+ for (List<HRegionInfo> deadRegions: deadServers.values()) {
+ if (deadRegions.contains(regionInfo)) {
+ return true;
}
}
return false;
@@ -654,7 +634,6 @@ public class AssignmentManager extends Z
* @param expectedVersion
*/
private void handleRegion(final RegionTransition rt, int expectedVersion) {
- HRegionInfo hri = null;
if (rt == null) {
LOG.warn("Unexpected NULL input " + rt);
return;
@@ -675,7 +654,7 @@ public class AssignmentManager extends Z
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
// Verify this is a known server
if (!serverManager.isServerOnline(sn) &&
- !this.master.getServerName().equals(sn)
+ !this.server.getServerName().equals(sn)
&& !ignoreStatesRSOffline.contains(rt.getEventType())) {
LOG.warn("Attempted to handle region transition for server but " +
"server is not online: " + prettyPrintedRegionName);
@@ -739,21 +718,14 @@ public class AssignmentManager extends Z
break;
}
// Run handler to do the rest of the SPLIT handling.
- this.executorService.submit(new SplitRegionHandler(master, this,
+ this.executorService.submit(new SplitRegionHandler(server, this,
regionState.getRegion(), sn, daughters));
break;
case M_ZK_REGION_CLOSING:
- hri = checkIfInFailover(regionState, encodedName, regionName);
- if (hri != null) {
- regionState = regionStates.updateRegionState(
- hri, RegionState.State.CLOSING, createTime, sn);
- failoverProcessedRegions.put(encodedName, hri);
- break;
- }
// Should see CLOSING after we have asked it to CLOSE or additional
// times after already being in state of CLOSING
- if (regionState == null ||
+ if (regionState != null &&
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
" from server " + sn + " but region was in " +
@@ -766,18 +738,8 @@ public class AssignmentManager extends Z
break;
case RS_ZK_REGION_CLOSED:
- hri = checkIfInFailover(regionState, encodedName, regionName);
- if (hri != null) {
- regionState = regionStates.updateRegionState(
- hri, RegionState.State.CLOSED, createTime, sn);
- removeClosedRegion(regionState.getRegion());
- new ClosedRegionHandler(master, this, regionState.getRegion())
- .process();
- failoverProcessedRegions.put(encodedName, hri);
- break;
- }
// Should see CLOSED after CLOSING but possible after PENDING_CLOSE
- if (regionState == null ||
+ if (regionState != null &&
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
" from server " + sn + " but region was in " +
@@ -788,23 +750,16 @@ public class AssignmentManager extends Z
// Handle CLOSED by assigning elsewhere or stopping if a disable
// If we got here all is good. Need to update RegionState -- else
// what follows will fail because not in expected state.
- regionStates.updateRegionState(rt, RegionState.State.CLOSED);
- removeClosedRegion(regionState.getRegion());
- this.executorService.submit(new ClosedRegionHandler(master,
- this, regionState.getRegion()));
+ regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+ if (regionState != null) {
+ removeClosedRegion(regionState.getRegion());
+ this.executorService.submit(new ClosedRegionHandler(server,
+ this, regionState.getRegion()));
+ }
break;
case RS_ZK_REGION_FAILED_OPEN:
- hri = checkIfInFailover(regionState, encodedName, regionName);
- if (hri != null) {
- regionState = regionStates.updateRegionState(
- hri, RegionState.State.CLOSED, createTime, sn);
- new ClosedRegionHandler(master, this, regionState.getRegion())
- .process();
- failoverProcessedRegions.put(encodedName, hri);
- break;
- }
- if (regionState == null ||
+ if (regionState != null &&
(!regionState.isPendingOpen() && !regionState.isOpening())) {
LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
" from server " + sn + " but region was in " +
@@ -812,25 +767,20 @@ public class AssignmentManager extends Z
return;
}
// Handle this the same as if it were opened and then closed.
- regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+ regionState = regionStates.updateRegionState(rt, RegionState.State.CLOSED);
// When there are more than one region server a new RS is selected as the
// destination and the same is updated in the regionplan. (HBASE-5546)
- getRegionPlan(regionState, sn, true);
- this.executorService.submit(new ClosedRegionHandler(master,
- this, regionState.getRegion()));
+ if (regionState != null) {
+ getRegionPlan(regionState, sn, true);
+ this.executorService.submit(new ClosedRegionHandler(server,
+ this, regionState.getRegion()));
+ }
break;
case RS_ZK_REGION_OPENING:
- hri = checkIfInFailover(regionState, encodedName, regionName);
- if (hri != null) {
- regionState = regionStates.updateRegionState(
- hri, RegionState.State.OPENING, createTime, sn);
- failoverProcessedRegions.put(encodedName, hri);
- break;
- }
// Should see OPENING after we have asked it to OPEN or additional
// times after already being in state of OPENING
- if (regionState == null ||
+ if (regionState != null &&
(!regionState.isPendingOpen() && !regionState.isOpening())) {
LOG.warn("Received OPENING for region " +
prettyPrintedRegionName +
@@ -844,16 +794,8 @@ public class AssignmentManager extends Z
break;
case RS_ZK_REGION_OPENED:
- hri = checkIfInFailover(regionState, encodedName, regionName);
- if (hri != null) {
- regionState = regionStates.updateRegionState(
- hri, RegionState.State.OPEN, createTime, sn);
- new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion).process();
- failoverProcessedRegions.put(encodedName, hri);
- break;
- }
// Should see OPENED after OPENING but possible after PENDING_OPEN
- if (regionState == null ||
+ if (regionState != null &&
(!regionState.isPendingOpen() && !regionState.isOpening())) {
LOG.warn("Received OPENED for region " +
prettyPrintedRegionName +
@@ -863,9 +805,11 @@ public class AssignmentManager extends Z
return;
}
// Handle OPENED by removing from transition and deleted zk node
- regionStates.updateRegionState(rt, RegionState.State.OPEN);
- this.executorService.submit(
- new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion));
+ regionState = regionStates.updateRegionState(rt, RegionState.State.OPEN);
+ if (regionState != null) {
+ this.executorService.submit(new OpenedRegionHandler(
+ server, this, regionState.getRegion(), sn, expectedVersion));
+ }
break;
default:
@@ -877,44 +821,6 @@ public class AssignmentManager extends Z
}
/**
- * Checks whether the callback came while RIT was not yet populated during
- * master failover.
- * @param regionState
- * @param encodedName
- * @param data
- * @return hri
- */
- private HRegionInfo checkIfInFailover(RegionState regionState,
- String encodedName, final byte [] regionName) {
- if (regionState == null && this.failover &&
- (failoverProcessedRegions.containsKey(encodedName) == false ||
- failoverProcessedRegions.get(encodedName) == null)) {
- HRegionInfo hri = this.failoverProcessedRegions.get(encodedName);
- if (hri == null) hri = getHRegionInfo(regionName);
- return hri;
- }
- return null;
- }
-
- /**
- * Gets the HRegionInfo from the META table
- * @param regionName
- * @return HRegionInfo hri for the region
- */
- private HRegionInfo getHRegionInfo(final byte [] regionName) {
- Pair<HRegionInfo, ServerName> p = null;
- try {
- p = MetaReader.getRegion(catalogTracker, regionName);
- if (p == null) return null;
- return p.getFirst();
- } catch (IOException e) {
- master.abort("Aborting because error occoured while reading " +
- Bytes.toStringBinary(regionName) + " from .META.", e);
- return null;
- }
- }
-
- /**
* @return Returns true if this RegionState is splittable; i.e. the
* RegionState is currently in splitting state or pending_close or
* null (Anything else will return false). (Anything else will return false).
@@ -1032,16 +938,16 @@ public class AssignmentManager extends Z
RegionTransition rt = RegionTransition.parseFrom(data);
handleRegion(rt, stat.getVersion());
} catch (KeeperException e) {
- master.abort("Unexpected ZK exception reading unassigned node data", e);
+ server.abort("Unexpected ZK exception reading unassigned node data", e);
} catch (DeserializationException e) {
- master.abort("Unexpected exception deserializing node data", e);
+ server.abort("Unexpected exception deserializing node data", e);
}
}
@Override
public void nodeDeleted(final String path) {
if (path.startsWith(this.watcher.assignmentZNode)) {
- String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
+ String regionName = ZKAssign.getRegionName(this.watcher, path);
RegionState rs = regionStates.getRegionTransitionState(regionName);
if (rs != null) {
HRegionInfo regionInfo = rs.getRegion();
@@ -1091,7 +997,7 @@ public class AssignmentManager extends Z
ZKUtil.listChildrenAndWatchThem(watcher,
watcher.assignmentZNode);
} catch(KeeperException e) {
- master.abort("Unexpected ZK exception reading unassigned children", e);
+ server.abort("Unexpected ZK exception reading unassigned children", e);
}
}
}
@@ -1105,9 +1011,9 @@ public class AssignmentManager extends Z
* @param sn
*/
void regionOnline(HRegionInfo regionInfo, ServerName sn) {
- if (!isServerOnline(sn)) {
+ if (!serverManager.isServerOnline(sn)) {
LOG.warn("A region was opened on a dead server, ServerName=" +
- sn.getServerName() + ", region=" + regionInfo.getEncodedName());
+ sn + ", region=" + regionInfo.getEncodedName());
}
regionStates.regionOnline(regionInfo, sn);
@@ -1190,7 +1096,7 @@ public class AssignmentManager extends Z
LOG.debug("Tried to delete closed node for " + regionInfo + " but it " +
"does not exist so just offlining");
} catch (KeeperException e) {
- this.master.abort("Error deleting CLOSED node in ZK", e);
+ this.server.abort("Error deleting CLOSED node in ZK", e);
}
regionOffline(regionInfo);
}
@@ -1242,14 +1148,9 @@ public class AssignmentManager extends Z
region.getRegionNameAsString());
return;
}
- RegionState state = forceRegionStateToOffline(region,
- hijack);
- // TODO: we can't synchronized on state any more since it could
- // be an new instance. We need to reconsider how to avoid
- // double/multiple assignments.
- // This is to prevent double assignments? Does it work?
+ RegionState state = forceRegionStateToOffline(region, hijack);
String encodedName = region.getEncodedName();
- Lock lock = assignLocker.acquireLock(encodedName);
+ Lock lock = locker.acquireLock(encodedName);
try {
assign(region, state, setOfflineInZK, forceNewPlan, hijack);
} finally {
@@ -1296,7 +1197,7 @@ public class AssignmentManager extends Z
}
// Wait until all unassigned nodes have been put up and watchers set.
int total = regions.size();
- for (int oldCounter = 0; true;) {
+ for (int oldCounter = 0; !server.isStopped();) {
int count = counter.get();
if (oldCounter != count) {
LOG.info(destination.toString() + " unassigned znodes=" + count +
@@ -1304,16 +1205,20 @@ public class AssignmentManager extends Z
oldCounter = count;
}
if (count == total) break;
- Threads.sleep(1);
+ Threads.sleep(10);
}
+ if (server.isStopped()) {
+ return false;
+ }
+
// Move on to open regions.
try {
// Send OPEN RPC. If it fails on a IOE or RemoteException, the
// TimeoutMonitor will pick up the pieces.
long maxWaitTime = System.currentTimeMillis() +
- this.master.getConfiguration().
+ this.server.getConfiguration().
getLong("hbase.regionserver.rpc.startup.waittime", 60000);
- while (!this.master.isStopped()) {
+ while (!this.server.isStopped()) {
try {
List<RegionOpeningState> regionOpeningStateList = this.serverManager
.sendRegionOpen(destination, regions);
@@ -1343,7 +1248,7 @@ public class AssignmentManager extends Z
if (now > maxWaitTime) throw e;
LOG.debug("Server is not yet up; waiting up to " +
(maxWaitTime - now) + "ms", e);
- Thread.sleep(1000);
+ Thread.sleep(100);
}
throw decodedException;
@@ -1362,52 +1267,6 @@ public class AssignmentManager extends Z
}
/**
- * Bulk assign regions to available servers if any with retry, else assign
- * region singly.
- *
- * @param regions all regions to assign
- * @param servers all available servers
- */
- public void assign(List<HRegionInfo> regions, List<ServerName> servers) {
- LOG.info("Quickly assigning " + regions.size() + " region(s) across "
- + servers.size() + " server(s)");
- Map<ServerName, List<HRegionInfo>> bulkPlan = balancer
- .roundRobinAssignment(regions, servers);
- if (bulkPlan == null || bulkPlan.isEmpty()) {
- LOG.info("Failed getting bulk plan, assigning region singly");
- for (HRegionInfo region : regions) {
- assign(region, true);
- }
- return;
- }
- Map<ServerName, List<HRegionInfo>> failedPlans = new HashMap<ServerName, List<HRegionInfo>>();
- for (Map.Entry<ServerName, List<HRegionInfo>> e : bulkPlan.entrySet()) {
- try {
- if (!assign(e.getKey(), e.getValue())) {
- failedPlans.put(e.getKey(), e.getValue());
- }
- } catch (Throwable t) {
- LOG.warn("Failed bulking assigning " + e.getValue().size()
- + " region(s) to " + e.getKey().getServerName()
- + ", and continue to bulk assign others", t);
- failedPlans.put(e.getKey(), e.getValue());
- }
- }
- if (!failedPlans.isEmpty()) {
- servers.removeAll(failedPlans.keySet());
- List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
- for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
- LOG.info("Failed assigning " + e.getValue().size()
- + " regions to server " + e.getKey() + ", reassigning them");
- reassigningRegions.addAll(e.getValue());
- }
- for (HRegionInfo region : reassigningRegions) {
- assign(region, true, true);
- }
- }
- }
-
- /**
* Callback handler for create unassigned znodes used during bulk assign.
*/
static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
@@ -1428,8 +1287,10 @@ public class AssignmentManager extends Z
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- if (rc != 0) {
- // Thisis resultcode. If non-zero, need to resubmit.
+ if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
+ LOG.warn("Node for " + path + " already exists");
+ } else if (rc != 0) {
+ // This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
this.zkw.abort("Connectionloss writing unassigned at " + path +
@@ -1565,7 +1426,7 @@ public class AssignmentManager extends Z
if (setOfflineInZK && versionOfOfflineNode == -1) {
return;
}
- if (this.master.isStopped()) {
+ if (this.server.isStopped()) {
LOG.debug("Server stopped; skipping assign of " + state);
return;
}
@@ -1643,14 +1504,14 @@ public class AssignmentManager extends Z
+ " to " + sn);
String encodedRegionName = region.getEncodedName();
try {
- ZKAssign.deleteOfflineNode(master.getZooKeeper(), encodedRegionName);
+ ZKAssign.deleteOfflineNode(watcher, encodedRegionName);
} catch (KeeperException.NoNodeException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("The unassigned node " + encodedRegionName
+ " doesnot exist.");
}
} catch (KeeperException e) {
- master.abort(
+ server.abort(
"Error deleting OFFLINED node in ZK for transition ZK node ("
+ encodedRegionName + ")", e);
}
@@ -1688,7 +1549,7 @@ public class AssignmentManager extends Z
if (!hijack && !state.isClosed() && !state.isOffline()) {
if (!regionAlreadyInTransitionException ) {
String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
- this.master.abort(msg, new IllegalStateException(msg));
+ this.server.abort(msg, new IllegalStateException(msg));
return -1;
} else {
LOG.debug("Unexpected state : " + state
@@ -1705,7 +1566,7 @@ public class AssignmentManager extends Z
// For all other cases we can change the in-memory state to OFFLINE.
if (hijack &&
(state.getState().equals(RegionState.State.PENDING_OPEN) ||
- state.getState().equals(RegionState.State.OPENING))) {
+ state.getState().equals(RegionState.State.OPENING))) {
regionStates.updateRegionState(state.getRegion(),
RegionState.State.PENDING_OPEN);
allowZNodeCreation = false;
@@ -1717,8 +1578,8 @@ public class AssignmentManager extends Z
int versionOfOfflineNode = -1;
try {
// get the version after setting the znode to OFFLINE
- versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
- state.getRegion(), this.master.getServerName(),
+ versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
+ state.getRegion(), this.server.getServerName(),
hijack, allowZNodeCreation);
if (versionOfOfflineNode == -1) {
LOG.warn("Attempted to create/force node into OFFLINE state before "
@@ -1726,7 +1587,7 @@ public class AssignmentManager extends Z
return -1;
}
} catch (KeeperException e) {
- master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
+ server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
return -1;
}
return versionOfOfflineNode;
@@ -1741,20 +1602,20 @@ public class AssignmentManager extends Z
boolean asyncSetOfflineInZooKeeper(final RegionState state,
final AsyncCallback.StringCallback cb, final Object ctx) {
if (!state.isClosed() && !state.isOffline()) {
- this.master.abort("Unexpected state trying to OFFLINE; " + state,
+ this.server.abort("Unexpected state trying to OFFLINE; " + state,
new IllegalStateException());
return false;
}
regionStates.updateRegionState(
state.getRegion(), RegionState.State.OFFLINE);
try {
- ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
- this.master.getServerName(), cb, ctx);
+ ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
+ this.server.getServerName(), cb, ctx);
} catch (KeeperException e) {
if (e instanceof NodeExistsException) {
LOG.warn("Node for " + state.getRegion() + " already exists");
- } else {
- master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
+ } else {
+ server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
}
return false;
}
@@ -1832,17 +1693,6 @@ public class AssignmentManager extends Z
}
/**
- * Loop through the deadNotExpired server list and remove them from the
- * servers.
- * @param servers
- * @deprecated the method is now available in ServerManager - deprecated in 0.96
- */
- @Deprecated
- void removeDeadNotExpiredServers(List<ServerName> servers) {
- this.serverManager.removeDeadNotExpiredServers(servers);
- }
-
- /**
* Unassign the list of regions. Configuration knobs:
* hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
* wait before unassigning another region from this region server
@@ -1851,7 +1701,7 @@ public class AssignmentManager extends Z
* @throws InterruptedException
*/
public void unassign(List<HRegionInfo> regions) {
- int waitTime = this.master.getConfiguration().getInt(
+ int waitTime = this.server.getConfiguration().getInt(
"hbase.bulk.waitbetween.reopen", 0);
for (HRegionInfo region : regions) {
if (regionStates.isRegionInTransition(region))
@@ -1929,7 +1779,7 @@ public class AssignmentManager extends Z
// Create the znode in CLOSING state
try {
versionOfClosingNode = ZKAssign.createNodeClosing(
- master.getZooKeeper(), region, master.getServerName());
+ watcher, region, server.getServerName());
if (versionOfClosingNode == -1) {
LOG.debug("Attempting to unassign region " +
region.getRegionNameAsString() + " but ZK closing node "
@@ -1963,7 +1813,7 @@ public class AssignmentManager extends Z
}
}
// If we get here, don't understand whats going on -- abort.
- master.abort("Unexpected ZK exception creating node CLOSING", e);
+ server.abort("Unexpected ZK exception creating node CLOSING", e);
return;
}
state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
@@ -2011,7 +1861,7 @@ public class AssignmentManager extends Z
// Presume that master has stale data. Presume remote side just split.
// Presume that the split message when it comes in will fix up the master's
// in memory cluster state.
- if (checkIfRegionBelongsToDisabling(region)) {
+ if (getZKTable().isDisablingTable(region.getTableNameAsString())) {
// Remove from the regionsinTransition map
LOG.info("While trying to recover the table "
+ region.getTableNameAsString()
@@ -2041,9 +1891,9 @@ public class AssignmentManager extends Z
*/
public void deleteClosingOrClosedNode(HRegionInfo region) {
try {
- if (!ZKAssign.deleteNode(master.getZooKeeper(), region.getEncodedName(),
+ if (!ZKAssign.deleteNode(watcher, region.getEncodedName(),
EventHandler.EventType.M_ZK_REGION_CLOSING)) {
- boolean deleteNode = ZKAssign.deleteNode(master.getZooKeeper(), region
+ boolean deleteNode = ZKAssign.deleteNode(watcher, region
.getEncodedName(), EventHandler.EventType.RS_ZK_REGION_CLOSED);
// TODO : We don't abort if the delete node returns false. Is there any
// such corner case?
@@ -2056,7 +1906,7 @@ public class AssignmentManager extends Z
LOG.debug("CLOSING/CLOSED node for the region " + region.getEncodedName()
+ " already deleted");
} catch (KeeperException ke) {
- master.abort(
+ server.abort(
"Unexpected ZK exception deleting node CLOSING/CLOSED for the region "
+ region.getEncodedName(), ke);
return;
@@ -2074,7 +1924,7 @@ public class AssignmentManager extends Z
boolean result = false;
// This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
// can get data from it.
- byte [] data = ZKAssign.getData(master.getZooKeeper(), path);
+ byte [] data = ZKAssign.getData(watcher, path);
if (data == null) return false;
RegionTransition rt = RegionTransition.parseFrom(data);
switch (rt.getEventType()) {
@@ -2098,7 +1948,7 @@ public class AssignmentManager extends Z
*/
public void waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
- while(!this.master.isStopped() &&
+ while(!this.server.isStopped() &&
!regionStates.isRegionAssigned(regionInfo)) {
// We should receive a notification, but it's
// better to have a timeout to recheck the condition here:
@@ -2118,7 +1968,7 @@ public class AssignmentManager extends Z
* @throws KeeperException
*/
public void assignRoot() throws KeeperException {
- RootRegionTracker.deleteRootLocation(this.master.getZooKeeper());
+ RootRegionTracker.deleteRootLocation(this.watcher);
assign(HRegionInfo.ROOT_REGIONINFO, true);
}
@@ -2136,38 +1986,32 @@ public class AssignmentManager extends Z
}
/**
- * Assigns all user regions to online servers. Use round-robin assignment.
- *
- * @param regions
- * @throws IOException
- * @throws InterruptedException
- */
- public void assignUserRegionsToOnlineServers(List<HRegionInfo> regions)
- throws IOException,
- InterruptedException {
- List<ServerName> destServers = serverManager.createDestinationServersList();
- assignUserRegions(regions, destServers);
- }
-
- /**
- * Assigns all user regions, if any. Used during cluster startup.
+ * Assigns specified regions round robin, if any.
* <p>
* This is a synchronous call and will return once every region has been
* assigned. If anything fails, an exception is thrown
* @throws InterruptedException
* @throws IOException
*/
- public void assignUserRegions(List<HRegionInfo> regions, List<ServerName> servers)
- throws IOException, InterruptedException {
- if (regions == null)
+ public void assign(List<HRegionInfo> regions)
+ throws IOException, InterruptedException {
+ if (regions == null || regions.isEmpty()) {
return;
- Map<ServerName, List<HRegionInfo>> bulkPlan = null;
+ }
+ List<ServerName> servers = serverManager.createDestinationServersList();
+ if (servers == null || servers.isEmpty()) {
+ throw new IOException("Found no destination server to assign region(s)");
+ }
+
// Generate a round-robin bulk assignment plan
- bulkPlan = balancer.roundRobinAssignment(regions, servers);
- LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " +
- servers.size() + " server(s)");
+ Map<ServerName, List<HRegionInfo>> bulkPlan
+ = balancer.roundRobinAssignment(regions, servers);
+
+ LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across "
+ + servers.size() + " server(s)");
+
// Use fixed count thread pool assigning.
- BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
+ BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
ba.bulkAssign();
LOG.info("Bulk assigning done");
}
@@ -2192,165 +2036,53 @@ public class AssignmentManager extends Z
* should be shutdown.
* @throws InterruptedException
* @throws IOException
+ * @throws KeeperException
*/
- public void assignAllUserRegions() throws IOException, InterruptedException {
+ private void assignAllUserRegions()
+ throws IOException, InterruptedException, KeeperException {
+ // Cleanup any existing ZK nodes and start watching
+ ZKAssign.deleteAllNodes(watcher);
+ ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
+ this.watcher.assignmentZNode);
+ failoverCleanupDone();
+
// Skip assignment for regions of tables in DISABLING state because during clean cluster startup
// no RS is alive and regions map also doesn't have any information about the regions.
// See HBASE-6281.
- Set<String> disablingAndDisabledTables = new HashSet<String>(this.disablingTables);
- disablingAndDisabledTables.addAll(this.zkTable.getDisabledTables());
+ Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisabledOrDisablingTables(watcher);
+ disabledOrDisablingOrEnabling.addAll(ZKTable.getEnablingTables(watcher));
// Scan META for all user regions, skipping any disabled tables
- Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan(catalogTracker,
- disablingAndDisabledTables, true);
+ Map<HRegionInfo, ServerName> allRegions = MetaReader.fullScan(
+ catalogTracker, disabledOrDisablingOrEnabling, true);
if (allRegions == null || allRegions.isEmpty()) return;
- // Get all available servers
- List<ServerName> destServers = serverManager.createDestinationServersList();
-
- // If there are no servers we need not proceed with region assignment.
- if (destServers.isEmpty()) return;
-
// Determine what type of assignment to do on startup
- boolean retainAssignment = master.getConfiguration().
+ boolean retainAssignment = server.getConfiguration().
getBoolean("hbase.master.startup.retainassign", true);
- Map<ServerName, List<HRegionInfo>> bulkPlan = null;
if (retainAssignment) {
+ List<ServerName> servers = serverManager.createDestinationServersList();
+ if (servers == null || servers.isEmpty()) {
+ throw new IOException("Found no destination server to assign region(s)");
+ }
+
// Reuse existing assignment info
- bulkPlan = balancer.retainAssignment(allRegions, destServers);
+ Map<ServerName, List<HRegionInfo>> bulkPlan =
+ balancer.retainAssignment(allRegions, servers);
+
+ LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
+ servers.size() + " server(s), retainAssignment=true");
+ BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
+ ba.bulkAssign();
+ LOG.info("Bulk assigning done");
} else {
- // assign regions in round-robin fashion
- assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), destServers);
- for (HRegionInfo hri : allRegions.keySet()) {
- setEnabledTable(hri);
- }
- return;
+ List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
+ assign(regions);
}
- LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
- destServers.size() + " server(s), retainAssignment=" + retainAssignment);
- // Use fixed count thread pool assigning.
- BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
- ba.bulkAssign();
for (HRegionInfo hri : allRegions.keySet()) {
setEnabledTable(hri);
}
- LOG.info("Bulk assigning done");
- }
-
- /**
- * Run bulk assign on startup. Does one RCP per regionserver passing a
- * batch of reginons using {@link SingleServerBulkAssigner}.
- * Uses default {@link #getUncaughtExceptionHandler()}
- * which will abort the Server if exception.
- */
- static class StartupBulkAssigner extends BulkAssigner {
- final Map<ServerName, List<HRegionInfo>> bulkPlan;
- final AssignmentManager assignmentManager;
-
- StartupBulkAssigner(final Server server,
- final Map<ServerName, List<HRegionInfo>> bulkPlan,
- final AssignmentManager am) {
- super(server);
- this.bulkPlan = bulkPlan;
- this.assignmentManager = am;
- }
-
- @Override
- public boolean bulkAssign(boolean sync) throws InterruptedException,
- IOException {
- // Disable timing out regions in transition up in zk while bulk assigning.
- this.assignmentManager.timeoutMonitor.bulkAssign(true);
- try {
- return super.bulkAssign(sync);
- } finally {
- // Reenable timing out regions in transition up in zi.
- this.assignmentManager.timeoutMonitor.bulkAssign(false);
- }
- }
-
- @Override
- protected String getThreadNamePrefix() {
- return this.server.getServerName() + "-StartupBulkAssigner";
- }
-
- @Override
- protected void populatePool(java.util.concurrent.ExecutorService pool) {
- for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
- pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
- this.assignmentManager));
- }
- }
-
- /**
- *
- * @param timeout How long to wait.
- * @return true if done.
- */
- @Override
- protected boolean waitUntilDone(final long timeout)
- throws InterruptedException {
- Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
- for (List<HRegionInfo> regionList : bulkPlan.values()) {
- regionSet.addAll(regionList);
- }
- return this.assignmentManager.waitUntilNoRegionsInTransition(timeout, regionSet);
- }
-
- @Override
- protected long getTimeoutOnRIT() {
- // Guess timeout. Multiply the number of regions on a random server
- // by how long we thing one region takes opening.
- long perRegionOpenTimeGuesstimate =
- this.server.getConfiguration().getLong("hbase.bulk.assignment.perregion.open.time", 1000);
- int regionsPerServer =
- this.bulkPlan.entrySet().iterator().next().getValue().size();
- long timeout = perRegionOpenTimeGuesstimate * regionsPerServer;
- LOG.debug("Timeout-on-RIT=" + timeout);
- return timeout;
- }
- }
-
- /**
- * Bulk user region assigner.
- * If failed assign, lets timeout in RIT do cleanup.
- */
- static class GeneralBulkAssigner extends StartupBulkAssigner {
- GeneralBulkAssigner(final Server server,
- final Map<ServerName, List<HRegionInfo>> bulkPlan,
- final AssignmentManager am) {
- super(server, bulkPlan, am);
- }
-
- @Override
- protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
- return new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- LOG.warn("Assigning regions in " + t.getName(), e);
- }
- };
- }
- }
-
- /**
- * Manage bulk assigning to a server.
- */
- static class SingleServerBulkAssigner implements Runnable {
- private final ServerName regionserver;
- private final List<HRegionInfo> regions;
- private final AssignmentManager assignmentManager;
-
- SingleServerBulkAssigner(final ServerName regionserver,
- final List<HRegionInfo> regions, final AssignmentManager am) {
- this.regionserver = regionserver;
- this.regions = regions;
- this.assignmentManager = am;
- }
- @Override
- public void run() {
- this.assignmentManager.assign(this.regionserver, this.regions);
- }
}
/**
@@ -2369,7 +2101,7 @@ public class AssignmentManager extends Z
// state of the Master.
final long endTime = System.currentTimeMillis() + timeout;
- while (!this.master.isStopped() && regionStates.isRegionsInTransition()
+ while (!this.server.isStopped() && regionStates.isRegionsInTransition()
&& endTime > System.currentTimeMillis()) {
regionStates.waitForUpdate(100);
}
@@ -2378,35 +2110,6 @@ public class AssignmentManager extends Z
}
/**
- * Wait until no regions from set regions are in transition.
- * @param timeout How long to wait.
- * @param regions set of regions to wait for. It will be modified by this method.
- * @return True if none of the regions in the set is in transition
- * @throws InterruptedException
- */
- boolean waitUntilNoRegionsInTransition(final long timeout, Set<HRegionInfo> regions)
- throws InterruptedException {
- final long endTime = System.currentTimeMillis() + timeout;
-
- // We're not synchronizing on regionsInTransition now because we don't use any iterator.
- while (!regions.isEmpty() && !this.master.isStopped() && endTime > System.currentTimeMillis()) {
- Iterator<HRegionInfo> regionInfoIterator = regions.iterator();
- while (regionInfoIterator.hasNext()) {
- HRegionInfo hri = regionInfoIterator.next();
- if (!regionStates.isRegionInTransition(hri)) {
- regionInfoIterator.remove();
- }
- }
-
- if (!regions.isEmpty()) {
- regionStates.waitForUpdate(100);
- }
- }
-
- return regions.isEmpty();
- }
-
- /**
* Rebuild the list of user regions and assignment information.
* <p>
* Returns a map of servers that are not found to be online and the regions
@@ -2415,19 +2118,22 @@ public class AssignmentManager extends Z
* in META
* @throws IOException
*/
- Map<ServerName, List<Pair<HRegionInfo, Result>>> rebuildUserRegions()
- throws IOException, KeeperException {
+ Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws IOException, KeeperException {
+ Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
+ Set<String> disabledOrEnablingTables = ZKTable.getDisabledTables(watcher);
+ disabledOrEnablingTables.addAll(enablingTables);
+ Set<String> disabledOrDisablingOrEnabling = ZKTable.getDisablingTables(watcher);
+ disabledOrDisablingOrEnabling.addAll(disabledOrEnablingTables);
+
// Region assignment from META
List<Result> results = MetaReader.fullScan(this.catalogTracker);
// Get any new but slow to checkin region server that joined the cluster
Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
// Map of offline servers and their regions to be returned
- Map<ServerName, List<Pair<HRegionInfo,Result>>> offlineServers =
- new TreeMap<ServerName, List<Pair<HRegionInfo, Result>>>();
+ Map<ServerName, List<HRegionInfo>> offlineServers =
+ new TreeMap<ServerName, List<HRegionInfo>>();
// Iterate regions in META
for (Result result : results) {
- boolean disabled = false;
- boolean disablingOrEnabling = false;
Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
if (region == null) continue;
HRegionInfo regionInfo = region.getFirst();
@@ -2446,29 +2152,25 @@ public class AssignmentManager extends Z
// from ENABLED state when application calls disableTable.
// It can't be in DISABLED state, because DISABLED states transitions
// from DISABLING state.
- if (false == checkIfRegionsBelongsToEnabling(regionInfo)) {
+ if (!enablingTables.contains(tableName)) {
LOG.warn("Region " + regionInfo.getEncodedName() +
" has null regionLocation." + " But its table " + tableName +
" isn't in ENABLING state.");
}
- addTheTablesInPartialState(this.disablingTables, this.enablingTables, regionInfo,
- tableName);
} else if (!onlineServers.contains(regionLocation)) {
// Region is located on a server that isn't online
- List<Pair<HRegionInfo, Result>> offlineRegions =
- offlineServers.get(regionLocation);
+ List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
if (offlineRegions == null) {
- offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
+ offlineRegions = new ArrayList<HRegionInfo>(1);
offlineServers.put(regionLocation, offlineRegions);
}
- offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
- disabled = checkIfRegionBelongsToDisabled(regionInfo);
- disablingOrEnabling = addTheTablesInPartialState(this.disablingTables,
- this.enablingTables, regionInfo, tableName);
+ offlineRegions.add(regionInfo);
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
- enableTableIfNotDisabledOrDisablingOrEnabling(disabled,
- disablingOrEnabling, tableName);
+ if (!disabledOrDisablingOrEnabling.contains(tableName)
+ && !getZKTable().isEnabledTable(tableName)) {
+ setEnabledTable(tableName);
+ }
} else {
// If region is in offline and split state check the ZKNode
if (regionInfo.isOffline() && regionInfo.isSplit()) {
@@ -2476,7 +2178,7 @@ public class AssignmentManager extends Z
.getEncodedName());
Stat stat = new Stat();
byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
- // If znode does not exist dont consider this region
+ // If znode does not exist, don't consider this region
if (data == null) {
LOG.debug("Region " + regionInfo.getRegionNameAsString()
+ " split is completed. Hence need not add to regions list");
@@ -2484,92 +2186,56 @@ public class AssignmentManager extends Z
}
}
// Region is being served and on an active server
- // add only if region not in disabled and enabling table
- if (!checkIfRegionBelongsToDisabled(regionInfo)
- && !checkIfRegionsBelongsToEnabling(regionInfo)) {
+ // add only if region not in disabled or enabling table
+ if (!disabledOrEnablingTables.contains(tableName)) {
regionStates.regionOnline(regionInfo, regionLocation);
}
- disablingOrEnabling = addTheTablesInPartialState(this.disablingTables,
- this.enablingTables, regionInfo, tableName);
- disabled = checkIfRegionBelongsToDisabled(regionInfo);
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
- enableTableIfNotDisabledOrDisablingOrEnabling(disabled,
- disablingOrEnabling, tableName);
+ if (!disabledOrDisablingOrEnabling.contains(tableName)
+ && !getZKTable().isEnabledTable(tableName)) {
+ setEnabledTable(tableName);
+ }
}
}
return offlineServers;
}
- private void enableTableIfNotDisabledOrDisablingOrEnabling(boolean disabled,
- boolean disablingOrEnabling, String tableName) {
- if (!disabled && !disablingOrEnabling
- && !getZKTable().isEnabledTable(tableName)) {
- setEnabledTable(tableName);
- }
- }
-
- private Boolean addTheTablesInPartialState(Set<String> disablingTables,
- Set<String> enablingTables, HRegionInfo regionInfo,
- String disablingTableName) {
- if (checkIfRegionBelongsToDisabling(regionInfo)) {
- disablingTables.add(disablingTableName);
- return true;
- } else if (checkIfRegionsBelongsToEnabling(regionInfo)) {
- enablingTables.add(disablingTableName);
- return true;
- }
- return false;
- }
-
/**
* Recover the tables that were not fully moved to DISABLED state. These
* tables are in DISABLING state when the master restarted/switched.
*
- * @param disablingTables
- * @return
* @throws KeeperException
* @throws TableNotFoundException
* @throws IOException
*/
- private boolean recoverTableInDisablingState(Set<String> disablingTables)
+ private void recoverTableInDisablingState()
throws KeeperException, TableNotFoundException, IOException {
- boolean isWatcherCreated = false;
+ Set<String> disablingTables = ZKTable.getDisablingTables(watcher);
if (disablingTables.size() != 0) {
- // Create a watcher on the zookeeper node
- ZKUtil.listChildrenAndWatchForNewChildren(watcher,
- watcher.assignmentZNode);
- isWatcherCreated = true;
for (String tableName : disablingTables) {
// Recover by calling DisableTableHandler
LOG.info("The table " + tableName
+ " is in DISABLING state. Hence recovering by moving the table"
+ " to DISABLED state.");
- new DisableTableHandler(this.master, tableName.getBytes(),
+ new DisableTableHandler(this.server, tableName.getBytes(),
catalogTracker, this, true).process();
}
}
- return isWatcherCreated;
}
/**
* Recover the tables that are not fully moved to ENABLED state. These tables
* are in ENABLING state when the master restarted/switched
*
- * @param enablingTables
- * @param isWatcherCreated
* @throws KeeperException
* @throws TableNotFoundException
* @throws IOException
*/
- private void recoverTableInEnablingState(Set<String> enablingTables,
- boolean isWatcherCreated) throws KeeperException, TableNotFoundException,
- IOException {
+ private void recoverTableInEnablingState()
+ throws KeeperException, TableNotFoundException, IOException {
+ Set<String> enablingTables = ZKTable.getEnablingTables(watcher);
if (enablingTables.size() != 0) {
- if (false == isWatcherCreated) {
- ZKUtil.listChildrenAndWatchForNewChildren(watcher,
- watcher.assignmentZNode);
- }
for (String tableName : enablingTables) {
// Recover by calling EnableTableHandler
LOG.info("The table " + tableName
@@ -2577,27 +2243,12 @@ public class AssignmentManager extends Z
+ " to ENABLED state.");
// enableTable in sync way during master startup,
// no need to invoke coprocessor
- new EnableTableHandler(this.master, tableName.getBytes(),
+ new EnableTableHandler(this.server, tableName.getBytes(),
catalogTracker, this, true).process();
}
}
}
- private boolean checkIfRegionsBelongsToEnabling(HRegionInfo regionInfo) {
- String tableName = regionInfo.getTableNameAsString();
- return getZKTable().isEnablingTable(tableName);
- }
-
- private boolean checkIfRegionBelongsToDisabled(HRegionInfo regionInfo) {
- String tableName = regionInfo.getTableNameAsString();
- return getZKTable().isDisabledTable(tableName);
- }
-
- private boolean checkIfRegionBelongsToDisabling(HRegionInfo regionInfo) {
- String tableName = regionInfo.getTableNameAsString();
- return getZKTable().isDisablingTable(tableName);
- }
-
/**
* Processes list of dead servers from result of META scan and regions in RIT
* <p>
@@ -2615,82 +2266,35 @@ public class AssignmentManager extends Z
* @throws KeeperException
*/
private void processDeadServersAndRecoverLostRegions(
- Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers, List<String> nodes)
+ Map<ServerName, List<HRegionInfo>> deadServers, List<String> nodes)
throws IOException, KeeperException {
- processDeadServers(deadServers, nodes);
+ if (deadServers != null) {
+ for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
+ ServerName serverName = server.getKey();
+ if (!serverManager.isServerDead(serverName)) {
+ serverManager.expireServer(serverName); // Let SSH do region re-assign
+ }
+ }
+ }
+ nodes = ZKUtil.listChildrenAndWatchForNewChildren(
+ this.watcher, this.watcher.assignmentZNode);
if (!nodes.isEmpty()) {
for (String encodedRegionName : nodes) {
processRegionInTransition(encodedRegionName, null, deadServers);
}
}
- }
-
- private void processDeadServers(Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
- final List<String> nodes)
- throws KeeperException, IOException {
- if (deadServers == null) return;
- Set<ServerName> actualDeadServers = this.serverManager.getDeadServers();
- for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer: deadServers.entrySet()) {
- // skip regions of dead servers because SSH will process regions during rs expiration.
- // see HBASE-5916
- if (actualDeadServers.contains(deadServer.getKey())) {
- for (Pair<HRegionInfo, Result> deadRegion : deadServer.getValue()) {
- nodes.remove(deadRegion.getFirst().getEncodedName());
- }
- continue;
- }
- List<Pair<HRegionInfo, Result>> regions = deadServer.getValue();
- for (Pair<HRegionInfo, Result> region : regions) {
- HRegionInfo regionInfo = region.getFirst();
- Result result = region.getSecond();
- try {
- // If region was in transition (was in zk) force it offline for reassign. Check if node
- // up in zk at all first.
- if (ZKUtil.checkExists(this.watcher,
- ZKAssign.getPath(this.watcher, regionInfo.getEncodedName())) != -1) {
- byte [] data = ZKAssign.getData(watcher, regionInfo.getEncodedName());
- if (data == null) {
- LOG.warn("No data in znode for " + regionInfo.getEncodedName());
- continue;
- }
- RegionTransition rt;
- try {
- rt = RegionTransition.parseFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse of znode data for " + regionInfo.getEncodedName(), e);
- continue;
- }
- // If zk node of this region has been updated by a live server,
- // we consider that this region is being handled.
- // So we should skip it and process it in processRegionsInTransition.
- ServerName sn = rt.getServerName();
- if (isServerOnline(sn)) {
- LOG.info("The region " + regionInfo.getEncodedName() + "is being handled on " + sn);
- continue;
- }
- }
- // Process with existing RS shutdown code
- boolean assign = ServerShutdownHandler.processDeadRegion(
- regionInfo, result, this, this.catalogTracker);
- if (assign) {
- ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
- master.getServerName());
- if (!nodes.contains(regionInfo.getEncodedName())) {
- nodes.add(regionInfo.getEncodedName());
- }
- }
- } catch (KeeperException.NoNodeException nne) {
- // This is fine
- }
- }
- }
+ // Now we can safely claim failover cleanup completed and enable
+ // ServerShutdownHandler for further processing. The nodes (below)
+ // in transition, if any, are for regions not related to those
+ // dead servers at all, and can be done in parallel to SSH.
+ failoverCleanupDone();
}
/**
* Set Regions in transitions metrics.
* This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
- * This iterator is not fail fast, wich may lead to stale read; but that's better than
+ * This iterator is not fail fast, which may lead to stale read; but that's better than
* creating a copy of the map for metrics computation, as this method will be invoked
* on a frequent interval.
*/
@@ -2699,7 +2303,7 @@ public class AssignmentManager extends Z
int totalRITs = 0;
int totalRITsOverThreshold = 0;
long oldestRITTime = 0;
- int ritThreshold = this.master.getConfiguration().
+ int ritThreshold = this.server.getConfiguration().
getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
for (RegionState state: regionStates.getRegionsInTransition().values()) {
totalRITs++;
@@ -2738,11 +2342,11 @@ public class AssignmentManager extends Z
RegionState rs = null;
// There is already a timeout monitor on regions in transition so I
// should not have to have one here too?
- while(!this.master.isStopped() && regionStates.isRegionInTransition(hri)) {
+ while(!this.server.isStopped() && regionStates.isRegionInTransition(hri)) {
LOG.info("Waiting on " + rs + " to clear regions-in-transition");
- regionStates.waitForUpdate(1000);
+ regionStates.waitForUpdate(100);
}
- if (this.master.isStopped()) {
+ if (this.server.isStopped()) {
LOG.info("Giving up wait on regions in " +
"transition because stoppable.isStopped is set");
}
@@ -2876,16 +2480,7 @@ public class AssignmentManager extends Z
case OPEN:
LOG.error("Region has been OPEN for too long, " +
"we don't know where region was opened so can't do anything");
- // TODO: do we need synchronization here?
- // could not synchronized on regionState since it can be
- // an new instance
- String encodedName = regionState.getRegion().getEncodedName();
- Lock lock = assignLocker.acquireLock(encodedName);
- try {
- regionState.updateTimestampToNow();
- } finally {
- lock.unlock();
- }
+ regionState.updateTimestampToNow();
break;
case PENDING_CLOSE:
@@ -2973,13 +2568,13 @@ public class AssignmentManager extends Z
public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
RegionTransition rt = null;
try {
- byte [] data = ZKAssign.getData(master.getZooKeeper(), hri.getEncodedName());
+ byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
// This call can legitimately come by null
rt = data == null? null: RegionTransition.parseFrom(data);
} catch (KeeperException e) {
- master.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
+ server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
} catch (DeserializationException e) {
- master.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
+ server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
}
ServerName addressFromZK = rt != null? rt.getServerName(): null;
@@ -3065,14 +2660,6 @@ public class AssignmentManager extends Z
}
/**
- * Check whether the RegionServer is online.
- * @param serverName
- * @return True if online.
- */
- public boolean isServerOnline(ServerName serverName) {
- return serverName != null && this.serverManager.isServerOnline(serverName);
- }
- /**
* Shutdown the threadpool executor service
*/
public void shutdown() {
@@ -3089,7 +2676,7 @@ public class AssignmentManager extends Z
String errorMsg = "Unable to ensure that the table " + tableName
+ " will be" + " enabled because of a ZooKeeper issue";
LOG.error(errorMsg);
- this.master.abort(errorMsg, e);
+ this.server.abort(errorMsg, e);
}
}
}