You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2014/06/14 17:40:38 UTC
[3/5] HBASE-11059 ZK-less region assignment
http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
index a3b4d74..fbd6e05 100644
--- a/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
+++ b/hbase-protocol/src/main/protobuf/RegionServerStatus.proto
@@ -78,6 +78,47 @@ message GetLastFlushedSequenceIdResponse {
required uint64 last_flushed_sequence_id = 1;
}
+message RegionTransition {
+ required TransitionCode transition_code = 1;
+
+ /** Multiple regions are involved during merging/splitting */
+ repeated RegionInfo region_info = 2;
+
+ /** For newly opened region, the open seq num is needed */
+ optional uint64 open_seq_num = 3;
+
+ enum TransitionCode {
+ OPENED = 0;
+ FAILED_OPEN = 1;
+ /** No failed_close, in which case region server will abort */
+ CLOSED = 2;
+
+ /** Ask master for ok to split/merge region(s) */
+ READY_TO_SPLIT = 3;
+ READY_TO_MERGE = 4;
+
+ SPLIT_PONR = 5;
+ MERGE_PONR = 6;
+
+ SPLIT = 7;
+ MERGED = 8;
+ SPLIT_REVERTED = 9;
+ MERGE_REVERTED = 10;
+ }
+}
+
+message ReportRegionTransitionRequest {
+ /** This region server's server name */
+ required ServerName server = 1;
+
+ repeated RegionTransition transition = 2;
+}
+
+message ReportRegionTransitionResponse {
+ /** Error message if failed to update the region state */
+ optional string error_message = 1;
+}
+
service RegionServerStatusService {
/** Called when a region server first starts. */
rpc RegionServerStartup(RegionServerStartupRequest)
@@ -99,4 +140,12 @@ service RegionServerStatusService {
* log splitting. */
rpc GetLastFlushedSequenceId(GetLastFlushedSequenceIdRequest)
returns(GetLastFlushedSequenceIdResponse);
+
+ /**
+ * Called by a region server to report the progress of a region
+ * transition. If the request fails, the transition should
+ * be aborted.
+ */
+ rpc ReportRegionTransition(ReportRegionTransitionRequest)
+ returns(ReportRegionTransitionResponse);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
index adeeed4..ddab430 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
@@ -34,15 +34,18 @@ public class AssignCallable implements Callable<Object> {
private AssignmentManager assignmentManager;
private HRegionInfo hri;
+ private boolean newPlan;
- public AssignCallable(AssignmentManager assignmentManager, HRegionInfo hri) {
+ public AssignCallable(
+ AssignmentManager assignmentManager, HRegionInfo hri, boolean newPlan) {
this.assignmentManager = assignmentManager;
+ this.newPlan = newPlan;
this.hri = hri;
}
@Override
public Object call() throws Exception {
- assignmentManager.assign(hri, true, true);
+ assignmentManager.assign(hri, true, newPlan);
return null;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 597ddf9..73b06b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -44,6 +44,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CoordinatedStateException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -58,10 +60,10 @@ import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
-import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
+import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
+import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@@ -76,13 +78,18 @@ import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
+import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
@@ -99,6 +106,7 @@ import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.LinkedHashMultimap;
/**
@@ -221,6 +229,13 @@ public class AssignmentManager extends ZooKeeperListener {
private final ConcurrentHashMap<String, AtomicInteger>
failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
+ // A flag to indicate if we are using ZK for region assignment
+ private final boolean useZKForAssignment;
+
+ // In case not using ZK for region assignment, region states
+ // are persisted in meta with a state store
+ private final RegionStateStore regionStateStore;
+
/**
* For testing only! Set to true to skip handling of split.
*/
@@ -250,6 +265,7 @@ public class AssignmentManager extends ZooKeeperListener {
this.serverManager = serverManager;
this.catalogTracker = catalogTracker;
this.executorService = service;
+ this.regionStateStore = new RegionStateStore(server);
this.regionsToReopen = Collections.synchronizedMap
(new HashMap<String, HRegionInfo> ());
Configuration conf = server.getConfiguration();
@@ -275,7 +291,7 @@ public class AssignmentManager extends ZooKeeperListener {
int maxThreads = conf.getInt("hbase.assignment.threads.max", 30);
this.threadPoolExecutorService = Threads.getBoundedCachedThreadPool(
maxThreads, 60L, TimeUnit.SECONDS, Threads.newDaemonThreadFactory("AM."));
- this.regionStates = new RegionStates(server, serverManager);
+ this.regionStates = new RegionStates(server, serverManager, regionStateStore);
this.bulkAssignWaitTillAllAssigned =
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
@@ -289,6 +305,7 @@ public class AssignmentManager extends ZooKeeperListener {
this.tableLockManager = tableLockManager;
this.metricsAssignmentManager = new MetricsAssignmentManager();
+ useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
}
/**
@@ -326,6 +343,14 @@ public class AssignmentManager extends ZooKeeperListener {
return regionStates;
}
+ /**
+ * Used in some tests to mock up region state in meta
+ */
+ @VisibleForTesting
+ RegionStateStore getRegionStateStore() {
+ return regionStateStore;
+ }
+
public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
}
@@ -422,6 +447,7 @@ public class AssignmentManager extends ZooKeeperListener {
*/
void joinCluster() throws IOException,
KeeperException, InterruptedException, CoordinatedStateException {
+ long startTime = System.currentTimeMillis();
// Concurrency note: In the below the accesses on regionsInTransition are
// outside of a synchronization block where usually all accesses to RIT are
// synchronized. The presumption is that in this case it is safe since this
@@ -438,10 +464,16 @@ public class AssignmentManager extends ZooKeeperListener {
// This method will assign all user regions if a clean server startup or
// it will reconstruct master state and cleanup any leftovers from
// previous master process.
- processDeadServersAndRegionsInTransition(deadServers);
+ boolean failover = processDeadServersAndRegionsInTransition(deadServers);
+ if (!useZKForAssignment) {
+ // Not using ZK for assignment any more, remove the ZNode
+ ZKUtil.deleteNodeFailSilent(watcher, watcher.assignmentZNode);
+ }
recoverTableInDisablingState();
recoverTableInEnablingState();
+ LOG.info("Joined the cluster in " + (System.currentTimeMillis()
+ - startTime) + "ms, failover=" + failover);
}
/**
@@ -455,43 +487,102 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException
* @throws InterruptedException
*/
- void processDeadServersAndRegionsInTransition(
+ boolean processDeadServersAndRegionsInTransition(
final Map<ServerName, List<HRegionInfo>> deadServers)
throws KeeperException, IOException, InterruptedException, CoordinatedStateException {
List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
watcher.assignmentZNode);
- if (nodes == null) {
+ if (useZKForAssignment && nodes == null) {
String errorMessage = "Failed to get the children from ZK";
server.abort(errorMessage, new IOException(errorMessage));
- return;
+ return true; // Doesn't matter in this case
}
- boolean failover = (!serverManager.getDeadServers().isEmpty() || !serverManager
- .getRequeuedDeadServers().isEmpty());
-
- if (!failover) {
+ boolean failover = !serverManager.getDeadServers().isEmpty();
+ if (failover) {
+ // This may not be a failover actually, especially if meta is on this master.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Found dead servers out on cluster " + serverManager.getDeadServers());
+ }
+ } else {
// If any one region except meta is assigned, it's a failover.
- Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
- for (HRegionInfo hri: regions.keySet()) {
+ for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) {
if (!hri.isMetaTable()) {
LOG.debug("Found " + hri + " out on cluster");
failover = true;
break;
}
}
- if (!failover) {
- // If any one region except meta is in transition, it's a failover.
- for (String encodedName: nodes) {
- RegionState state = regionStates.getRegionState(encodedName);
- if (state != null && !state.getRegion().isMetaRegion()) {
- LOG.debug("Found " + state.getRegion().getRegionNameAsString() + " in RITs");
+ }
+ if (!failover && nodes != null) {
+ // If any one region except meta is in transition, it's a failover.
+ for (String encodedName: nodes) {
+ RegionState regionState = regionStates.getRegionState(encodedName);
+ if (regionState != null && !regionState.getRegion().isMetaRegion()) {
+ LOG.debug("Found " + regionState + " in RITs");
+ failover = true;
+ break;
+ }
+ }
+ }
+ if (!failover && !useZKForAssignment) {
+ // If any region except meta is in transition on a live server, it's a failover.
+ Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+ if (!regionsInTransition.isEmpty()) {
+ Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+ for (RegionState regionState: regionsInTransition.values()) {
+ if (!regionState.getRegion().isMetaRegion()
+ && onlineServers.contains(regionState.getServerName())) {
+ LOG.debug("Found " + regionState + " in RITs");
failover = true;
break;
}
}
}
}
+ if (!failover) {
+ // If we get here, we have a full cluster restart. It is a failover only
+ // if there are some HLogs that are not split yet. For meta HLogs, they should have
+ // been split already, if any. We can walk through those queued dead servers,
+ // if they don't have any HLogs, this restart should be considered as a clean one
+ Set<ServerName> queuedDeadServers = serverManager.getRequeuedDeadServers().keySet();
+ if (!queuedDeadServers.isEmpty()) {
+ Configuration conf = server.getConfiguration();
+ Path rootdir = FSUtils.getRootDir(conf);
+ FileSystem fs = rootdir.getFileSystem(conf);
+ for (ServerName serverName: queuedDeadServers) {
+ Path logDir = new Path(rootdir, HLogUtil.getHLogDirectoryName(serverName.toString()));
+ Path splitDir = logDir.suffix(HLog.SPLITTING_EXT);
+ if (fs.exists(logDir) || fs.exists(splitDir)) {
+ LOG.debug("Found queued dead server " + serverName);
+ failover = true;
+ break;
+ }
+ }
+ if (!failover) {
+ // We figured that it's not a failover, so no need to
+ // work on these re-queued dead servers any more.
+ LOG.info("AM figured that it's not a failover and cleaned up "
+ + queuedDeadServers.size() + " queued dead servers");
+ serverManager.removeRequeuedDeadServers();
+ }
+ }
+ }
+
+ Set<TableName> disabledOrDisablingOrEnabling = null;
+
+ if (!failover) {
+ disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
+ ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING,
+ ZooKeeperProtos.Table.State.ENABLING);
+
+ // Clean re/start, mark all user regions closed before reassignment
+ regionStates.closeAllUserRegions(disabledOrDisablingOrEnabling);
+ }
+
+ // Now region states are restored
+ regionStateStore.start();
// If we found user regions out on cluster, its a failover.
if (failover) {
@@ -499,11 +590,26 @@ public class AssignmentManager extends ZooKeeperListener {
// Process list of dead servers and regions in RIT.
// See HBASE-4580 for more information.
processDeadServersAndRecoverLostRegions(deadServers);
- } else {
+ }
+
+ if (!failover && useZKForAssignment) {
+ // Cleanup any existing ZK nodes and start watching
+ ZKAssign.deleteAllNodes(watcher);
+ ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
+ this.watcher.assignmentZNode);
+ }
+
+ // Now we can safely claim failover cleanup completed and enable
+ // ServerShutdownHandler for further processing. The nodes (below)
+ // in transition, if any, are for regions not related to those
+ // dead servers at all, and can be done in parallel to SSH.
+ failoverCleanupDone();
+ if (!failover) {
// Fresh cluster startup.
- LOG.info("Clean cluster startup. Assigning userregions");
- assignAllUserRegions();
+ LOG.info("Clean cluster startup. Assigning user regions");
+ assignAllUserRegions(disabledOrDisablingOrEnabling);
}
+ return failover;
}
/**
@@ -670,7 +776,7 @@ public class AssignmentManager extends ZooKeeperListener {
try {
final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
.getVersion();
- unassign(regionInfo, rsClosing, expectedVersion, null, true, null);
+ unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
if (regionStates.isRegionOffline(regionInfo)) {
assign(regionInfo, true);
}
@@ -1273,6 +1379,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
+
/**
* Marks the region as online. Removes it from regions in transition and
* updates the in-memory assignment information.
@@ -1282,8 +1389,12 @@ public class AssignmentManager extends ZooKeeperListener {
* @param sn
*/
void regionOnline(HRegionInfo regionInfo, ServerName sn) {
+ regionOnline(regionInfo, sn, HConstants.NO_SEQNUM);
+ }
+
+ void regionOnline(HRegionInfo regionInfo, ServerName sn, long openSeqNum) {
numRegionsOpened.incrementAndGet();
- regionStates.regionOnline(regionInfo, sn);
+ regionStates.regionOnline(regionInfo, sn, openSeqNum);
// Remove plan if one.
clearRegionPlan(regionInfo);
@@ -1354,13 +1465,15 @@ public class AssignmentManager extends ZooKeeperListener {
}
public void offlineDisabledRegion(HRegionInfo regionInfo) {
- // Disabling so should not be reassigned, just delete the CLOSED node
- LOG.debug("Table being disabled so deleting ZK node and removing from " +
- "regions in transition, skipping assignment of region " +
- regionInfo.getRegionNameAsString());
- String encodedName = regionInfo.getEncodedName();
- deleteNodeInStates(encodedName, "closed", null,
- EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
+ if (useZKForAssignment) {
+ // Disabling so should not be reassigned, just delete the CLOSED node
+ LOG.debug("Table being disabled so deleting ZK node and removing from " +
+ "regions in transition, skipping assignment of region " +
+ regionInfo.getRegionNameAsString());
+ String encodedName = regionInfo.getEncodedName();
+ deleteNodeInStates(encodedName, "closed", null,
+ EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
+ }
regionOffline(regionInfo);
}
@@ -1412,7 +1525,7 @@ public class AssignmentManager extends ZooKeeperListener {
+ " is dead but not processed yet");
return;
}
- assign(state, setOfflineInZK, forceNewPlan);
+ assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
}
} finally {
lock.unlock();
@@ -1459,7 +1572,8 @@ public class AssignmentManager extends ZooKeeperListener {
+ ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+ " is dead but not processed yet");
onDeadServer = true;
- } else if (asyncSetOfflineInZooKeeper(state, cb, destination)) {
+ } else if (!useZKForAssignment
+ || asyncSetOfflineInZooKeeper(state, cb, destination)) {
RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
plans.put(encodedName, plan);
states.add(state);
@@ -1479,17 +1593,19 @@ public class AssignmentManager extends ZooKeeperListener {
lock.unlock();
}
- // Wait until all unassigned nodes have been put up and watchers set.
- int total = states.size();
- for (int oldCounter = 0; !server.isStopped();) {
- int count = counter.get();
- if (oldCounter != count) {
- LOG.debug(destination.toString() + " unassigned znodes=" + count +
- " of total=" + total + "; oldCounter=" + oldCounter);
- oldCounter = count;
+ if (useZKForAssignment) {
+ // Wait until all unassigned nodes have been put up and watchers set.
+ int total = states.size();
+ for (int oldCounter = 0; !server.isStopped();) {
+ int count = counter.get();
+ if (oldCounter != count) {
+ LOG.debug(destination.toString() + " unassigned znodes=" + count +
+ " of total=" + total + "; oldCounter=" + oldCounter);
+ oldCounter = count;
+ }
+ if (count >= total) break;
+ Thread.sleep(5);
}
- if (count >= total) break;
- Thread.sleep(5);
}
if (server.isStopped()) {
@@ -1506,7 +1622,7 @@ public class AssignmentManager extends ZooKeeperListener {
HRegionInfo region = state.getRegion();
String encodedRegionName = region.getEncodedName();
Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
- if (nodeVersion == null || nodeVersion == -1) {
+ if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
LOG.warn("failed to offline in zookeeper: " + region);
failedToOpenRegions.add(region); // assign individually later
Lock lock = locks.remove(encodedRegionName);
@@ -1659,11 +1775,11 @@ public class AssignmentManager extends ZooKeeperListener {
versionOfClosingNode, dest, transitionInZK)) {
LOG.debug("Sent CLOSE to " + server + " for region " +
region.getRegionNameAsString());
- if (!transitionInZK && state != null) {
+ if (useZKForAssignment && !transitionInZK && state != null) {
// Retry to make sure the region is
// closed so as to avoid double assignment.
unassign(region, state, versionOfClosingNode,
- dest, transitionInZK,src);
+ dest, transitionInZK, src);
}
return;
}
@@ -1787,10 +1903,14 @@ public class AssignmentManager extends ZooKeeperListener {
// yet, we can move on only if the meta shows the
// region is not on this server actually, or on a server
// not dead, or dead and processed already.
- if (regionStates.isServerDeadAndNotProcessed(sn)
+ // In case not using ZK, we don't need this check because
+ // we have the latest info in memory, and the caller
+ // will do another round checking anyway.
+ if (useZKForAssignment
+ && regionStates.isServerDeadAndNotProcessed(sn)
&& wasRegionOnDeadServerByMeta(region, sn)) {
LOG.info("Skip assigning " + region.getRegionNameAsString()
- + ", it is on a dead but not processed yet server");
+ + ", it is on a dead but not processed yet server: " + sn);
return null;
}
case CLOSED:
@@ -1813,9 +1933,9 @@ public class AssignmentManager extends ZooKeeperListener {
while (!server.isStopped()) {
try {
catalogTracker.waitForMeta();
- Pair<HRegionInfo, ServerName> r =
- MetaReader.getRegion(catalogTracker, region.getRegionName());
- ServerName server = r == null ? null : r.getSecond();
+ Result r = MetaReader.getRegionResult(catalogTracker, region.getRegionName());
+ if (r == null || r.isEmpty()) return false;
+ ServerName server = HRegionInfo.getServerName(r);
return regionStates.isServerDeadAndNotProcessed(server);
} catch (IOException ioe) {
LOG.info("Received exception accessing hbase:meta during force assign "
@@ -2263,14 +2383,16 @@ public class AssignmentManager extends ZooKeeperListener {
regionOffline(region);
return;
}
- versionOfClosingNode = ZKAssign.createNodeClosing(
- watcher, region, state.getServerName());
- if (versionOfClosingNode == -1) {
- LOG.info("Attempting to unassign " +
- region.getRegionNameAsString() + " but ZK closing node "
- + "can't be created.");
- reassign = false; // not unassigned at all
- return;
+ if (useZKForAssignment) {
+ versionOfClosingNode = ZKAssign.createNodeClosing(
+ watcher, region, state.getServerName());
+ if (versionOfClosingNode == -1) {
+ LOG.info("Attempting to unassign " +
+ region.getRegionNameAsString() + " but ZK closing node "
+ + "can't be created.");
+ reassign = false; // not unassigned at all
+ return;
+ }
}
} catch (KeeperException e) {
if (e instanceof NodeExistsException) {
@@ -2323,7 +2445,7 @@ public class AssignmentManager extends ZooKeeperListener {
return;
}
- unassign(region, state, versionOfClosingNode, dest, true, null);
+ unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
} finally {
lock.unlock();
@@ -2527,29 +2649,20 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws IOException
* @throws KeeperException
*/
- private void assignAllUserRegions()
+ private void assignAllUserRegions(Set<TableName> disabledOrDisablingOrEnabling)
throws IOException, InterruptedException, KeeperException, CoordinatedStateException {
- // Cleanup any existing ZK nodes and start watching
- ZKAssign.deleteAllNodes(watcher);
- ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
- this.watcher.assignmentZNode);
- failoverCleanupDone();
-
// Skip assignment for regions of tables in DISABLING state because during clean cluster startup
// no RS is alive and regions map also doesn't have any information about the regions.
// See HBASE-6281.
- Set<TableName> disabledOrDisablingOrEnabling = tableStateManager.getTablesInStates(
- ZooKeeperProtos.Table.State.DISABLED,
- ZooKeeperProtos.Table.State.DISABLING,
- ZooKeeperProtos.Table.State.ENABLING);
-
// Scan hbase:meta for all user regions, skipping any disabled tables
Map<HRegionInfo, ServerName> allRegions;
SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment =
new SnapshotOfRegionAssignmentFromMeta(catalogTracker, disabledOrDisablingOrEnabling, true);
snapshotOfRegionAssignment.initialize();
allRegions = snapshotOfRegionAssignment.getRegionToRegionServerMap();
- if (allRegions == null || allRegions.isEmpty()) return;
+ if (allRegions == null || allRegions.isEmpty()) {
+ return;
+ }
// Determine what type of assignment to do on startup
boolean retainAssignment = server.getConfiguration().
@@ -2606,8 +2719,6 @@ public class AssignmentManager extends ZooKeeperListener {
*/
Map<ServerName, List<HRegionInfo>> rebuildUserRegions() throws
IOException, KeeperException, CoordinatedStateException {
- Set<TableName> enablingTables = tableStateManager.getTablesInStates(
- ZooKeeperProtos.Table.State.ENABLING);
Set<TableName> disabledOrEnablingTables = tableStateManager.getTablesInStates(
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.ENABLING);
@@ -2625,66 +2736,39 @@ public class AssignmentManager extends ZooKeeperListener {
new TreeMap<ServerName, List<HRegionInfo>>();
// Iterate regions in META
for (Result result : results) {
- Pair<HRegionInfo, ServerName> region = HRegionInfo.getHRegionInfoAndServerName(result);
- if (region == null) continue;
- HRegionInfo regionInfo = region.getFirst();
- ServerName regionLocation = region.getSecond();
+ HRegionInfo regionInfo = HRegionInfo.getHRegionInfo(result);
if (regionInfo == null) continue;
- regionStates.createRegionState(regionInfo);
- if (regionStates.isRegionInState(regionInfo, State.SPLIT)) {
- // Split is considered to be completed. If the split znode still
- // exists, the region will be put back to SPLITTING state later
- LOG.debug("Region " + regionInfo.getRegionNameAsString()
- + " split is completed. Hence need not add to regions list");
+ State state = RegionStateStore.getRegionState(result);
+ ServerName regionLocation = RegionStateStore.getRegionServer(result);
+ regionStates.createRegionState(regionInfo, state, regionLocation);
+ if (!regionStates.isRegionInState(regionInfo, State.OPEN)) {
+ // Region is not open (either offline or in transition), skip
continue;
}
TableName tableName = regionInfo.getTable();
- if (regionLocation == null) {
- // regionLocation could be null if createTable didn't finish properly.
- // When createTable is in progress, HMaster restarts.
- // Some regions have been added to hbase:meta, but have not been assigned.
- // When this happens, the region's table must be in ENABLING state.
- // It can't be in ENABLED state as that is set when all regions are
- // assigned.
- // It can't be in DISABLING state, because DISABLING state transitions
- // from ENABLED state when application calls disableTable.
- // It can't be in DISABLED state, because DISABLED states transitions
- // from DISABLING state.
- if (!enablingTables.contains(tableName)) {
- LOG.warn("Region " + regionInfo.getEncodedName() +
- " has null regionLocation." + " But its table " + tableName +
- " isn't in ENABLING state.");
- }
- } else if (!onlineServers.contains(regionLocation)) {
+ if (!onlineServers.contains(regionLocation)) {
// Region is located on a server that isn't online
List<HRegionInfo> offlineRegions = offlineServers.get(regionLocation);
if (offlineRegions == null) {
offlineRegions = new ArrayList<HRegionInfo>(1);
offlineServers.put(regionLocation, offlineRegions);
}
+ regionStates.regionOffline(regionInfo);
offlineRegions.add(regionInfo);
- // need to enable the table if not disabled or disabling or enabling
- // this will be used in rolling restarts
- if (!disabledOrDisablingOrEnabling.contains(tableName)
- && !getTableStateManager().isTableState(tableName,
- ZooKeeperProtos.Table.State.ENABLED)) {
- setEnabledTable(tableName);
- }
- } else {
+ } else if (!disabledOrEnablingTables.contains(tableName)) {
// Region is being served and on an active server
// add only if region not in disabled or enabling table
- if (!disabledOrEnablingTables.contains(tableName)) {
- regionStates.updateRegionState(regionInfo, State.OPEN, regionLocation);
- regionStates.regionOnline(regionInfo, regionLocation);
- balancer.regionOnline(regionInfo, regionLocation);
- }
- // need to enable the table if not disabled or disabling or enabling
- // this will be used in rolling restarts
- if (!disabledOrDisablingOrEnabling.contains(tableName)
- && !getTableStateManager().isTableState(tableName,
- ZooKeeperProtos.Table.State.ENABLED)) {
- setEnabledTable(tableName);
- }
+ regionStates.regionOnline(regionInfo, regionLocation);
+ balancer.regionOnline(regionInfo, regionLocation);
+ } else if (useZKForAssignment) {
+ regionStates.regionOffline(regionInfo);
+ }
+ // need to enable the table if not disabled or disabling or enabling
+ // this will be used in rolling restarts
+ if (!disabledOrDisablingOrEnabling.contains(tableName)
+ && !getTableStateManager().isTableState(tableName,
+ ZooKeeperProtos.Table.State.ENABLED)) {
+ setEnabledTable(tableName);
}
}
return offlineServers;
@@ -2775,19 +2859,152 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
- List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(
- this.watcher, this.watcher.assignmentZNode);
- if (!nodes.isEmpty()) {
+
+ List<String> nodes = useZKForAssignment ?
+ ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
+ : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
+ if (nodes != null && !nodes.isEmpty()) {
for (String encodedRegionName : nodes) {
processRegionInTransition(encodedRegionName, null);
}
+ } else if (!useZKForAssignment) {
+ // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
+ // in case the RPC call is not sent out yet before the master was shut down
+ // since we update the state before we send the RPC call. We can't update
+ // the state after the RPC call. Otherwise, we don't know what's happened
+ // to the region if the master dies right after the RPC call is out.
+ Map<String, RegionState> rits = regionStates.getRegionsInTransition();
+ for (RegionState regionState: rits.values()) {
+ if (!serverManager.isServerOnline(regionState.getServerName())) {
+ continue; // SSH will handle it
+ }
+ State state = regionState.getState();
+ LOG.info("Processing " + regionState);
+ switch (state) {
+ case PENDING_OPEN:
+ retrySendRegionOpen(regionState);
+ break;
+ case PENDING_CLOSE:
+ retrySendRegionClose(regionState);
+ break;
+ default:
+ // No process for other states
+ }
+ }
}
+ }
- // Now we can safely claim failover cleanup completed and enable
- // ServerShutdownHandler for further processing. The nodes (below)
- // in transition, if any, are for regions not related to those
- // dead servers at all, and can be done in parallel to SSH.
- failoverCleanupDone();
+ /**
+ * At master failover, for pending_open region, make sure
+ * sendRegionOpen RPC call is sent to the target regionserver.
+ * <p>
+ * The work is submitted to the executor service as an M_MASTER_RECOVERY
+ * event handler and runs while holding the lock for the region's encoded
+ * name. The RPC is retried on SocketTimeoutException/FailedServerException
+ * for as long as the target server is online and this master is neither
+ * stopped nor aborted. A FAILED_OPENING response, or any other exception,
+ * results in the region being re-assigned.
+ *
+ * @param regionState the state of the region whose open request is re-sent
+ */
+ private void retrySendRegionOpen(final RegionState regionState) {
+ this.executorService.submit(
+ new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+ @Override
+ public void process() throws IOException {
+ HRegionInfo hri = regionState.getRegion();
+ ServerName serverName = regionState.getServerName();
+ // The lock is keyed by the encoded region name and is held for the
+ // whole retry, including the sleeps between attempts.
+ ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+ try {
+ if (!regionState.equals(regionStates.getRegionState(hri))) {
+ return; // Region is not in the expected state any more
+ }
+ // Keep trying only while the target server is online and this master is running
+ while (serverManager.isServerOnline(serverName)
+ && !server.isStopped() && !server.isAborted()) {
+ try {
+ List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+ if (shouldAssignRegionsWithFavoredNodes) {
+ favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
+ }
+ RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
+ serverName, hri, -1, favoredNodes);
+
+ if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
+ // Failed opening this region, this means the target server didn't get
+ // the original region open RPC, so re-assign it with a new plan
+ LOG.debug("Got failed_opening in retry sendRegionOpen for "
+ + regionState + ", re-assign it");
+ invokeAssign(hri, true);
+ }
+ return; // Done.
+ } catch (Throwable t) {
+ // Look at the underlying cause when the failure came from the remote side
+ if (t instanceof RemoteException) {
+ t = ((RemoteException) t).unwrapRemoteException();
+ }
+ // In case SocketTimeoutException/FailedServerException, retry
+ if (t instanceof java.net.SocketTimeoutException
+ || t instanceof FailedServerException) {
+ Threads.sleep(100);
+ continue;
+ }
+ // For other exceptions, re-assign it
+ LOG.debug("Got exception in retry sendRegionOpen for "
+ + regionState + ", re-assign it", t);
+ invokeAssign(hri);
+ return; // Done.
+ }
+ }
+ // Reaching here means the loop condition failed: the target server is
+ // no longer online, or this master is stopped/aborted. Nothing is sent.
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+ }
+
+ /**
+ * At master failover, for pending_close region, make sure
+ * sendRegionClose RPC call is sent to the target regionserver.
+ * <p>
+ * The work is submitted to the executor service as an M_MASTER_RECOVERY
+ * event handler and runs while holding the lock for the region's encoded
+ * name. The RPC is retried on SocketTimeoutException/FailedServerException
+ * for as long as the target server is online and this master is neither
+ * stopped nor aborted. A false response, or an exception other than
+ * NotServingRegionException/RegionAlreadyInTransitionException, results
+ * in the region being unassigned again.
+ *
+ * @param regionState the state of the region whose close request is re-sent
+ */
+ private void retrySendRegionClose(final RegionState regionState) {
+ this.executorService.submit(
+ new EventHandler(server, EventType.M_MASTER_RECOVERY) {
+ @Override
+ public void process() throws IOException {
+ HRegionInfo hri = regionState.getRegion();
+ ServerName serverName = regionState.getServerName();
+ // The lock is keyed by the encoded region name and is held for the
+ // whole retry, including the sleeps between attempts.
+ ReentrantLock lock = locker.acquireLock(hri.getEncodedName());
+ try {
+ if (!regionState.equals(regionStates.getRegionState(hri))) {
+ return; // Region is not in the expected state any more
+ }
+ // Keep trying only while the target server is online and this master is running
+ while (serverManager.isServerOnline(serverName)
+ && !server.isStopped() && !server.isAborted()) {
+ try {
+ if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
+ // This means the region is still on the target server
+ LOG.debug("Got false in retry sendRegionClose for "
+ + regionState + ", re-close it");
+ invokeUnAssign(hri);
+ }
+ return; // Done.
+ } catch (Throwable t) {
+ // Look at the underlying cause when the failure came from the remote side
+ if (t instanceof RemoteException) {
+ t = ((RemoteException) t).unwrapRemoteException();
+ }
+ // In case SocketTimeoutException/FailedServerException, retry
+ if (t instanceof java.net.SocketTimeoutException
+ || t instanceof FailedServerException) {
+ Threads.sleep(100);
+ continue;
+ }
+ if (!(t instanceof NotServingRegionException
+ || t instanceof RegionAlreadyInTransitionException)) {
+ // NotServingRegionException/RegionAlreadyInTransitionException
+ // means the target server got the original region close request.
+ // For other exceptions, re-close it
+ LOG.debug("Got exception in retry sendRegionClose for "
+ + regionState + ", re-close it", t);
+ invokeUnAssign(hri);
+ }
+ return; // Done.
+ }
+ }
+ // Reaching here means the loop condition failed: the target server is
+ // no longer online, or this master is stopped/aborted. Nothing is sent.
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
}
/**
@@ -2871,7 +3088,15 @@ public class AssignmentManager extends ZooKeeperListener {
}
+ /**
+ * Schedules an asynchronous assignment of the given region.
+ * Equivalent to {@code invokeAssign(regionInfo, true)}.
+ * @param regionInfo the region to assign
+ */
void invokeAssign(HRegionInfo regionInfo) {
- threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
+ invokeAssign(regionInfo, true);
+ }
+
+ /**
+ * Schedules an asynchronous assignment of the given region by submitting
+ * an AssignCallable to the thread pool executor service.
+ * @param regionInfo the region to assign
+ * @param newPlan handed to the AssignCallable unchanged; presumably tells it
+ * whether to compute a new region plan -- TODO confirm against AssignCallable
+ */
+ void invokeAssign(HRegionInfo regionInfo, boolean newPlan) {
+ threadPoolExecutorService.submit(new AssignCallable(this, regionInfo, newPlan));
+ }
+
+ /**
+ * Schedules an asynchronous unassignment of the given region by submitting
+ * an UnAssignCallable to the thread pool executor service.
+ * @param regionInfo the region to unassign
+ */
+ void invokeUnAssign(HRegionInfo regionInfo) {
+ threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
public boolean isCarryingMeta(ServerName serverName) {
@@ -3024,6 +3249,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
threadPoolExecutorService.shutdownNow();
zkEventWorkers.shutdownNow();
+ regionStateStore.stop();
}
protected void setEnabledTable(TableName tableName) {
@@ -3098,6 +3324,180 @@ public class AssignmentManager extends ZooKeeperListener {
EventType.RS_ZK_REQUEST_REGION_SPLIT, EventType.RS_ZK_REGION_SPLIT);
}
+ /**
+  * Handles a failed-open report for a region.
+  * <p>
+  * Each failure bumps a per-region counter. Once the counter reaches
+  * {@code maximumAttempts} the region is marked FAILED_OPEN and the counter
+  * is dropped. Before that, the region is moved to CLOSED and, unless its
+  * table is disabled or disabling, a region plan is requested and the
+  * region is assigned again.
+  *
+  * @param hri the region that failed to open
+  * @param sn the server that reported the failure
+  */
+ private void onRegionFailedOpen(
+     final HRegionInfo hri, final ServerName sn) {
+   final String regionKey = hri.getEncodedName();
+   AtomicInteger attempts = failedOpenTracker.get(regionKey);
+   if (attempts == null) {
+     attempts = new AtomicInteger();
+     // Per the original author: a plain put is enough (no putIfAbsent or
+     // extra synchronization) because this handling is locked on the
+     // encoded region name and the tracker is only updated here.
+     failedOpenTracker.put(regionKey, attempts);
+   }
+
+   if (attempts.incrementAndGet() >= maximumAttempts) {
+     regionStates.updateRegionState(hri, State.FAILED_OPEN);
+     // Drop the tracking entry: it saves memory and restarts the
+     // count for the next open initiative.
+     failedOpenTracker.remove(regionKey);
+     return;
+   }
+
+   // Below the limit: treat it as if the region were opened and then closed.
+   final RegionState closedState = regionStates.updateRegionState(hri, State.CLOSED);
+   if (closedState == null) {
+     return;
+   }
+
+   // When there are more than one region server a new RS is selected as the
+   // destination and the same is updated in the region plan. (HBASE-5546)
+   final boolean tableGoingOffline = getTableStateManager().isTableState(hri.getTable(),
+     ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
+   if (tableGoingOffline) {
+     offlineDisabledRegion(hri);
+     return;
+   }
+
+   // The region is CLOSED at this point; go ahead and assign it.
+   regionStates.updateRegionState(hri, RegionState.State.CLOSED);
+   // This below has to do w/ online enable/disable of a table
+   removeClosedRegion(hri);
+   try {
+     getRegionPlan(hri, sn, true);
+   } catch (HBaseIOException planFailure) {
+     LOG.warn("Failed to get region plan", planFailure);
+   }
+   invokeAssign(hri, false);
+ }
+
+ /**
+  * Handles a successful-open report for a region.
+  * <p>
+  * Brings the region online on the reporting server, deletes the
+  * assignment znode when ZK is used for assignment, clears the failed-open
+  * counter and, if the table is disabled or disabling, schedules the
+  * region to be unassigned again.
+  *
+  * @param hri the region that was opened
+  * @param sn the server the region was opened on
+  * @param openSeqNum the open sequence number reported for the region
+  */
+ private void onRegionOpen(
+     final HRegionInfo hri, final ServerName sn, long openSeqNum) {
+   regionOnline(hri, sn, openSeqNum);
+
+   if (useZKForAssignment) {
+     try {
+       // Remove the znode if there is one; a missing node is not an error.
+       ZKAssign.deleteNodeFailSilent(watcher, hri);
+     } catch (KeeperException zkFailure) {
+       server.abort("Unexpected ZK exception deleting node " + hri, zkFailure);
+     }
+   }
+
+   // A successful open resets the failed-open count, if any.
+   failedOpenTracker.remove(hri.getEncodedName());
+
+   final boolean tableGoingOffline = getTableStateManager().isTableState(hri.getTable(),
+     ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
+   if (tableGoingOffline) {
+     invokeUnAssign(hri);
+   }
+ }
+
+ /**
+  * Handles a closed report for a region.
+  * <p>
+  * A region of a disabled or disabling table is taken offline. Any other
+  * region is marked CLOSED, a region-closed notification is sent, and the
+  * region is scheduled for assignment again.
+  *
+  * @param hri the region that was closed
+  */
+ private void onRegionClosed(final HRegionInfo hri) {
+   final boolean tableGoingOffline = getTableStateManager().isTableState(hri.getTable(),
+     ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
+   if (tableGoingOffline) {
+     offlineDisabledRegion(hri);
+   } else {
+     regionStates.updateRegionState(hri, RegionState.State.CLOSED);
+     sendRegionClosedNotification(hri);
+     // This below has to do w/ online enable/disable of a table
+     removeClosedRegion(hri);
+     invokeAssign(hri, false);
+   }
+ }
+
+ /**
+  * Handles a split-related transition reported by a region server.
+  * <p>
+  * The report is rejected unless the parent is open or splitting on the
+  * reporting server and each daughter is either unknown to the master or
+  * open/splitting_new on that server. A parent the master has no state
+  * for is rejected too. Once accepted, the daughters are set to
+  * SPLITTING_NEW and the parent to SPLITTING, then the code-specific step
+  * runs:
+  * <ul>
+  * <li>READY_TO_SPLIT: nothing more is done</li>
+  * <li>SPLIT_PONR: the split is recorded through the region state store</li>
+  * <li>SPLIT: the parent goes offline as SPLIT, the daughters come online</li>
+  * <li>SPLIT_REVERTED: the parent comes back online, the daughters go offline</li>
+  * </ul>
+  *
+  * @param sn the region server reporting the transition
+  * @param code the split transition code reported
+  * @param p the parent region being split
+  * @param a the first daughter region
+  * @param b the second daughter region
+  * @return null if the transition was applied, otherwise a message saying
+  *   why it was not
+  */
+ private String onRegionSplit(ServerName sn, TransitionCode code,
+     HRegionInfo p, HRegionInfo a, HRegionInfo b) {
+   RegionState rs_p = regionStates.getRegionState(p);
+   RegionState rs_a = regionStates.getRegionState(a);
+   RegionState rs_b = regionStates.getRegionState(b);
+   // The daughters may be unknown (not created yet), but the parent must be
+   // known: a null parent state is a rejected report, not a NullPointerException.
+   if (rs_p == null
+       || !(rs_p.isOpenOrSplittingOnServer(sn)
+         && (rs_a == null || rs_a.isOpenOrSplittingNewOnServer(sn))
+         && (rs_b == null || rs_b.isOpenOrSplittingNewOnServer(sn)))) {
+     return "Not in state good for split";
+   }
+
+   regionStates.updateRegionState(a, State.SPLITTING_NEW, sn);
+   regionStates.updateRegionState(b, State.SPLITTING_NEW, sn);
+   regionStates.updateRegionState(p, State.SPLITTING);
+
+   if (code == TransitionCode.SPLIT) {
+     if (TEST_SKIP_SPLIT_HANDLING) {
+       return "Skipping split message, TEST_SKIP_SPLIT_HANDLING is set";
+     }
+     regionOffline(p, State.SPLIT);
+     // The daughters are brought online with an open sequence number of 1
+     regionOnline(a, sn, 1);
+     regionOnline(b, sn, 1);
+
+     // User could disable the table before master knows the new region.
+     if (getTableStateManager().isTableState(p.getTable(),
+         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+       invokeUnAssign(a);
+       invokeUnAssign(b);
+     }
+   } else if (code == TransitionCode.SPLIT_PONR) {
+     try {
+       regionStateStore.splitRegion(p, a, b, sn);
+     } catch (IOException ioe) {
+       // Keep the cause in the log; the caller only gets the short message.
+       LOG.info("Failed to record split region " + p.getShortNameToLog(), ioe);
+       return "Failed to record the splitting in meta";
+     }
+   } else if (code == TransitionCode.SPLIT_REVERTED) {
+     regionOnline(p, sn);
+     regionOffline(a);
+     regionOffline(b);
+
+     // The table could have been disabled while the split was in progress.
+     if (getTableStateManager().isTableState(p.getTable(),
+         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+       invokeUnAssign(p);
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Handles a merge-related transition reported by a region server.
+  * <p>
+  * The report is rejected unless both regions to merge are open or merging
+  * on the reporting server and the merged region is either unknown to the
+  * master or open/merging_new on that server. A region to merge that the
+  * master has no state for is rejected too. Once accepted, the two regions
+  * are set to MERGING and the merged region to MERGING_NEW, then the
+  * code-specific step runs:
+  * <ul>
+  * <li>READY_TO_MERGE: the pair is remembered in {@code mergingRegions}</li>
+  * <li>MERGE_PONR: the merge is recorded through the region state store</li>
+  * <li>MERGED: the two regions go offline as MERGED, the merged region
+  *   comes online</li>
+  * <li>any other code (MERGE_REVERTED is the one the caller in this file
+  *   passes): the two regions come back online, the merged region goes
+  *   offline</li>
+  * </ul>
+  *
+  * @param sn the region server reporting the transition
+  * @param code the merge transition code reported
+  * @param p the merged region (to be created)
+  * @param a the first region to merge
+  * @param b the second region to merge
+  * @return null if the transition was applied, otherwise a message saying
+  *   why it was not
+  */
+ private String onRegionMerge(ServerName sn, TransitionCode code,
+     HRegionInfo p, HRegionInfo a, HRegionInfo b) {
+   RegionState rs_p = regionStates.getRegionState(p);
+   RegionState rs_a = regionStates.getRegionState(a);
+   RegionState rs_b = regionStates.getRegionState(b);
+   // The merged region may be unknown (not created yet), but both regions
+   // to merge must be known: a null state is a rejected report, not a
+   // NullPointerException.
+   if (rs_a == null || rs_b == null
+       || !(rs_a.isOpenOrMergingOnServer(sn) && rs_b.isOpenOrMergingOnServer(sn)
+         && (rs_p == null || rs_p.isOpenOrMergingNewOnServer(sn)))) {
+     return "Not in state good for merge";
+   }
+
+   regionStates.updateRegionState(a, State.MERGING);
+   regionStates.updateRegionState(b, State.MERGING);
+   regionStates.updateRegionState(p, State.MERGING_NEW, sn);
+
+   String encodedName = p.getEncodedName();
+   if (code == TransitionCode.READY_TO_MERGE) {
+     mergingRegions.put(encodedName,
+       new PairOfSameType<HRegionInfo>(a, b));
+   } else if (code == TransitionCode.MERGED) {
+     mergingRegions.remove(encodedName);
+     regionOffline(a, State.MERGED);
+     regionOffline(b, State.MERGED);
+     // The merged region is brought online with an open sequence number of 1
+     regionOnline(p, sn, 1);
+
+     // User could disable the table before master knows the new region.
+     if (getTableStateManager().isTableState(p.getTable(),
+         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+       invokeUnAssign(p);
+     }
+   } else if (code == TransitionCode.MERGE_PONR) {
+     try {
+       regionStateStore.mergeRegions(p, a, b, sn);
+     } catch (IOException ioe) {
+       // Keep the cause in the log; the caller only gets the short message.
+       LOG.info("Failed to record merged region " + p.getShortNameToLog(), ioe);
+       return "Failed to record the merging in meta";
+     }
+   } else {
+     // Remaining case: the merge was reverted
+     mergingRegions.remove(encodedName);
+     regionOnline(a, sn);
+     regionOnline(b, sn);
+     regionOffline(p);
+
+     // The table could have been disabled while the merge was in progress.
+     if (getTableStateManager().isTableState(p.getTable(),
+         ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
+       invokeUnAssign(a);
+       invokeUnAssign(b);
+     }
+   }
+   return null;
+ }
+
/**
* A helper to handle region merging transition event.
* It transitions merging regions to MERGING state.
@@ -3167,7 +3567,6 @@ public class AssignmentManager extends ZooKeeperListener {
regionStates.updateRegionState(p, State.MERGING_NEW, sn);
if (et != EventType.RS_ZK_REGION_MERGED) {
- regionStates.regionOffline(p, State.MERGING_NEW);
this.mergingRegions.put(encodedName,
new PairOfSameType<HRegionInfo>(hri_a, hri_b));
} else {
@@ -3288,8 +3687,6 @@ public class AssignmentManager extends ZooKeeperListener {
synchronized (regionStates) {
regionStates.updateRegionState(hri_a, State.SPLITTING_NEW, sn);
regionStates.updateRegionState(hri_b, State.SPLITTING_NEW, sn);
- regionStates.regionOffline(hri_a, State.SPLITTING_NEW);
- regionStates.regionOffline(hri_b, State.SPLITTING_NEW);
regionStates.updateRegionState(rt, State.SPLITTING);
// The below is for testing ONLY! We can't do fault injection easily, so
@@ -3373,6 +3770,121 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
+  * Try to update some region states. If the state machine prevents
+  * such update, an error message is returned to explain the reason.
+  *
+  * It's expected that in each transition there should be just one
+  * region for opening/closing, 3 regions for splitting/merging.
+  * These regions should be on the server that requested the change.
+  * A transition that carries fewer regions than its code needs is
+  * rejected with an error message.
+  *
+  * Region state machine. Only these transitions
+  * are expected to be triggered by a region server.
+  *
+  * On the state transition:
+  *  (1) Open/Close should be initiated by master
+  *      (a) Master sets the region to pending_open/pending_close
+  *        in memory and hbase:meta before sending the request
+  *        to the region server
+  *      (b) Region server reports back to the master
+  *        after open/close is done (either success/failure)
+  *      (c) If region server has problem to report the status
+  *        to master, it must be because the master is down or some
+  *        temporary network issue. Otherwise, the region server should
+  *        abort since it must be a bug. If the master is not accessible,
+  *        the region server should keep trying until the server is
+  *        stopped or till the status is reported to the (new) master
+  *      (d) If region server dies in the middle of opening/closing
+  *        a region, SSH picks it up and finishes it
+  *      (e) If master dies in the middle, the new master recovers
+  *        the state during initialization from hbase:meta. Region server
+  *        can report any transition that has not been reported to
+  *        the previous active master yet
+  *  (2) Split/merge is initiated by region servers
+  *      (a) To split a region, a region server sends a request
+  *        to master to try to set a region to splitting, together with
+  *        two daughters (to be created) to splitting new. If approved
+  *        by the master, the splitting can then move ahead
+  *      (b) To merge two regions, a region server sends a request to
+  *        master to try to set the new merged region (to be created) to
+  *        merging_new, together with two regions (to be merged) to merging.
+  *        If it is ok with the master, the merge can then move ahead
+  *      (c) Once the splitting/merging is done, the region server
+  *        reports the status back to the master either success/failure.
+  *      (d) Other scenarios should be handled similarly as for
+  *        region open/close
+  *
+  * @param serverName the region server reporting the transition
+  * @param transition the reported transition: a code, the region(s)
+  *   involved and, for a newly opened region, its open sequence number
+  * @return null if the transition was applied, otherwise a message saying
+  *   why it was not
+  */
+ protected String onRegionTransition(final ServerName serverName,
+     final RegionServerStatusProtos.RegionTransition transition) {
+   TransitionCode code = transition.getTransitionCode();
+   // region_info is a repeated field, so it may be empty: check before indexing.
+   int regionCount = transition.getRegionInfoCount();
+   if (regionCount < 1) {
+     String noRegionMsg = "No region is specified for transition " + code;
+     LOG.error("Failed to transition region to "
+       + code + " by " + serverName + ": " + noRegionMsg);
+     return noRegionMsg;
+   }
+   HRegionInfo hri = HRegionInfo.convert(transition.getRegionInfo(0));
+   RegionState current = regionStates.getRegionState(hri);
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("Got transition " + code + " for "
+       + (current != null ? current.toString() : hri.getShortNameToLog())
+       + " from " + serverName);
+   }
+   String errorMsg = null;
+   switch (code) {
+   case OPENED:
+   case FAILED_OPEN:
+     if (current == null
+         || !current.isPendingOpenOrOpeningOnServer(serverName)) {
+       errorMsg = hri.getShortNameToLog()
+         + " is not pending open on " + serverName;
+     } else if (code == TransitionCode.FAILED_OPEN) {
+       onRegionFailedOpen(hri, serverName);
+     } else {
+       // A successful open must come with a non-negative open seq num
+       long openSeqNum = HConstants.NO_SEQNUM;
+       if (transition.hasOpenSeqNum()) {
+         openSeqNum = transition.getOpenSeqNum();
+       }
+       if (openSeqNum < 0) {
+         errorMsg = "Newly opened region has invalid open seq num " + openSeqNum;
+       } else {
+         onRegionOpen(hri, serverName, openSeqNum);
+       }
+     }
+     break;
+
+   case CLOSED:
+     if (current == null
+         || !current.isPendingCloseOrClosingOnServer(serverName)) {
+       errorMsg = hri.getShortNameToLog()
+         + " is not pending close on " + serverName;
+     } else {
+       onRegionClosed(hri);
+     }
+     break;
+
+   case READY_TO_SPLIT:
+   case SPLIT_PONR:
+   case SPLIT:
+   case SPLIT_REVERTED:
+     // Needs the parent plus the two daughters
+     if (regionCount < 3) {
+       errorMsg = "Expected 3 regions for " + code + " but got " + regionCount;
+     } else {
+       errorMsg = onRegionSplit(serverName, code, hri,
+         HRegionInfo.convert(transition.getRegionInfo(1)),
+         HRegionInfo.convert(transition.getRegionInfo(2)));
+     }
+     break;
+
+   case READY_TO_MERGE:
+   case MERGE_PONR:
+   case MERGED:
+   case MERGE_REVERTED:
+     // Needs the merged region plus the two regions being merged
+     if (regionCount < 3) {
+       errorMsg = "Expected 3 regions for " + code + " but got " + regionCount;
+     } else {
+       errorMsg = onRegionMerge(serverName, code, hri,
+         HRegionInfo.convert(transition.getRegionInfo(1)),
+         HRegionInfo.convert(transition.getRegionInfo(2)));
+     }
+     break;
+
+   default:
+     errorMsg = "Unexpected transition code " + code;
+   }
+   if (errorMsg != null) {
+     LOG.error("Failed to transition region from " + current + " to "
+       + code + " by " + serverName + ": " + errorMsg);
+   }
+   return errorMsg;
+ }
+
+ /**
* @return Instance of load balancer
*/
public LoadBalancer getBalancer() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
index 1f43208..4de9d33 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -46,9 +47,7 @@ import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.*;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.ProcedureDescription;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
@@ -143,8 +142,11 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionTransitionResponse;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.snapshot.ClientSnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
@@ -1240,4 +1242,31 @@ public class MasterRpcServices extends RSRpcServices
throw new ServiceException(ioe);
}
}
+
+ /**
+  * RPC entry point for a region server to report region transitions.
+  * <p>
+  * Only the first transition in the request is processed. A request with
+  * no transition, or whose first transition names no region, is answered
+  * with an error message instead of being processed. Reports for regions
+  * other than meta are refused with a PleaseHoldException until the
+  * assignment manager has finished its failover cleanup.
+  *
+  * @param controller the RPC controller (not used here)
+  * @param req the request carrying the reporting server and its transitions
+  * @return a response whose error message is set when the region state
+  *   could not be updated
+  * @throws ServiceException wrapping any IOException, including the
+  *   PleaseHoldException described above
+  */
+ @Override
+ public ReportRegionTransitionResponse reportRegionTransition(RpcController controller,
+     ReportRegionTransitionRequest req) throws ServiceException {
+   try {
+     master.checkServiceStarted();
+     ReportRegionTransitionResponse.Builder rrtr =
+       ReportRegionTransitionResponse.newBuilder();
+     // Both transition and region_info are repeated fields and may be empty:
+     // check before indexing so a malformed request gets an error message
+     // rather than an unchecked IndexOutOfBoundsException.
+     if (req.getTransitionCount() < 1
+         || req.getTransition(0).getRegionInfoCount() < 1) {
+       rrtr.setErrorMessage("Request has no region transition to process");
+       return rrtr.build();
+     }
+     RegionTransition rt = req.getTransition(0);
+     TableName tableName = ProtobufUtil.toTableName(
+       rt.getRegionInfo(0).getTableName());
+     if (!TableName.META_TABLE_NAME.equals(tableName)
+         && !master.assignmentManager.isFailoverCleanupDone()) {
+       // Meta region is assigned before master finishes the
+       // failover cleanup. So no need this check for it
+       throw new PleaseHoldException("Master is rebuilding user regions");
+     }
+     ServerName sn = ProtobufUtil.toServerName(req.getServer());
+     String error = master.assignmentManager.onRegionTransition(sn, rt);
+     if (error != null) {
+       rrtr.setErrorMessage(error);
+     }
+     return rrtr.build();
+   } catch (IOException ioe) {
+     throw new ServiceException(ioe);
+   }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
new file mode 100644
index 0000000..e0f07ff
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStateStore.java
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConfigUtil;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A helper to persist region state in meta. We may change this class
+ * to StateStore later if we also use it to store other states in meta
+ */
+@InterfaceAudience.Private
+public class RegionStateStore {
+ private static final Log LOG = LogFactory.getLog(RegionStateStore.class);
+
+ private volatile HRegion metaRegion;
+ private volatile HTableInterface metaTable;
+ private volatile boolean initialized;
+
+ private final boolean noPersistence;
+ private final CatalogTracker catalogTracker;
+ private final Server server;
+
+ /**
+  * Reads the server name stored for a region in a catalog table
+  * {@link Result}. The value of the
+  * {@link HConstants#SERVERNAME_QUALIFIER} column in the catalog family is
+  * used when present; otherwise this falls back to
+  * {@link HRegionInfo#getServerName(Result)}.
+  * @param r Result to pull the transitioning server name from
+  * @return the parsed ServerName, or {@link HRegionInfo#getServerName(Result)}
+  *   if the server name column is missing or empty.
+  */
+ static ServerName getRegionServer(final Result r) {
+   final Cell snCell =
+     r.getColumnLatestCell(HConstants.CATALOG_FAMILY, HConstants.SERVERNAME_QUALIFIER);
+   final boolean hasValue = snCell != null && snCell.getValueLength() != 0;
+   if (!hasValue) {
+     return HRegionInfo.getServerName(r);
+   }
+   final String serverNameText = Bytes.toString(
+     snCell.getValueArray(), snCell.getValueOffset(), snCell.getValueLength());
+   return ServerName.parseServerName(serverNameText);
+ }
+
+ /**
+  * Reads the region state stored in a catalog table {@link Result}, taken
+  * from the {@link HConstants#STATE_QUALIFIER} column in the catalog family.
+  * @param r Result to pull the region state from
+  * @return the stored region state, or OPEN if the column is missing or empty.
+  */
+ static State getRegionState(final Result r) {
+   final Cell stateCell =
+     r.getColumnLatestCell(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER);
+   final boolean hasValue = stateCell != null && stateCell.getValueLength() != 0;
+   if (!hasValue) {
+     return State.OPEN;
+   }
+   final String stateName = Bytes.toString(
+     stateCell.getValueArray(), stateCell.getValueOffset(), stateCell.getValueLength());
+   return State.valueOf(stateName);
+ }
+
+ /**
+  * Decides whether a state change should be written to meta. Persisting
+  * every change is generally preferable, except when the region has no
+  * entry in meta at all. The new and previous states are used to tell
+  * whether a user region has such an entry: for example, merged regions
+  * are deleted from meta, and new merging parents or splitting daughters
+  * are not created in meta yet.
+  * <p>
+  * Nothing is persisted for the meta region itself, for a new state of
+  * MERGING_NEW, SPLITTING_NEW or MERGED, or for a change to OFFLINE coming
+  * from one of those three states.
+  */
+ private boolean shouldPersistStateChange(
+     HRegionInfo hri, RegionState state, RegionState oldState) {
+   if (hri.isMetaRegion()) {
+     return false;
+   }
+   if (RegionStates.isOneOfStates(
+       state, State.MERGING_NEW, State.SPLITTING_NEW, State.MERGED)) {
+     return false;
+   }
+   final boolean offlinedWithoutMetaEntry =
+     RegionStates.isOneOfStates(state, State.OFFLINE)
+       && RegionStates.isOneOfStates(oldState, State.MERGING_NEW,
+         State.SPLITTING_NEW, State.MERGED);
+   return !offlinedWithoutMetaEntry;
+ }
+
+ /**
+  * Creates a store bound to the given server. The store is not usable
+  * until {@link #start()} is called.
+  * <p>
+  * Persistence is switched off when ZK is used for assignment and the
+  * {@code hbase.assignment.usezk.migrating} setting is false (its default).
+  * @param server the server supplying the configuration and catalog tracker
+  */
+ RegionStateStore(final Server server) {
+   this.server = server;
+   this.initialized = false;
+   final Configuration conf = server.getConfiguration();
+   // No need to persist if using ZK but not migrating
+   this.noPersistence = ConfigUtil.useZKForAssignment(conf)
+     && !conf.getBoolean("hbase.assignment.usezk.migrating", false);
+   this.catalogTracker = server.getCatalogTracker();
+ }
+
+ /**
+  * Prepares the store for use. When persistence is enabled, the meta
+  * region is taken from this server's online regions if the server is a
+  * region server; if it is not found there, a table handle to meta is
+  * opened instead.
+  * @throws IOException if the meta table handle cannot be created
+  */
+ @SuppressWarnings("deprecation")
+ void start() throws IOException {
+   if (noPersistence) {
+     // Nothing to set up; just mark the store as started
+     initialized = true;
+     return;
+   }
+   if (server instanceof RegionServerServices) {
+     final RegionServerServices rsServices = (RegionServerServices) server;
+     metaRegion = rsServices.getFromOnlineRegions(
+       HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+   }
+   if (metaRegion == null) {
+     metaTable = new HTable(TableName.META_TABLE_NAME,
+       catalogTracker.getConnection());
+   }
+   initialized = true;
+ }
+
+ /**
+  * Marks the store as no longer initialized and closes the meta table
+  * handle, if one is open. A failure to close is logged and otherwise
+  * ignored; the handle is dropped either way.
+  */
+ void stop() {
+   initialized = false;
+   if (metaTable == null) {
+     return;
+   }
+   try {
+     metaTable.close();
+   } catch (IOException closeFailure) {
+     LOG.info("Got exception in closing meta table", closeFailure);
+   } finally {
+     metaTable = null;
+   }
+ }
+
+ /**
+  * Persists a region state change to meta.
+  * <p>
+  * Nothing is written when persistence is disabled, when the store is not
+  * started, or when {@code shouldPersistStateChange} says the change must
+  * not be persisted. Otherwise a Put carrying the new state is built. The
+  * server name is included when it is set and differs from the old one;
+  * the server location, start code and open sequence number are included
+  * when {@code openSeqNum} is non-negative. The Put goes straight to the
+  * meta region when that shortcut is available, and through the meta
+  * table handle otherwise.
+  * <p>
+  * If the store is stopped while an update is in flight, the update is
+  * dropped with a warning. An IOException while writing aborts the server.
+  *
+  * @param openSeqNum the open sequence number; a negative value means
+  *   there is none to record
+  * @param newState the state to persist
+  * @param oldState the previous state, may be null
+  * @throws IllegalArgumentException if {@code openSeqNum} is non-negative
+  *   but the new state is not OPEN or has no server
+  */
+ @SuppressWarnings("deprecation")
+ void updateRegionState(long openSeqNum,
+     RegionState newState, RegionState oldState) {
+   if (noPersistence || !initialized) {
+     return;
+   }
+
+   HRegionInfo hri = newState.getRegion();
+   if (!shouldPersistStateChange(hri, newState, oldState)) {
+     return;
+   }
+
+   ServerName oldServer = oldState != null ? oldState.getServerName() : null;
+   ServerName serverName = newState.getServerName();
+   State state = newState.getState();
+
+   try {
+     Put put = new Put(hri.getRegionName());
+     StringBuilder info = new StringBuilder("Updating row ");
+     info.append(hri.getRegionNameAsString()).append(" with state=").append(state);
+     if (serverName != null && !serverName.equals(oldServer)) {
+       put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVERNAME_QUALIFIER,
+         Bytes.toBytes(serverName.getServerName()));
+       info.append("&sn=").append(serverName);
+     }
+     if (openSeqNum >= 0) {
+       Preconditions.checkArgument(state == State.OPEN
+         && serverName != null, "Open region should be on a server");
+       put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+         Bytes.toBytes(serverName.getHostAndPort()));
+       put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+         Bytes.toBytes(serverName.getStartcode()));
+       put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER,
+         Bytes.toBytes(openSeqNum));
+       info.append("&openSeqNum=").append(openSeqNum);
+       info.append("&server=").append(serverName);
+     }
+     put.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STATE_QUALIFIER,
+       Bytes.toBytes(state.name()));
+     LOG.info(info);
+
+     // Persist the state change to meta
+     if (metaRegion != null) {
+       try {
+         // Assume meta is pinned to master.
+         // At least, that's what we want.
+         metaRegion.put(put);
+         return; // Done here
+       } catch (Throwable t) {
+         // In unit tests, meta could be moved away by intention
+         // So, the shortcut is gone. We won't try to establish the
+         // shortcut any more because we prefer meta to be pinned
+         // to the master
+         synchronized (this) {
+           if (metaRegion != null) {
+             LOG.info("Meta region shortcut failed", t);
+             metaTable = new HTable(TableName.META_TABLE_NAME,
+               catalogTracker.getConnection());
+             metaRegion = null;
+           }
+         }
+       }
+     }
+     // Work on a snapshot of the field: stop() can null it out at any time,
+     // and synchronizing on a null reference throws NullPointerException.
+     HTableInterface table = metaTable;
+     if (table == null) {
+       LOG.warn("Region state store is stopped, not persisting region state " + newState);
+       return;
+     }
+     synchronized (table) {
+       table.put(put);
+     }
+   } catch (IOException ioe) {
+     LOG.error("Failed to persist region state " + newState, ioe);
+     server.abort("Failed to update region location", ioe);
+   }
+ }
+
+ /**
+ * Records a region split by delegating to MetaEditor.splitRegion with
+ * this store's catalog tracker. Unlike updateRegionState, this does not
+ * check whether persistence is enabled or the store is started.
+ * @param p presumably the parent region being split -- confirm against MetaEditor
+ * @param a presumably the first daughter region -- confirm against MetaEditor
+ * @param b presumably the second daughter region -- confirm against MetaEditor
+ * @param sn the server name handed to MetaEditor
+ * @throws IOException if MetaEditor fails
+ */
+ void splitRegion(HRegionInfo p,
+ HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+ MetaEditor.splitRegion(catalogTracker, p, a, b, sn);
+ }
+
+ /**
+ * Records a region merge by delegating to MetaEditor.mergeRegions with
+ * this store's catalog tracker. Unlike updateRegionState, this does not
+ * check whether persistence is enabled or the store is started.
+ * @param p presumably the merged region -- confirm against MetaEditor
+ * @param a presumably the first region being merged -- confirm against MetaEditor
+ * @param b presumably the second region being merged -- confirm against MetaEditor
+ * @param sn the server name handed to MetaEditor
+ * @throws IOException if MetaEditor fails
+ */
+ void mergeRegions(HRegionInfo p,
+ HRegionInfo a, HRegionInfo b, ServerName sn) throws IOException {
+ MetaEditor.mergeRegions(catalogTracker, p, a, b, sn);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/58549428/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
index 5f96a22..39c42b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
@@ -109,6 +110,7 @@ public class RegionStates {
private final HashMap<ServerName, Long> processedServers;
private long lastProcessedServerCleanTime;
+ private final RegionStateStore regionStateStore;
private final ServerManager serverManager;
private final Server server;
@@ -116,7 +118,8 @@ public class RegionStates {
static final String LOG_SPLIT_TIME = "hbase.master.maximum.logsplit.keeptime";
static final long DEFAULT_LOG_SPLIT_TIME = 7200000L; // 2 hours
- RegionStates(final Server master, final ServerManager serverManager) {
+ RegionStates(final Server master,
+ final ServerManager serverManager, final RegionStateStore regionStateStore) {
regionStates = new HashMap<String, RegionState>();
regionsInTransition = new HashMap<String, RegionState>();
serverHoldings = new HashMap<ServerName, Set<HRegionInfo>>();
@@ -124,6 +127,7 @@ public class RegionStates {
lastAssignments = new HashMap<String, ServerName>();
processedServers = new HashMap<ServerName, Long>();
deadServers = new HashMap<String, Long>();
+ this.regionStateStore = regionStateStore;
this.serverManager = serverManager;
this.server = master;
}
@@ -188,7 +192,7 @@ public class RegionStates {
/**
* @return True if specified region is in one of the specified states.
*/
- public synchronized boolean isRegionInState(
+ public boolean isRegionInState(
final HRegionInfo hri, final State... states) {
return isRegionInState(hri.getEncodedName(), states);
}
@@ -196,14 +200,10 @@ public class RegionStates {
/**
* @return True if specified region is in one of the specified states.
*/
- public synchronized boolean isRegionInState(
+ public boolean isRegionInState(
final String encodedName, final State... states) {
RegionState regionState = getRegionState(encodedName);
- State s = regionState != null ? regionState.getState() : null;
- for (State state: states) {
- if (s == state) return true;
- }
- return false;
+ return isOneOfStates(regionState, states);
}
/**
@@ -217,9 +217,8 @@ public class RegionStates {
/**
* Get region transition state
*/
- public synchronized RegionState
- getRegionTransitionState(final HRegionInfo hri) {
- return regionsInTransition.get(hri.getEncodedName());
+ public RegionState getRegionTransitionState(final HRegionInfo hri) {
+ return getRegionTransitionState(hri.getEncodedName());
}
/**
@@ -235,7 +234,7 @@ public class RegionStates {
* and offline, its state will be SPLIT. Otherwise, its state will
* be OFFLINE. Region already in RegionStates will be skipped.
*/
- public synchronized void createRegionStates(
+ public void createRegionStates(
final List<HRegionInfo> hris) {
for (HRegionInfo hri: hris) {
createRegionState(hri);
@@ -248,16 +247,44 @@ public class RegionStates {
* be OFFLINE. If it is already in RegionStates, this call has
* no effect, and the original state is returned.
*/
- public synchronized RegionState createRegionState(final HRegionInfo hri) {
- State newState = (hri.isOffline() && hri.isSplit()) ? State.SPLIT : State.OFFLINE;
+ public RegionState createRegionState(final HRegionInfo hri) {
+ return createRegionState(hri, null, null);
+ }
+
+ /**
+ * Add a region to RegionStates with the specified state.
+ * If the region is already in RegionStates, this call has
+ * no effect, and the original state is returned.
+ */
+ public synchronized RegionState createRegionState(
+ final HRegionInfo hri, State newState, ServerName serverName) {
+ if (newState == null || (newState == State.OPEN && serverName == null)) {
+ newState = State.OFFLINE;
+ }
+ if (hri.isOffline() && hri.isSplit()) {
+ newState = State.SPLIT;
+ serverName = null;
+ }
String encodedName = hri.getEncodedName();
RegionState regionState = regionStates.get(encodedName);
if (regionState != null) {
LOG.warn("Tried to create a state for a region already in RegionStates, "
+ "used existing: " + regionState + ", ignored new: " + newState);
} else {
- regionState = new RegionState(hri, newState);
+ regionState = new RegionState(hri, newState, serverName);
regionStates.put(encodedName, regionState);
+ if (newState == State.OPEN) {
+ regionAssignments.put(hri, serverName);
+ lastAssignments.put(encodedName, serverName);
+ Set<HRegionInfo> regions = serverHoldings.get(serverName);
+ if (regions == null) {
+ regions = new HashSet<HRegionInfo>();
+ serverHoldings.put(serverName, regions);
+ }
+ regions.add(hri);
+ } else if (!regionState.isUnassignable()) {
+ regionsInTransition.put(encodedName, regionState);
+ }
}
return regionState;
}
@@ -265,9 +292,9 @@ public class RegionStates {
/**
* Update a region state. It will be put in transition if not already there.
*/
- public synchronized RegionState updateRegionState(
+ public RegionState updateRegionState(
final HRegionInfo hri, final State state) {
- RegionState regionState = regionStates.get(hri.getEncodedName());
+ RegionState regionState = getRegionState(hri.getEncodedName());
return updateRegionState(hri, state,
regionState == null ? null : regionState.getServerName());
}
@@ -278,7 +305,7 @@ public class RegionStates {
* If we can't find the region info based on the region name in
* the transition, log a warning and return null.
*/
- public synchronized RegionState updateRegionState(
+ public RegionState updateRegionState(
final RegionTransition transition, final State state) {
byte [] regionName = transition.getRegionName();
HRegionInfo regionInfo = getRegionInfo(regionName);
@@ -297,54 +324,14 @@ public class RegionStates {
/**
* Update a region state. It will be put in transition if not already there.
*/
- public synchronized RegionState updateRegionState(
+ public RegionState updateRegionState(
final HRegionInfo hri, final State state, final ServerName serverName) {
- if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
- LOG.warn("Failed to open/close " + hri.getShortNameToLog()
- + " on " + serverName + ", set to " + state);
- }
-
- String encodedName = hri.getEncodedName();
- RegionState regionState = new RegionState(
- hri, state, System.currentTimeMillis(), serverName);
- regionsInTransition.put(encodedName, regionState);
- RegionState oldState = regionStates.put(encodedName, regionState);
- ServerName oldServerName = oldState == null ? null : oldState.getServerName();
- if (oldState == null || oldState.getState() != regionState.getState()
- || (oldServerName == null && serverName != null)
- || (oldServerName != null && !oldServerName.equals(serverName))) {
- LOG.info("Transitioned " + oldState + " to " + regionState);
- }
-
- // For these states, region should be properly closed.
- // There should be no log splitting issue.
- if ((state == State.CLOSED || state == State.MERGED
- || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
- ServerName last = lastAssignments.get(encodedName);
- if (last.equals(serverName)) {
- lastAssignments.remove(encodedName);
- } else {
- LOG.warn(encodedName + " moved to " + state + " on "
- + serverName + ", expected " + last);
- }
- }
-
- // Once a region is opened, record its last assignment right away.
- if (serverName != null && state == State.OPEN) {
- ServerName last = lastAssignments.get(encodedName);
- if (!serverName.equals(last)) {
- lastAssignments.put(encodedName, serverName);
- if (last != null && isServerDeadAndNotProcessed(last)) {
- LOG.warn(encodedName + " moved to " + serverName
- + ", while it's previous host " + last
- + " is dead but not processed yet");
- }
- }
- }
+ return updateRegionState(hri, state, serverName, HConstants.NO_SEQNUM);
+ }
- // notify the change
- this.notifyAll();
- return regionState;
+ public void regionOnline(
+ final HRegionInfo hri, final ServerName serverName) {
+ regionOnline(hri, serverName, HConstants.NO_SEQNUM);
}
/**
@@ -352,8 +339,8 @@ public class RegionStates {
* We can't confirm it is really online on specified region server
* because it hasn't been put in region server's online region list yet.
*/
- public synchronized void regionOnline(
- final HRegionInfo hri, final ServerName serverName) {
+ public void regionOnline(final HRegionInfo hri,
+ final ServerName serverName, long openSeqNum) {
if (!serverManager.isServerOnline(serverName)) {
// This is possible if the region server dies before master gets a
// chance to handle ZK event in time. At this time, if the dead server
@@ -363,30 +350,26 @@ public class RegionStates {
+ " was opened on a dead server: " + serverName);
return;
}
+ updateRegionState(hri, State.OPEN, serverName, openSeqNum);
- String encodedName = hri.getEncodedName();
- RegionState oldState = regionStates.get(encodedName);
- if (oldState == null) {
- LOG.warn("Online region not in RegionStates: " + hri.getShortNameToLog());
- }
- updateRegionState(hri, State.OPEN, serverName);
- regionsInTransition.remove(encodedName);
-
- ServerName oldServerName = regionAssignments.put(hri, serverName);
- if (!serverName.equals(oldServerName)) {
- LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
- Set<HRegionInfo> regions = serverHoldings.get(serverName);
- if (regions == null) {
- regions = new HashSet<HRegionInfo>();
- serverHoldings.put(serverName, regions);
- }
- regions.add(hri);
- if (oldServerName != null) {
- LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
- Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
- oldRegions.remove(hri);
- if (oldRegions.isEmpty()) {
- serverHoldings.remove(oldServerName);
+ synchronized (this) {
+ regionsInTransition.remove(hri.getEncodedName());
+ ServerName oldServerName = regionAssignments.put(hri, serverName);
+ if (!serverName.equals(oldServerName)) {
+ LOG.info("Onlined " + hri.getShortNameToLog() + " on " + serverName);
+ Set<HRegionInfo> regions = serverHoldings.get(serverName);
+ if (regions == null) {
+ regions = new HashSet<HRegionInfo>();
+ serverHoldings.put(serverName, regions);
+ }
+ regions.add(hri);
+ if (oldServerName != null) {
+ LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
+ Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
+ oldRegions.remove(hri);
+ if (oldRegions.isEmpty()) {
+ serverHoldings.remove(oldServerName);
+ }
}
}
}
@@ -405,6 +388,9 @@ public class RegionStates {
}
}
long now = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding to processed servers " + serverName);
+ }
processedServers.put(serverName, Long.valueOf(now));
Configuration conf = server.getConfiguration();
long obsoleteTime = conf.getLong(LOG_SPLIT_TIME, DEFAULT_LOG_SPLIT_TIME);
@@ -416,6 +402,9 @@ public class RegionStates {
= processedServers.entrySet().iterator(); it.hasNext();) {
Map.Entry<ServerName, Long> e = it.next();
if (e.getValue().longValue() < cutoff) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed from processed servers " + e.getKey());
+ }
it.remove();
}
}
@@ -425,7 +414,7 @@ public class RegionStates {
/**
* Log split is done for a given region, so it is assignable now.
*/
- public synchronized void logSplit(final HRegionInfo region) {
+ public void logSplit(final HRegionInfo region) {
clearLastAssignment(region);
}
@@ -445,24 +434,30 @@ public class RegionStates {
* should be the specified expected state, which can only be
* Split/Merged/Offline/null(=Offline)/SplittingNew/MergingNew.
*/
- public synchronized void regionOffline(
+ public void regionOffline(
final HRegionInfo hri, final State expectedState) {
Preconditions.checkArgument(expectedState == null
|| RegionState.isUnassignable(expectedState),
"Offlined region should not be " + expectedState);
- String encodedName = hri.getEncodedName();
+ if (isRegionInState(hri, State.SPLITTING_NEW, State.MERGING_NEW)) {
+ // Remove it from all region maps
+ deleteRegion(hri);
+ return;
+ }
State newState =
expectedState == null ? State.OFFLINE : expectedState;
updateRegionState(hri, newState);
- regionsInTransition.remove(encodedName);
- ServerName oldServerName = regionAssignments.remove(hri);
- if (oldServerName != null) {
- LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
- Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
- oldRegions.remove(hri);
- if (oldRegions.isEmpty()) {
- serverHoldings.remove(oldServerName);
+ synchronized (this) {
+ regionsInTransition.remove(hri.getEncodedName());
+ ServerName oldServerName = regionAssignments.remove(hri);
+ if (oldServerName != null && serverHoldings.containsKey(oldServerName)) {
+ LOG.info("Offlined " + hri.getShortNameToLog() + " from " + oldServerName);
+ Set<HRegionInfo> oldRegions = serverHoldings.get(oldServerName);
+ oldRegions.remove(hri);
+ if (oldRegions.isEmpty()) {
+ serverHoldings.remove(oldServerName);
+ }
}
}
}
@@ -580,12 +575,14 @@ public class RegionStates {
* A table is deleted. Remove its regions from all internal maps.
* We loop through all regions assuming we don't delete tables too much.
*/
- public synchronized void tableDeleted(final TableName tableName) {
+ public void tableDeleted(final TableName tableName) {
Set<HRegionInfo> regionsToDelete = new HashSet<HRegionInfo>();
- for (RegionState state: regionStates.values()) {
- HRegionInfo region = state.getRegion();
- if (region.getTable().equals(tableName)) {
- regionsToDelete.add(region);
+ synchronized (this) {
+ for (RegionState state: regionStates.values()) {
+ HRegionInfo region = state.getRegion();
+ if (region.getTable().equals(tableName)) {
+ regionsToDelete.add(region);
+ }
}
}
for (HRegionInfo region: regionsToDelete) {
@@ -654,6 +651,22 @@ public class RegionStates {
lastAssignments.put(encodedName, serverName);
}
+ /**
+  * Move every user region to State.CLOSED. Regions of the meta table,
+  * regions whose HRegionInfo reports isSplit(), and regions belonging to
+  * one of the excluded tables are left alone.
+  * <p>
+  * Only the scan of regionStates runs under this object's monitor. The
+  * per-region updateRegionState calls are made after the monitor is
+  * released, because each of them goes through regionStateStore and
+  * takes the monitor itself for the in-memory part. This mirrors how
+  * tableDeleted collects its regions under the lock and acts on them
+  * outside it.
+  *
+  * @param excludedTables tables whose regions must not be closed;
+  *   null or empty means no table is excluded
+  */
+ void closeAllUserRegions(Set<TableName> excludedTables) {
+   boolean noExcludeTables = excludedTables == null || excludedTables.isEmpty();
+   final Set<HRegionInfo> toBeClosed;
+   synchronized (this) {
+     // Snapshot the candidates so the map is not iterated while
+     // updateRegionState replaces entries in it.
+     toBeClosed = new HashSet<HRegionInfo>(regionStates.size());
+     for (RegionState state: regionStates.values()) {
+       HRegionInfo hri = state.getRegion();
+       TableName tableName = hri.getTable();
+       if (!TableName.META_TABLE_NAME.equals(tableName) && !hri.isSplit()
+           && (noExcludeTables || !excludedTables.contains(tableName))) {
+         toBeClosed.add(hri);
+       }
+     }
+   }
+   // Persisting the new state must not happen while holding the monitor.
+   for (HRegionInfo hri: toBeClosed) {
+     updateRegionState(hri, State.CLOSED);
+   }
+ }
+
/**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
@@ -739,8 +752,8 @@ public class RegionStates {
return result;
}
- protected synchronized RegionState getRegionState(final HRegionInfo hri) {
- return regionStates.get(hri.getEncodedName());
+ protected RegionState getRegionState(final HRegionInfo hri) {
+ return getRegionState(hri.getEncodedName());
}
protected synchronized RegionState getRegionState(final String encodedName) {
@@ -754,7 +767,7 @@ public class RegionStates {
*/
protected HRegionInfo getRegionInfo(final byte [] regionName) {
String encodedName = HRegionInfo.encodeRegionName(regionName);
- RegionState regionState = regionStates.get(encodedName);
+ RegionState regionState = getRegionState(encodedName);
if (regionState != null) {
return regionState.getRegion();
}
@@ -774,10 +787,74 @@ public class RegionStates {
}
}
+ /**
+  * Tell whether the state carried by regionState is among the given states.
+  * A null regionState is treated as carrying a null state, so it matches
+  * only when null itself is one of the given states.
+  */
+ static boolean isOneOfStates(RegionState regionState, State... states) {
+   State current = null;
+   if (regionState != null) {
+     current = regionState.getState();
+   }
+   for (int i = 0; i < states.length; i++) {
+     if (states[i] == current) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
+ * Update a region state. It will be put in transition if not already there.
+ * <p>
+ * A new RegionState stamped with the current time is built first. If it
+ * is not equal to the state currently recorded for the region, it is
+ * handed to regionStateStore before any in-memory map is touched. The
+ * in-memory maps are then updated, and waiters on this object notified,
+ * under this object's monitor.
+ *
+ * @param hri region whose state is changing
+ * @param state the new state
+ * @param serverName server associated with the new state, may be null
+ * @param openSeqNum passed through to regionStateStore; the overloads in
+ * this class that have no sequence number pass HConstants.NO_SEQNUM
+ * @return the newly created RegionState
+ */
+ private RegionState updateRegionState(final HRegionInfo hri,
+ final State state, final ServerName serverName, long openSeqNum) {
+ if (state == State.FAILED_CLOSE || state == State.FAILED_OPEN) {
+ LOG.warn("Failed to open/close " + hri.getShortNameToLog()
+ + " on " + serverName + ", set to " + state);
+ }
+
+ String encodedName = hri.getEncodedName();
+ RegionState regionState = new RegionState(
+ hri, state, System.currentTimeMillis(), serverName);
+ // NOTE(review): oldState is read here, before the synchronized block
+ // below, so it may be stale by the time the maps are updated -- confirm
+ // that callers serialize updates for a given region.
+ RegionState oldState = getRegionState(encodedName);
+ if (!regionState.equals(oldState)) {
+ LOG.info("Transition " + oldState + " to " + regionState);
+ // Persist region state before updating in-memory info, if needed
+ regionStateStore.updateRegionState(openSeqNum, regionState, oldState);
+ }
+
+ synchronized (this) {
+ // Every update is recorded as in transition here; regionOnline and
+ // regionOffline remove the entry again after calling this method.
+ regionsInTransition.put(encodedName, regionState);
+ regionStates.put(encodedName, regionState);
+
+ // For these states, region should be properly closed.
+ // There should be no log splitting issue.
+ if ((state == State.CLOSED || state == State.MERGED
+ || state == State.SPLIT) && lastAssignments.containsKey(encodedName)) {
+ ServerName last = lastAssignments.get(encodedName);
+ // Forget the last assignment only when the report comes from that
+ // same server; otherwise keep it and just log the mismatch.
+ if (last.equals(serverName)) {
+ lastAssignments.remove(encodedName);
+ } else {
+ LOG.warn(encodedName + " moved to " + state + " on "
+ + serverName + ", expected " + last);
+ }
+ }
+
+ // Once a region is opened, record its last assignment right away.
+ if (serverName != null && state == State.OPEN) {
+ ServerName last = lastAssignments.get(encodedName);
+ if (!serverName.equals(last)) {
+ lastAssignments.put(encodedName, serverName);
+ if (last != null && isServerDeadAndNotProcessed(last)) {
+ LOG.warn(encodedName + " moved to " + serverName
+ + ", while it's previous host " + last
+ + " is dead but not processed yet");
+ }
+ }
+ }
+
+ // notify the change
+ this.notifyAll();
+ }
+ return regionState;
+ }
+
/**
* Remove a region from all state maps.
*/
- private void deleteRegion(final HRegionInfo hri) {
+ private synchronized void deleteRegion(final HRegionInfo hri) {
String encodedName = hri.getEncodedName();
regionsInTransition.remove(encodedName);
regionStates.remove(encodedName);