You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2014/08/07 01:22:51 UTC
[07/10] HBASE-11611 Clean up ZK-based region assignment
http://git-wip-us.apache.org/repos/asf/hbase/blob/17dff681/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 4803227..c234767 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -36,7 +35,6 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -55,24 +53,16 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.TableStateManager;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
-import org.apache.hadoop.hbase.coordination.OpenRegionCoordination;
-import org.apache.hadoop.hbase.coordination.RegionMergeCoordination;
-import org.apache.hadoop.hbase.coordination.SplitTransactionCoordination.SplitTransactionDetails;
-import org.apache.hadoop.hbase.coordination.ZkOpenRegionCoordination;
-import org.apache.hadoop.hbase.coordination.ZkRegionMergeCoordination;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
@@ -82,10 +72,8 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.RegionState.State;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeAssignmentHelper;
import org.apache.hadoop.hbase.master.balancer.FavoredNodeLoadBalancer;
-import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
-import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
@@ -94,42 +82,25 @@ import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
-import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.util.Triple;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.ipc.RemoteException;
-import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NoNodeException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.data.Stat;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.LinkedHashMultimap;
/**
* Manages and performs region assignment.
- * <p>
- * Monitors ZooKeeper for events related to regions in transition.
- * <p>
- * Handles existing regions in transition during master failover.
+ * Related communications with regionserver are all done over RPC.
*/
@InterfaceAudience.Private
-public class AssignmentManager extends ZooKeeperListener {
+public class AssignmentManager {
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
- public static final ServerName HBCK_CODE_SERVERNAME = ServerName.valueOf(HConstants.HBCK_CODE_NAME,
- -1, -1L);
-
static final String ALREADY_IN_TRANSITION_WAITTIME
= "hbase.assignment.already.intransition.waittime";
static final int DEFAULT_ALREADY_IN_TRANSITION_WAITTIME = 60000; // 1 minute
@@ -187,21 +158,9 @@ public class AssignmentManager extends ZooKeeperListener {
private final ExecutorService executorService;
- // For unit tests, keep track of calls to ClosedRegionHandler
- private Map<HRegionInfo, AtomicBoolean> closedRegionHandlerCalled = null;
-
- // For unit tests, keep track of calls to OpenedRegionHandler
- private Map<HRegionInfo, AtomicBoolean> openedRegionHandlerCalled = null;
-
- //Thread pool executor service for timeout monitor
+ // Thread pool executor service. TODO, consolidate with executorService?
private java.util.concurrent.ExecutorService threadPoolExecutorService;
- // A bunch of ZK events workers. Each is a single thread executor service
- private final java.util.concurrent.ExecutorService zkEventWorkers;
-
- private List<EventType> ignoreStatesRSOffline = Arrays.asList(
- EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED);
-
private final RegionStates regionStates;
// The threshold to use bulk assigning. Using bulk assignment
@@ -236,9 +195,6 @@ public class AssignmentManager extends ZooKeeperListener {
private final ConcurrentHashMap<String, AtomicInteger>
failedOpenTracker = new ConcurrentHashMap<String, AtomicInteger>();
- // A flag to indicate if we are using ZK for region assignment
- private final boolean useZKForAssignment;
-
// In case not using ZK for region assignment, region states
// are persisted in meta with a state store
private final RegionStateStore regionStateStore;
@@ -261,15 +217,14 @@ public class AssignmentManager extends ZooKeeperListener {
* @param service Executor service
* @param metricsMaster metrics manager
* @param tableLockManager TableLock manager
- * @throws KeeperException
+ * @throws CoordinatedStateException
* @throws IOException
*/
public AssignmentManager(Server server, ServerManager serverManager,
final LoadBalancer balancer,
final ExecutorService service, MetricsMaster metricsMaster,
- final TableLockManager tableLockManager) throws KeeperException,
- IOException, CoordinatedStateException {
- super(server.getZooKeeper());
+ final TableLockManager tableLockManager)
+ throws IOException, CoordinatedStateException {
this.server = server;
this.serverManager = serverManager;
this.executorService = service;
@@ -307,14 +262,8 @@ public class AssignmentManager extends ZooKeeperListener {
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
- int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
- ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
- zkEventWorkers = Threads.getBoundedCachedThreadPool(workers, 60L,
- TimeUnit.SECONDS, threadFactory);
- this.tableLockManager = tableLockManager;
-
this.metricsAssignmentManager = new MetricsAssignmentManager();
- useZKForAssignment = ConfigUtil.useZKForAssignment(conf);
+ this.tableLockManager = tableLockManager;
}
/**
@@ -406,9 +355,9 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public Pair<Integer, Integer> getReopenStatus(TableName tableName)
throws IOException {
- List <HRegionInfo> hris =
- MetaTableAccessor.getTableRegions(this.watcher, this.server.getShortCircuitConnection(),
- tableName, true);
+ List <HRegionInfo> hris = MetaTableAccessor.getTableRegions(
+ this.server.getZooKeeper(), this.server.getShortCircuitConnection(),
+ tableName, true);
Integer pending = 0;
for (HRegionInfo hri : hris) {
String name = hri.getEncodedName();
@@ -476,10 +425,6 @@ public class AssignmentManager extends ZooKeeperListener {
// previous master process.
boolean failover = processDeadServersAndRegionsInTransition(deadServers);
- if (!useZKForAssignment) {
- // Not use ZK for assignment any more, remove the ZNode
- ZKUtil.deleteNodeRecursively(watcher, watcher.assignmentZNode);
- }
recoverTableInDisablingState();
recoverTableInEnablingState();
LOG.info("Joined the cluster in " + (System.currentTimeMillis()
@@ -493,22 +438,12 @@ public class AssignmentManager extends ZooKeeperListener {
* startup, will assign all user regions.
* @param deadServers
* Map of dead servers and their regions. Can be null.
- * @throws KeeperException
* @throws IOException
* @throws InterruptedException
+ * @throws CoordinatedStateException
*/
- boolean processDeadServersAndRegionsInTransition(
- final Set<ServerName> deadServers) throws KeeperException,
- IOException, InterruptedException, CoordinatedStateException {
- List<String> nodes = ZKUtil.listChildrenNoWatch(watcher,
- watcher.assignmentZNode);
-
- if (useZKForAssignment && nodes == null) {
- String errorMessage = "Failed to get the children from ZK";
- server.abort(errorMessage, new IOException(errorMessage));
- return true; // Doesn't matter in this case
- }
-
+ boolean processDeadServersAndRegionsInTransition(final Set<ServerName> deadServers)
+ throws IOException, InterruptedException, CoordinatedStateException {
boolean failover = !serverManager.getDeadServers().isEmpty();
if (failover) {
// This may not be a failover actually, especially if meta is on this master.
@@ -517,36 +452,28 @@ public class AssignmentManager extends ZooKeeperListener {
}
} else {
// If any one region except meta is assigned, it's a failover.
- for (HRegionInfo hri: regionStates.getRegionAssignments().keySet()) {
- if (!hri.isMetaTable()) {
+ Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
+ for (Map.Entry<HRegionInfo, ServerName> en:
+ regionStates.getRegionAssignments().entrySet()) {
+ HRegionInfo hri = en.getKey();
+ if (!hri.isMetaTable()
+ && onlineServers.contains(en.getValue())) {
LOG.debug("Found " + hri + " out on cluster");
failover = true;
break;
}
}
- }
- if (!failover && nodes != null) {
- // If any one region except meta is in transition, it's a failover.
- for (String encodedName: nodes) {
- RegionState regionState = regionStates.getRegionState(encodedName);
- if (regionState != null && !regionState.getRegion().isMetaRegion()) {
- LOG.debug("Found " + regionState + " in RITs");
- failover = true;
- break;
- }
- }
- }
- if (!failover && !useZKForAssignment) {
- // If any region except meta is in transition on a live server, it's a failover.
- Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
- if (!regionsInTransition.isEmpty()) {
- Set<ServerName> onlineServers = serverManager.getOnlineServers().keySet();
- for (RegionState regionState: regionsInTransition.values()) {
- if (!regionState.getRegion().isMetaRegion()
- && onlineServers.contains(regionState.getServerName())) {
- LOG.debug("Found " + regionState + " in RITs");
- failover = true;
- break;
+ if (!failover) {
+ // If any region except meta is in transition on a live server, it's a failover.
+ Map<String, RegionState> regionsInTransition = regionStates.getRegionsInTransition();
+ if (!regionsInTransition.isEmpty()) {
+ for (RegionState regionState: regionsInTransition.values()) {
+ if (!regionState.getRegion().isMetaRegion()
+ && onlineServers.contains(regionState.getServerName())) {
+ LOG.debug("Found " + regionState + " in RITs");
+ failover = true;
+ break;
+ }
}
}
}
@@ -596,19 +523,8 @@ public class AssignmentManager extends ZooKeeperListener {
// Now region states are restored
regionStateStore.start();
- // If we found user regions out on cluster, its a failover.
if (failover) {
- LOG.info("Found regions out on cluster or in RIT; presuming failover");
- // Process list of dead servers and regions in RIT.
- // See HBASE-4580 for more information.
- processDeadServersAndRecoverLostRegions(deadServers);
- }
-
- if (!failover && useZKForAssignment) {
- // Cleanup any existing ZK nodes and start watching
- ZKAssign.deleteAllNodes(watcher);
- ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
- this.watcher.assignmentZNode);
+ processDeadServers(deadServers);
}
// Now we can safely claim failover cleanup completed and enable
@@ -632,254 +548,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
- * If region is up in zk in transition, then do fixup and block and wait until
- * the region is assigned and out of transition. Used on startup for
- * catalog regions.
- * @param hri Region to look for.
- * @return True if we processed a region in transition else false if region
- * was not up in zk in transition.
- * @throws InterruptedException
- * @throws KeeperException
- * @throws IOException
- */
- boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
- throws InterruptedException, KeeperException, IOException {
- String encodedRegionName = hri.getEncodedName();
- if (!processRegionInTransition(encodedRegionName, hri)) {
- return false; // The region is not in transition
- }
- LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedRegionName));
- while (!this.server.isStopped() &&
- this.regionStates.isRegionInTransition(encodedRegionName)) {
- RegionState state = this.regionStates.getRegionTransitionState(encodedRegionName);
- if (state == null || !serverManager.isServerOnline(state.getServerName())) {
- // The region is not in transition, or not in transition on an online
- // server. Doesn't help to block here any more. Caller need to
- // verify the region is actually assigned.
- break;
- }
- this.regionStates.waitForUpdate(100);
- }
- return true;
- }
-
- /**
- * Process failover of new master for region <code>encodedRegionName</code>
- * up in zookeeper.
- * @param encodedRegionName Region to process failover for.
- * @param regionInfo If null we'll go get it from meta table.
- * @return True if we processed <code>regionInfo</code> as a RIT.
- * @throws KeeperException
- * @throws IOException
- */
- boolean processRegionInTransition(final String encodedRegionName,
- final HRegionInfo regionInfo) throws KeeperException, IOException {
- // We need a lock here to ensure that we will not put the same region twice
- // It has no reason to be a lock shared with the other operations.
- // We can do the lock on the region only, instead of a global lock: what we want to ensure
- // is that we don't have two threads working on the same region.
- Lock lock = locker.acquireLock(encodedRegionName);
- try {
- Stat stat = new Stat();
- byte [] data = ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
- if (data == null) return false;
- RegionTransition rt;
- try {
- rt = RegionTransition.parseFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse znode data", e);
- return false;
- }
- HRegionInfo hri = regionInfo;
- if (hri == null) {
- // The region info is not passed in. We will try to find the region
- // from region states map/meta based on the encoded region name. But we
- // may not be able to find it. This is valid for online merge that
- // the region may have not been created if the merge is not completed.
- // Therefore, it is not in meta at master recovery time.
- hri = regionStates.getRegionInfo(rt.getRegionName());
- EventType et = rt.getEventType();
- if (hri == null && et != EventType.RS_ZK_REGION_MERGING
- && et != EventType.RS_ZK_REQUEST_REGION_MERGE) {
- LOG.warn("Couldn't find the region in recovering " + rt);
- return false;
- }
- }
-
- // TODO: This code is tied to ZK anyway, so for now leaving it as is,
- // will refactor when whole region assignment will be abstracted from ZK
- BaseCoordinatedStateManager cp =
- (BaseCoordinatedStateManager) this.server.getCoordinatedStateManager();
- OpenRegionCoordination openRegionCoordination = cp.getOpenRegionCoordination();
-
- ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
- new ZkOpenRegionCoordination.ZkOpenRegionDetails();
- zkOrd.setVersion(stat.getVersion());
- zkOrd.setServerName(cp.getServer().getServerName());
-
- return processRegionsInTransition(
- rt, hri, openRegionCoordination, zkOrd);
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * This call is invoked only (1) master assign meta;
- * (2) during failover mode startup, zk assignment node processing.
- * The locker is set in the caller. It returns true if the region
- * is in transition for sure, false otherwise.
- *
- * It should be private but it is used by some test too.
- */
- boolean processRegionsInTransition(
- final RegionTransition rt, final HRegionInfo regionInfo,
- OpenRegionCoordination coordination,
- final OpenRegionCoordination.OpenRegionDetails ord) throws KeeperException {
- EventType et = rt.getEventType();
- // Get ServerName. Could not be null.
- final ServerName sn = rt.getServerName();
- final byte[] regionName = rt.getRegionName();
- final String encodedName = HRegionInfo.encodeRegionName(regionName);
- final String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
- LOG.info("Processing " + prettyPrintedRegionName + " in state: " + et);
-
- if (regionStates.isRegionInTransition(encodedName)
- && (regionInfo.isMetaRegion() || !useZKForAssignment)) {
- LOG.info("Processed region " + prettyPrintedRegionName + " in state: "
- + et + ", does nothing since the region is already in transition "
- + regionStates.getRegionTransitionState(encodedName));
- // Just return
- return true;
- }
- if (!serverManager.isServerOnline(sn)) {
- // It was transitioning on a dead server, so it's closed now.
- // Force to OFFLINE and put it in transition, but not assign it
- // since log splitting for the dead server is not done yet.
- LOG.debug("RIT " + encodedName + " in state=" + rt.getEventType() +
- " was on deadserver; forcing offline");
- if (regionStates.isRegionOnline(regionInfo)) {
- // Meta could still show the region is assigned to the previous
- // server. If that server is online, when we reload the meta, the
- // region is put back to online, we need to offline it.
- regionStates.regionOffline(regionInfo);
- sendRegionClosedNotification(regionInfo);
- }
- // Put it back in transition so that SSH can re-assign it
- regionStates.updateRegionState(regionInfo, State.OFFLINE, sn);
-
- if (regionInfo.isMetaRegion()) {
- // If it's meta region, reset the meta location.
- // So that master knows the right meta region server.
- MetaTableLocator.setMetaLocation(watcher, sn);
- } else {
- // No matter the previous server is online or offline,
- // we need to reset the last region server of the region.
- regionStates.setLastRegionServerOfRegion(sn, encodedName);
- // Make sure we know the server is dead.
- if (!serverManager.isServerDead(sn)) {
- serverManager.expireServer(sn);
- }
- }
- return false;
- }
- switch (et) {
- case M_ZK_REGION_CLOSING:
- // Insert into RIT & resend the query to the region server: may be the previous master
- // died before sending the query the first time.
- final RegionState rsClosing = regionStates.updateRegionState(rt, State.CLOSING);
- this.executorService.submit(
- new EventHandler(server, EventType.M_MASTER_RECOVERY) {
- @Override
- public void process() throws IOException {
- ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
- try {
- final int expectedVersion = ((ZkOpenRegionCoordination.ZkOpenRegionDetails) ord)
- .getVersion();
- unassign(regionInfo, rsClosing, expectedVersion, null, useZKForAssignment, null);
- if (regionStates.isRegionOffline(regionInfo)) {
- assign(regionInfo, true);
- }
- } finally {
- lock.unlock();
- }
- }
- });
- break;
-
- case RS_ZK_REGION_CLOSED:
- case RS_ZK_REGION_FAILED_OPEN:
- // Region is closed, insert into RIT and handle it
- regionStates.updateRegionState(regionInfo, State.CLOSED, sn);
- if (!replicasToClose.contains(regionInfo)) {
- invokeAssign(regionInfo);
- } else {
- offlineDisabledRegion(regionInfo);
- }
- break;
-
- case M_ZK_REGION_OFFLINE:
- // Insert in RIT and resend to the regionserver
- regionStates.updateRegionState(rt, State.PENDING_OPEN);
- final RegionState rsOffline = regionStates.getRegionState(regionInfo);
- this.executorService.submit(
- new EventHandler(server, EventType.M_MASTER_RECOVERY) {
- @Override
- public void process() throws IOException {
- ReentrantLock lock = locker.acquireLock(regionInfo.getEncodedName());
- try {
- RegionPlan plan = new RegionPlan(regionInfo, null, sn);
- addPlan(encodedName, plan);
- assign(rsOffline, false, false);
- } finally {
- lock.unlock();
- }
- }
- });
- break;
-
- case RS_ZK_REGION_OPENING:
- regionStates.updateRegionState(rt, State.OPENING);
- break;
-
- case RS_ZK_REGION_OPENED:
- // Region is opened, insert into RIT and handle it
- // This could be done asynchronously, we would need then to acquire the lock in the
- // handler.
- regionStates.updateRegionState(rt, State.OPEN);
- new OpenedRegionHandler(server, this, regionInfo, coordination, ord).process();
- break;
- case RS_ZK_REQUEST_REGION_SPLIT:
- case RS_ZK_REGION_SPLITTING:
- case RS_ZK_REGION_SPLIT:
- // Splitting region should be online. We could have skipped it during
- // user region rebuilding since we may consider the split is completed.
- // Put it in SPLITTING state to avoid complications.
- regionStates.regionOnline(regionInfo, sn);
- regionStates.updateRegionState(rt, State.SPLITTING);
- if (!handleRegionSplitting(
- rt, encodedName, prettyPrintedRegionName, sn)) {
- deleteSplittingNode(encodedName, sn);
- }
- break;
- case RS_ZK_REQUEST_REGION_MERGE:
- case RS_ZK_REGION_MERGING:
- case RS_ZK_REGION_MERGED:
- if (!handleRegionMerging(
- rt, encodedName, prettyPrintedRegionName, sn)) {
- deleteMergingNode(encodedName, sn);
- }
- break;
- default:
- throw new IllegalStateException("Received region in state:" + et + " is not valid.");
- }
- LOG.info("Processed region " + prettyPrintedRegionName + " in state "
- + et + ", on " + (serverManager.isServerOnline(sn) ? "" : "dead ")
- + "server: " + sn);
- return true;
- }
-
- /**
* When a region is closed, it should be removed from the regionsToReopen
* @param hri HRegionInfo of the region which was closed
*/
@@ -889,247 +557,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
- /**
- * Handles various states an unassigned node can be in.
- * <p>
- * Method is called when a state change is suspected for an unassigned node.
- * <p>
- * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
- * yet).
- * @param rt region transition
- * @param coordination coordination for opening region
- * @param ord details about opening region
- */
- void handleRegion(final RegionTransition rt, OpenRegionCoordination coordination,
- OpenRegionCoordination.OpenRegionDetails ord) {
- if (rt == null) {
- LOG.warn("Unexpected NULL input for RegionTransition rt");
- return;
- }
- final ServerName sn = rt.getServerName();
- // Check if this is a special HBCK transition
- if (sn.equals(HBCK_CODE_SERVERNAME)) {
- handleHBCK(rt);
- return;
- }
- final long createTime = rt.getCreateTime();
- final byte[] regionName = rt.getRegionName();
- String encodedName = HRegionInfo.encodeRegionName(regionName);
- String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
- // Verify this is a known server
- if (!serverManager.isServerOnline(sn)
- && !ignoreStatesRSOffline.contains(rt.getEventType())) {
- LOG.warn("Attempted to handle region transition for server but " +
- "it is not online: " + prettyPrintedRegionName + ", " + rt);
- return;
- }
-
- RegionState regionState =
- regionStates.getRegionState(encodedName);
- long startTime = System.currentTimeMillis();
- if (LOG.isDebugEnabled()) {
- boolean lateEvent = createTime < (startTime - 15000);
- LOG.debug("Handling " + rt.getEventType() +
- ", server=" + sn + ", region=" +
- (prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
- (lateEvent ? ", which is more than 15 seconds late" : "") +
- ", current_state=" + regionState);
- }
- // We don't do anything for this event,
- // so separate it out, no need to lock/unlock anything
- if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
- return;
- }
-
- // We need a lock on the region as we could update it
- Lock lock = locker.acquireLock(encodedName);
- try {
- RegionState latestState =
- regionStates.getRegionState(encodedName);
- if ((regionState == null && latestState != null)
- || (regionState != null && latestState == null)
- || (regionState != null && latestState != null
- && latestState.getState() != regionState.getState())) {
- LOG.warn("Region state changed from " + regionState + " to "
- + latestState + ", while acquiring lock");
- }
- long waitedTime = System.currentTimeMillis() - startTime;
- if (waitedTime > 5000) {
- LOG.warn("Took " + waitedTime + "ms to acquire the lock");
- }
- regionState = latestState;
- switch (rt.getEventType()) {
- case RS_ZK_REQUEST_REGION_SPLIT:
- case RS_ZK_REGION_SPLITTING:
- case RS_ZK_REGION_SPLIT:
- if (!handleRegionSplitting(
- rt, encodedName, prettyPrintedRegionName, sn)) {
- deleteSplittingNode(encodedName, sn);
- }
- break;
-
- case RS_ZK_REQUEST_REGION_MERGE:
- case RS_ZK_REGION_MERGING:
- case RS_ZK_REGION_MERGED:
- // Merged region is a new region, we can't find it in the region states now.
- // However, the two merging regions are not new. They should be in state for merging.
- if (!handleRegionMerging(
- rt, encodedName, prettyPrintedRegionName, sn)) {
- deleteMergingNode(encodedName, sn);
- }
- break;
-
- case M_ZK_REGION_CLOSING:
- // Should see CLOSING after we have asked it to CLOSE or additional
- // times after already being in state of CLOSING
- if (regionState == null
- || !regionState.isPendingCloseOrClosingOnServer(sn)) {
- LOG.warn("Received CLOSING for " + prettyPrintedRegionName
- + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
- + regionStates.getRegionState(encodedName));
- return;
- }
- // Transition to CLOSING (or update stamp if already CLOSING)
- regionStates.updateRegionState(rt, State.CLOSING);
- break;
-
- case RS_ZK_REGION_CLOSED:
- // Should see CLOSED after CLOSING but possible after PENDING_CLOSE
- if (regionState == null
- || !regionState.isPendingCloseOrClosingOnServer(sn)) {
- LOG.warn("Received CLOSED for " + prettyPrintedRegionName
- + " from " + sn + " but the region isn't PENDING_CLOSE/CLOSING here: "
- + regionStates.getRegionState(encodedName));
- return;
- }
- // Handle CLOSED by assigning elsewhere or stopping if a disable
- // If we got here all is good. Need to update RegionState -- else
- // what follows will fail because not in expected state.
- new ClosedRegionHandler(server, this, regionState.getRegion()).process();
- updateClosedRegionHandlerTracker(regionState.getRegion());
- break;
-
- case RS_ZK_REGION_FAILED_OPEN:
- if (regionState == null
- || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
- LOG.warn("Received FAILED_OPEN for " + prettyPrintedRegionName
- + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
- + regionStates.getRegionState(encodedName));
- return;
- }
- AtomicInteger failedOpenCount = failedOpenTracker.get(encodedName);
- if (failedOpenCount == null) {
- failedOpenCount = new AtomicInteger();
- // No need to use putIfAbsent, or extra synchronization since
- // this whole handleRegion block is locked on the encoded region
- // name, and failedOpenTracker is updated only in this block
- failedOpenTracker.put(encodedName, failedOpenCount);
- }
- if (failedOpenCount.incrementAndGet() >= maximumAttempts) {
- regionStates.updateRegionState(rt, State.FAILED_OPEN);
- // remove the tracking info to save memory, also reset
- // the count for next open initiative
- failedOpenTracker.remove(encodedName);
- } else {
- // Handle this the same as if it were opened and then closed.
- regionState = regionStates.updateRegionState(rt, State.CLOSED);
- if (regionState != null) {
- // When there are more than one region server a new RS is selected as the
- // destination and the same is updated in the regionplan. (HBASE-5546)
- try {
- getRegionPlan(regionState.getRegion(), sn, true);
- new ClosedRegionHandler(server, this, regionState.getRegion()).process();
- } catch (HBaseIOException e) {
- LOG.warn("Failed to get region plan", e);
- }
- }
- }
- break;
-
- case RS_ZK_REGION_OPENING:
- // Should see OPENING after we have asked it to OPEN or additional
- // times after already being in state of OPENING
- if (regionState == null
- || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
- LOG.warn("Received OPENING for " + prettyPrintedRegionName
- + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
- + regionStates.getRegionState(encodedName));
- return;
- }
- // Transition to OPENING (or update stamp if already OPENING)
- regionStates.updateRegionState(rt, State.OPENING);
- break;
-
- case RS_ZK_REGION_OPENED:
- // Should see OPENED after OPENING but possible after PENDING_OPEN.
- if (regionState == null
- || !regionState.isPendingOpenOrOpeningOnServer(sn)) {
- LOG.warn("Received OPENED for " + prettyPrintedRegionName
- + " from " + sn + " but the region isn't PENDING_OPEN/OPENING here: "
- + regionStates.getRegionState(encodedName));
-
- if (regionState != null) {
- // Close it without updating the internal region states,
- // so as not to create double assignments in unlucky scenarios
- // mentioned in OpenRegionHandler#process
- unassign(regionState.getRegion(), null, -1, null, false, sn);
- }
- return;
- }
- // Handle OPENED by removing from transition and deleted zk node
- regionState = regionStates.updateRegionState(rt, State.OPEN);
- if (regionState != null) {
- failedOpenTracker.remove(encodedName); // reset the count, if any
- new OpenedRegionHandler(
- server, this, regionState.getRegion(), coordination, ord).process();
- updateOpenedRegionHandlerTracker(regionState.getRegion());
- }
- break;
-
- default:
- throw new IllegalStateException("Received event is not valid.");
- }
- } finally {
- lock.unlock();
- }
- }
-
- //For unit tests only
- boolean wasClosedHandlerCalled(HRegionInfo hri) {
- AtomicBoolean b = closedRegionHandlerCalled.get(hri);
- //compareAndSet to be sure that unit tests don't see stale values. Means,
- //we will return true exactly once unless the handler code resets to true
- //this value.
- return b == null ? false : b.compareAndSet(true, false);
- }
-
- //For unit tests only
- boolean wasOpenedHandlerCalled(HRegionInfo hri) {
- AtomicBoolean b = openedRegionHandlerCalled.get(hri);
- //compareAndSet to be sure that unit tests don't see stale values. Means,
- //we will return true exactly once unless the handler code resets to true
- //this value.
- return b == null ? false : b.compareAndSet(true, false);
- }
-
- //For unit tests only
- void initializeHandlerTrackers() {
- closedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
- openedRegionHandlerCalled = new HashMap<HRegionInfo, AtomicBoolean>();
- }
-
- void updateClosedRegionHandlerTracker(HRegionInfo hri) {
- if (closedRegionHandlerCalled != null) { //only for unit tests this is true
- closedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
- }
- }
-
- void updateOpenedRegionHandlerTracker(HRegionInfo hri) {
- if (openedRegionHandlerCalled != null) { //only for unit tests this is true
- openedRegionHandlerCalled.put(hri, new AtomicBoolean(true));
- }
- }
-
// TODO: processFavoredNodes might throw an exception, for e.g., if the
// meta could not be contacted/updated. We need to see how seriously to treat
// this problem as. Should we fail the current assignment. We should be able
@@ -1150,264 +577,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
- * Handle a ZK unassigned node transition triggered by HBCK repair tool.
- * <p>
- * This is handled in a separate code path because it breaks the normal rules.
- * @param rt
- */
- @SuppressWarnings("deprecation")
- private void handleHBCK(RegionTransition rt) {
- String encodedName = HRegionInfo.encodeRegionName(rt.getRegionName());
- LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
- ", server=" + rt.getServerName() + ", region=" +
- HRegionInfo.prettyPrint(encodedName));
- RegionState regionState = regionStates.getRegionTransitionState(encodedName);
- switch (rt.getEventType()) {
- case M_ZK_REGION_OFFLINE:
- HRegionInfo regionInfo;
- if (regionState != null) {
- regionInfo = regionState.getRegion();
- } else {
- try {
- byte [] name = rt.getRegionName();
- Pair<HRegionInfo, ServerName> p = MetaTableAccessor.getRegion(
- this.server.getShortCircuitConnection(), name);
- regionInfo = p.getFirst();
- } catch (IOException e) {
- LOG.info("Exception reading hbase:meta doing HBCK repair operation", e);
- return;
- }
- }
- LOG.info("HBCK repair is triggering assignment of region=" +
- regionInfo.getRegionNameAsString());
- // trigger assign, node is already in OFFLINE so don't need to update ZK
- assign(regionInfo, false);
- break;
-
- default:
- LOG.warn("Received unexpected region state from HBCK: " + rt.toString());
- break;
- }
-
- }
-
- // ZooKeeper events
-
- /**
- * New unassigned node has been created.
- *
- * <p>This happens when an RS begins the OPENING or CLOSING of a region by
- * creating an unassigned node.
- *
- * <p>When this happens we must:
- * <ol>
- * <li>Watch the node for further events</li>
- * <li>Read and handle the state in the node</li>
- * </ol>
- */
- @Override
- public void nodeCreated(String path) {
- handleAssignmentEvent(path);
- }
-
- /**
- * Existing unassigned node has had data changed.
- *
- * <p>This happens when an RS transitions from OFFLINE to OPENING, or between
- * OPENING/OPENED and CLOSING/CLOSED.
- *
- * <p>When this happens we must:
- * <ol>
- * <li>Watch the node for further events</li>
- * <li>Read and handle the state in the node</li>
- * </ol>
- */
- @Override
- public void nodeDataChanged(String path) {
- handleAssignmentEvent(path);
- }
-
-
- // We don't want to have two events on the same region managed simultaneously.
- // For this reason, we need to wait if an event on the same region is currently in progress.
- // So we track the region names of the events in progress, and we keep a waiting list.
- private final Set<String> regionsInProgress = new HashSet<String>();
- // In a LinkedHashMultimap, the put order is kept when we retrieve the collection back. We need
- // this as we want the events to be managed in the same order as we received them.
- private final LinkedHashMultimap <String, RegionRunnable>
- zkEventWorkerWaitingList = LinkedHashMultimap.create();
-
- /**
- * A specific runnable that works only on a region.
- */
- private interface RegionRunnable extends Runnable{
- /**
- * @return - the name of the region it works on.
- */
- String getRegionName();
- }
-
- /**
- * Submit a task, ensuring that there is only one task at a time that working on a given region.
- * Order is respected.
- */
- protected void zkEventWorkersSubmit(final RegionRunnable regRunnable) {
-
- synchronized (regionsInProgress) {
- // If we're there is already a task with this region, we add it to the
- // waiting list and return.
- if (regionsInProgress.contains(regRunnable.getRegionName())) {
- synchronized (zkEventWorkerWaitingList){
- zkEventWorkerWaitingList.put(regRunnable.getRegionName(), regRunnable);
- }
- return;
- }
-
- // No event in progress on this region => we can submit a new task immediately.
- regionsInProgress.add(regRunnable.getRegionName());
- zkEventWorkers.submit(new Runnable() {
- @Override
- public void run() {
- try {
- regRunnable.run();
- } finally {
- // now that we have finished, let's see if there is an event for the same region in the
- // waiting list. If it's the case, we can now submit it to the pool.
- synchronized (regionsInProgress) {
- regionsInProgress.remove(regRunnable.getRegionName());
- synchronized (zkEventWorkerWaitingList) {
- java.util.Set<RegionRunnable> waiting = zkEventWorkerWaitingList.get(
- regRunnable.getRegionName());
- if (!waiting.isEmpty()) {
- // We want the first object only. The only way to get it is through an iterator.
- RegionRunnable toSubmit = waiting.iterator().next();
- zkEventWorkerWaitingList.remove(toSubmit.getRegionName(), toSubmit);
- zkEventWorkersSubmit(toSubmit);
- }
- }
- }
- }
- }
- });
- }
- }
-
- @Override
- public void nodeDeleted(final String path) {
- if (path.startsWith(watcher.assignmentZNode)) {
- final String regionName = ZKAssign.getRegionName(watcher, path);
- zkEventWorkersSubmit(new RegionRunnable() {
- @Override
- public String getRegionName() {
- return regionName;
- }
-
- @Override
- public void run() {
- Lock lock = locker.acquireLock(regionName);
- try {
- RegionState rs = regionStates.getRegionTransitionState(regionName);
- if (rs == null) {
- rs = regionStates.getRegionState(regionName);
- if (rs == null || !rs.isMergingNew()) {
- // MergingNew is an offline state
- return;
- }
- }
-
- HRegionInfo regionInfo = rs.getRegion();
- String regionNameStr = regionInfo.getRegionNameAsString();
- LOG.debug("Znode " + regionNameStr + " deleted, state: " + rs);
-
- boolean disabled = getTableStateManager().isTableState(regionInfo.getTable(),
- ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING);
-
- ServerName serverName = rs.getServerName();
- if (serverManager.isServerOnline(serverName)) {
- if (rs.isOnServer(serverName)
- && (rs.isOpened() || rs.isSplitting())) {
- regionOnline(regionInfo, serverName);
- if (disabled) {
- // if server is offline, no hurt to unassign again
- LOG.info("Opened " + regionNameStr
- + "but this table is disabled, triggering close of region");
- unassign(regionInfo);
- }
- } else if (rs.isMergingNew()) {
- synchronized (regionStates) {
- String p = regionInfo.getEncodedName();
- PairOfSameType<HRegionInfo> regions = mergingRegions.get(p);
- if (regions != null) {
- onlineMergingRegion(disabled, regions.getFirst(), serverName);
- onlineMergingRegion(disabled, regions.getSecond(), serverName);
- }
- }
- }
- }
- } finally {
- lock.unlock();
- }
- }
-
- private void onlineMergingRegion(boolean disabled,
- final HRegionInfo hri, final ServerName serverName) {
- RegionState regionState = regionStates.getRegionState(hri);
- if (regionState != null && regionState.isMerging()
- && regionState.isOnServer(serverName)) {
- regionOnline(regionState.getRegion(), serverName);
- if (disabled) {
- unassign(hri);
- }
- }
- }
- });
- }
- }
-
- /**
- * New unassigned node has been created.
- *
- * <p>This happens when an RS begins the OPENING, SPLITTING or CLOSING of a
- * region by creating a znode.
- *
- * <p>When this happens we must:
- * <ol>
- * <li>Watch the node for further children changed events</li>
- * <li>Watch all new children for changed events</li>
- * </ol>
- */
- @Override
- public void nodeChildrenChanged(String path) {
- if (path.equals(watcher.assignmentZNode)) {
- zkEventWorkers.submit(new Runnable() {
- @Override
- public void run() {
- try {
- // Just make sure we see the changes for the new znodes
- List<String> children =
- ZKUtil.listChildrenAndWatchForNewChildren(
- watcher, watcher.assignmentZNode);
- if (children != null) {
- Stat stat = new Stat();
- for (String child : children) {
- // if region is in transition, we already have a watch
- // on it, so no need to watch it again. So, as I know for now,
- // this is needed to watch splitting nodes only.
- if (!regionStates.isRegionInTransition(child)) {
- ZKAssign.getDataAndWatch(watcher, child, stat);
- }
- }
- }
- } catch (KeeperException e) {
- server.abort("Unexpected ZK exception reading unassigned children", e);
- }
- }
- });
- }
- }
-
-
- /**
* Marks the region as online. Removes it from regions in transition and
* updates the in-memory assignment information.
* <p>
@@ -1432,55 +601,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
- * Pass the assignment event to a worker for processing.
- * Each worker is a single thread executor service. The reason
- * for just one thread is to make sure all events for a given
- * region are processed in order.
- *
- * @param path
- */
- private void handleAssignmentEvent(final String path) {
- if (path.startsWith(watcher.assignmentZNode)) {
- final String regionName = ZKAssign.getRegionName(watcher, path);
-
- zkEventWorkersSubmit(new RegionRunnable() {
- @Override
- public String getRegionName() {
- return regionName;
- }
-
- @Override
- public void run() {
- try {
- Stat stat = new Stat();
- byte [] data = ZKAssign.getDataAndWatch(watcher, path, stat);
- if (data == null) return;
-
- RegionTransition rt = RegionTransition.parseFrom(data);
-
- // TODO: This code is tied to ZK anyway, so for now leaving it as is,
- // will refactor when whole region assignment will be abstracted from ZK
- BaseCoordinatedStateManager csm =
- (BaseCoordinatedStateManager) server.getCoordinatedStateManager();
- OpenRegionCoordination openRegionCoordination = csm.getOpenRegionCoordination();
-
- ZkOpenRegionCoordination.ZkOpenRegionDetails zkOrd =
- new ZkOpenRegionCoordination.ZkOpenRegionDetails();
- zkOrd.setVersion(stat.getVersion());
- zkOrd.setServerName(csm.getServer().getServerName());
-
- handleRegion(rt, openRegionCoordination, zkOrd);
- } catch (KeeperException e) {
- server.abort("Unexpected ZK exception reading unassigned node data", e);
- } catch (DeserializationException e) {
- server.abort("Unexpected exception deserializing node data", e);
- }
- }
- });
- }
- }
-
- /**
* Marks the region as offline. Removes it from regions in transition and
* removes in-memory assignment information.
* <p>
@@ -1492,15 +612,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
public void offlineDisabledRegion(HRegionInfo regionInfo) {
- if (useZKForAssignment) {
- // Disabling so should not be reassigned, just delete the CLOSED node
- LOG.debug("Table being disabled so deleting ZK node and removing from " +
- "regions in transition, skipping assignment of region " +
- regionInfo.getRegionNameAsString());
- String encodedName = regionInfo.getEncodedName();
- deleteNodeInStates(encodedName, "closed", null,
- EventType.RS_ZK_REGION_CLOSED, EventType.M_ZK_REGION_OFFLINE);
- }
replicasToClose.remove(regionInfo);
regionOffline(regionInfo);
}
@@ -1517,23 +628,19 @@ public class AssignmentManager extends ZooKeeperListener {
* Updates the RegionState and sends the OPEN RPC.
* <p>
* This will only succeed if the region is in transition and in a CLOSED or
- * OFFLINE state or not in transition (in-memory not zk), and of course, the
- * chosen server is up and running (It may have just crashed!). If the
- * in-memory checks pass, the zk node is forced to OFFLINE before assigning.
+ * OFFLINE state or not in transition, and of course, the
+ * chosen server is up and running (It may have just crashed!).
*
* @param region server to be assigned
- * @param setOfflineInZK whether ZK node should be created/transitioned to an
- * OFFLINE state before assigning the region
*/
- public void assign(HRegionInfo region, boolean setOfflineInZK) {
- assign(region, setOfflineInZK, false);
+ public void assign(HRegionInfo region) {
+ assign(region, false);
}
/**
* Use care with forceNewPlan. It could cause double assignment.
*/
- public void assign(HRegionInfo region,
- boolean setOfflineInZK, boolean forceNewPlan) {
+ public void assign(HRegionInfo region, boolean forceNewPlan) {
if (isDisabledorDisablingRegionInRIT(region)) {
return;
}
@@ -1553,7 +660,7 @@ public class AssignmentManager extends ZooKeeperListener {
+ " is dead but not processed yet");
return;
}
- assign(state, setOfflineInZK && useZKForAssignment, forceNewPlan);
+ assign(state, forceNewPlan);
}
} finally {
lock.unlock();
@@ -1583,12 +690,8 @@ public class AssignmentManager extends ZooKeeperListener {
List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
Map<String, Lock> locks = locker.acquireLocks(encodedNames);
try {
- AtomicInteger counter = new AtomicInteger(0);
- Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
- OfflineCallback cb = new OfflineCallback(
- watcher, destination, counter, offlineNodesVersions);
- Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
- List<RegionState> states = new ArrayList<RegionState>(regions.size());
+ Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regionCount);
+ List<RegionState> states = new ArrayList<RegionState>(regionCount);
for (HRegionInfo region : regions) {
String encodedName = region.getEncodedName();
if (!isDisabledorDisablingRegionInRIT(region)) {
@@ -1600,8 +703,7 @@ public class AssignmentManager extends ZooKeeperListener {
+ ", it's host " + regionStates.getLastRegionServerOfRegion(encodedName)
+ " is dead but not processed yet");
onDeadServer = true;
- } else if (!useZKForAssignment
- || asyncSetOfflineInZooKeeper(state, cb, destination)) {
+ } else {
RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
plans.put(encodedName, plan);
states.add(state);
@@ -1610,8 +712,8 @@ public class AssignmentManager extends ZooKeeperListener {
}
// Reassign if the region wasn't on a dead server
if (!onDeadServer) {
- LOG.info("failed to force region state to offline or "
- + "failed to set it offline in ZK, will reassign later: " + region);
+ LOG.info("failed to force region state to offline, "
+ + "will reassign later: " + region);
failedToOpenRegions.add(region); // assign individually later
}
}
@@ -1621,21 +723,6 @@ public class AssignmentManager extends ZooKeeperListener {
lock.unlock();
}
- if (useZKForAssignment) {
- // Wait until all unassigned nodes have been put up and watchers set.
- int total = states.size();
- for (int oldCounter = 0; !server.isStopped();) {
- int count = counter.get();
- if (oldCounter != count) {
- LOG.debug(destination.toString() + " unassigned znodes=" + count +
- " of total=" + total + "; oldCounter=" + oldCounter);
- oldCounter = count;
- }
- if (count >= total) break;
- Thread.sleep(5);
- }
- }
-
if (server.isStopped()) {
return false;
}
@@ -1644,27 +731,18 @@ public class AssignmentManager extends ZooKeeperListener {
// that unnecessary timeout on RIT is reduced.
this.addPlans(plans);
- List<Triple<HRegionInfo, Integer, List<ServerName>>> regionOpenInfos =
- new ArrayList<Triple<HRegionInfo, Integer, List<ServerName>>>(states.size());
+ List<Pair<HRegionInfo, List<ServerName>>> regionOpenInfos =
+ new ArrayList<Pair<HRegionInfo, List<ServerName>>>(states.size());
for (RegionState state: states) {
HRegionInfo region = state.getRegion();
- String encodedRegionName = region.getEncodedName();
- Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
- if (useZKForAssignment && (nodeVersion == null || nodeVersion == -1)) {
- LOG.warn("failed to offline in zookeeper: " + region);
- failedToOpenRegions.add(region); // assign individually later
- Lock lock = locks.remove(encodedRegionName);
- lock.unlock();
- } else {
- regionStates.updateRegionState(
- region, State.PENDING_OPEN, destination);
- List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
- if (this.shouldAssignRegionsWithFavoredNodes) {
- favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
- }
- regionOpenInfos.add(new Triple<HRegionInfo, Integer, List<ServerName>>(
- region, nodeVersion, favoredNodes));
+ regionStates.updateRegionState(
+ region, State.PENDING_OPEN, destination);
+ List<ServerName> favoredNodes = ServerName.EMPTY_SERVER_LIST;
+ if (this.shouldAssignRegionsWithFavoredNodes) {
+ favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
}
+ regionOpenInfos.add(new Pair<HRegionInfo, List<ServerName>>(
+ region, favoredNodes));
}
// Move on to open regions.
@@ -1686,15 +764,8 @@ public class AssignmentManager extends ZooKeeperListener {
RegionOpeningState openingState = regionOpeningStateList.get(k);
if (openingState != RegionOpeningState.OPENED) {
HRegionInfo region = regionOpenInfos.get(k).getFirst();
- if (openingState == RegionOpeningState.ALREADY_OPENED) {
- processAlreadyOpenedRegion(region, destination);
- } else if (openingState == RegionOpeningState.FAILED_OPENING) {
- // Failed opening this region, reassign it later
- failedToOpenRegions.add(region);
- } else {
- LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
- + openingState + " in assigning region " + region);
- }
+ // Failed opening this region, reassign it later
+ failedToOpenRegions.add(region);
}
}
break;
@@ -1771,8 +842,7 @@ public class AssignmentManager extends ZooKeeperListener {
* on an unexpected server scenario, for an example)
*/
private void unassign(final HRegionInfo region,
- final RegionState state, final int versionOfClosingNode,
- final ServerName dest, final boolean transitionInZK,
+ final RegionState state, final ServerName dest,
final ServerName src) {
ServerName server = src;
if (state != null) {
@@ -1788,10 +858,6 @@ public class AssignmentManager extends ZooKeeperListener {
if (!serverManager.isServerOnline(server)) {
LOG.debug("Offline " + region.getRegionNameAsString()
+ ", no need to unassign since it's on a dead server: " + server);
- if (transitionInZK) {
- // delete the node. if no node exists need not bother.
- deleteClosingOrClosedNode(region, server);
- }
if (state != null) {
regionOffline(region);
}
@@ -1799,16 +865,9 @@ public class AssignmentManager extends ZooKeeperListener {
}
try {
// Send CLOSE RPC
- if (serverManager.sendRegionClose(server, region,
- versionOfClosingNode, dest, transitionInZK)) {
+ if (serverManager.sendRegionClose(server, region, dest)) {
LOG.debug("Sent CLOSE to " + server + " for region " +
region.getRegionNameAsString());
- if (useZKForAssignment && !transitionInZK && state != null) {
- // Retry to make sure the region is
- // closed so as to avoid double assignment.
- unassign(region, state, versionOfClosingNode,
- dest, transitionInZK, src);
- }
return;
}
// This never happens. Currently regionserver close always return true.
@@ -1825,9 +884,6 @@ public class AssignmentManager extends ZooKeeperListener {
|| t instanceof ServerNotRunningYetException) {
LOG.debug("Offline " + region.getRegionNameAsString()
+ ", it's not any more on " + server, t);
- if (transitionInZK) {
- deleteClosingOrClosedNode(region, server);
- }
if (state != null) {
regionOffline(region);
}
@@ -1840,9 +896,6 @@ public class AssignmentManager extends ZooKeeperListener {
sleepTime = 1 + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
} else {
- // RS is already processing this region, only need to update the timestamp
- LOG.debug("update " + state + " the timestamp.");
- state.updateTimestampToNow();
if (maxWaitTime < 0) {
maxWaitTime =
EnvironmentEdgeManager.currentTimeMillis()
@@ -1898,7 +951,6 @@ public class AssignmentManager extends ZooKeeperListener {
state = regionStates.createRegionState(region);
}
- ServerName sn = state.getServerName();
if (forceNewPlan && LOG.isDebugEnabled()) {
LOG.debug("Force region state offline " + state);
}
@@ -1916,7 +968,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
case FAILED_CLOSE:
case FAILED_OPEN:
- unassign(region, state, -1, null, false, null);
+ unassign(region, state, null, null);
state = regionStates.getRegionState(region);
if (state.isFailedClose()) {
// If we can't close the region, we can't re-assign
@@ -1926,21 +978,6 @@ public class AssignmentManager extends ZooKeeperListener {
return null;
}
case OFFLINE:
- // This region could have been open on this server
- // for a while. If the server is dead and not processed
- // yet, we can move on only if the meta shows the
- // region is not on this server actually, or on a server
- // not dead, or dead and processed already.
- // In case not using ZK, we don't need this check because
- // we have the latest info in memory, and the caller
- // will do another round checking any way.
- if (useZKForAssignment
- && regionStates.isServerDeadAndNotProcessed(sn)
- && wasRegionOnDeadServerByMeta(region, sn)) {
- LOG.info("Skip assigning " + region.getRegionNameAsString()
- + ", it is on a dead but not processed yet server: " + sn);
- return null;
- }
case CLOSED:
break;
default:
@@ -1951,49 +988,15 @@ public class AssignmentManager extends ZooKeeperListener {
return state;
}
- @SuppressWarnings("deprecation")
- private boolean wasRegionOnDeadServerByMeta(
- final HRegionInfo region, final ServerName sn) {
- try {
- if (region.isMetaRegion()) {
- ServerName server = this.server.getMetaTableLocator().
- getMetaRegionLocation(this.server.getZooKeeper());
- return regionStates.isServerDeadAndNotProcessed(server);
- }
- while (!server.isStopped()) {
- try {
- this.server.getMetaTableLocator().waitMetaRegionLocation(server.getZooKeeper());
- Result r = MetaTableAccessor.getRegionResult(server.getShortCircuitConnection(),
- region.getRegionName());
- if (r == null || r.isEmpty()) return false;
- ServerName server = HRegionInfo.getServerName(r);
- return regionStates.isServerDeadAndNotProcessed(server);
- } catch (IOException ioe) {
- LOG.info("Received exception accessing hbase:meta during force assign "
- + region.getRegionNameAsString() + ", retrying", ioe);
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.info("Interrupted accessing hbase:meta", e);
- }
- // Call is interrupted or server is stopped.
- return regionStates.isServerDeadAndNotProcessed(sn);
- }
-
/**
* Caller must hold lock on the passed <code>state</code> object.
* @param state
- * @param setOfflineInZK
* @param forceNewPlan
*/
- private void assign(RegionState state,
- final boolean setOfflineInZK, final boolean forceNewPlan) {
+ private void assign(RegionState state, boolean forceNewPlan) {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
try {
Configuration conf = server.getConfiguration();
- RegionState currentState = state;
- int versionOfOfflineNode = -1;
RegionPlan plan = null;
long maxWaitTime = -1;
HRegionInfo region = state.getRegion();
@@ -2027,14 +1030,6 @@ public class AssignmentManager extends ZooKeeperListener {
regionStates.updateRegionState(region, State.FAILED_OPEN);
return;
}
- if (setOfflineInZK && versionOfOfflineNode == -1) {
- // get the version of the znode after setting it to OFFLINE.
- // versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
- versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
- if (versionOfOfflineNode != -1) {
- if (isDisabledorDisablingRegionInRIT(region)) {
- return;
- }
// In case of assignment from EnableTableHandler table state is ENABLING. Any how
// EnableTableHandler will set ENABLED after assigning all the table regions. If we
// try to set to ENABLED directly then client API may think table is enabled.
@@ -2047,22 +1042,10 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.debug("Setting table " + tableName + " to ENABLED state.");
setEnabledTable(tableName);
}
- }
- }
- if (setOfflineInZK && versionOfOfflineNode == -1) {
- LOG.info("Unable to set offline in ZooKeeper to assign " + region);
- // Setting offline in ZK must have been failed due to ZK racing or some
- // exception which may make the server to abort. If it is ZK racing,
- // we should retry since we already reset the region state,
- // existing (re)assignment will fail anyway.
- if (!server.isAborted()) {
- continue;
- }
- }
LOG.info("Assigning " + region.getRegionNameAsString() +
" to " + plan.getDestination().toString());
// Transition RegionState to PENDING_OPEN
- currentState = regionStates.updateRegionState(region,
+ regionStates.updateRegionState(region,
State.PENDING_OPEN, plan.getDestination());
boolean needNewPlan;
@@ -2074,7 +1057,7 @@ public class AssignmentManager extends ZooKeeperListener {
favoredNodes = ((FavoredNodeLoadBalancer)this.balancer).getFavoredNodes(region);
}
regionOpenState = serverManager.sendRegionOpen(
- plan.getDestination(), region, versionOfOfflineNode, favoredNodes);
+ plan.getDestination(), region, favoredNodes);
if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
// Failed opening this region, looping again on a new server.
@@ -2084,9 +1067,6 @@ public class AssignmentManager extends ZooKeeperListener {
"try=" + i + " of " + this.maximumAttempts);
} else {
// we're done
- if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
- processAlreadyOpenedRegion(region, plan.getDestination());
- }
return;
}
@@ -2186,8 +1166,7 @@ public class AssignmentManager extends ZooKeeperListener {
// Clean out plan we failed execute and one that doesn't look like it'll
// succeed anyways; we need a new plan!
// Transition back to OFFLINE
- currentState = regionStates.updateRegionState(region, State.OFFLINE);
- versionOfOfflineNode = -1;
+ regionStates.updateRegionState(region, State.OFFLINE);
plan = newPlan;
} else if(plan.getDestination().equals(newPlan.getDestination()) &&
previousException instanceof FailedServerException) {
@@ -2213,17 +1192,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
- private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
- // Remove region from in-memory transition and unassigned node from ZK
- // While trying to enable the table the regions of the table were
- // already enabled.
- LOG.debug("ALREADY_OPENED " + region.getRegionNameAsString()
- + " to " + sn);
- String encodedName = region.getEncodedName();
- deleteNodeInStates(encodedName, "offline", sn, EventType.M_ZK_REGION_OFFLINE);
- regionStates.regionOnline(region, sn);
- }
-
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
if (this.tableStateManager.isTableState(region.getTable(),
ZooKeeperProtos.Table.State.DISABLED,
@@ -2237,37 +1205,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
- * Set region as OFFLINED up in zookeeper
- *
- * @param state
- * @return the version of the offline node if setting of the OFFLINE node was
- * successful, -1 otherwise.
- */
- private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
- if (!state.isClosed() && !state.isOffline()) {
- String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
- this.server.abort(msg, new IllegalStateException(msg));
- return -1;
- }
- regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
- int versionOfOfflineNode;
- try {
- // get the version after setting the znode to OFFLINE
- versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
- state.getRegion(), destination);
- if (versionOfOfflineNode == -1) {
- LOG.warn("Attempted to create/force node into OFFLINE state before "
- + "completing assignment but failed to do so for " + state);
- return -1;
- }
- } catch (KeeperException e) {
- server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
- return -1;
- }
- return versionOfOfflineNode;
- }
-
- /**
* @param region the region to assign
* @return Plan for passed <code>region</code> (If none currently, it creates one or
* if no servers to assign, it returns null).
@@ -2388,7 +1325,6 @@ public class AssignmentManager extends ZooKeeperListener {
String encodedName = region.getEncodedName();
// Grab the state of this region and synchronize on it
- int versionOfClosingNode = -1;
// We need a lock here as we're going to do a put later and we don't want multiple states
// creation
ReentrantLock lock = locker.acquireLock(encodedName);
@@ -2404,56 +1340,12 @@ public class AssignmentManager extends ZooKeeperListener {
// Offline region will be reassigned below
return;
}
- // Create the znode in CLOSING state
- try {
- if (state == null || state.getServerName() == null) {
- // We don't know where the region is, offline it.
- // No need to send CLOSE RPC
- LOG.warn("Attempting to unassign a region not in RegionStates"
- + region.getRegionNameAsString() + ", offlined");
- regionOffline(region);
- return;
- }
- if (useZKForAssignment) {
- versionOfClosingNode = ZKAssign.createNodeClosing(
- watcher, region, state.getServerName());
- if (versionOfClosingNode == -1) {
- LOG.info("Attempting to unassign " +
- region.getRegionNameAsString() + " but ZK closing node "
- + "can't be created.");
- reassign = false; // not unassigned at all
- return;
- }
- }
- } catch (KeeperException e) {
- if (e instanceof NodeExistsException) {
- // Handle race between master initiated close and regionserver
- // orchestrated splitting. See if existing node is in a
- // SPLITTING or SPLIT state. If so, the regionserver started
- // an op on node before we could get our CLOSING in. Deal.
- NodeExistsException nee = (NodeExistsException)e;
- String path = nee.getPath();
- try {
- if (isSplitOrSplittingOrMergedOrMerging(path)) {
- LOG.debug(path + " is SPLIT or SPLITTING or MERGED or MERGING; " +
- "skipping unassign because region no longer exists -- its split or merge");
- reassign = false; // no need to reassign for split/merged region
- return;
- }
- } catch (KeeperException.NoNodeException ke) {
- LOG.warn("Failed getData on SPLITTING/SPLIT at " + path +
- "; presuming split and that the region to unassign, " +
- encodedName + ", no longer exists -- confirm", ke);
- return;
- } catch (KeeperException ke) {
- LOG.error("Unexpected zk state", ke);
- } catch (DeserializationException de) {
- LOG.error("Failed parse", de);
- }
- }
- // If we get here, don't understand whats going on -- abort.
- server.abort("Unexpected ZK exception creating node CLOSING", e);
- reassign = false; // heading out already
+ if (state == null || state.getServerName() == null) {
+ // We don't know where the region is, offline it.
+ // No need to send CLOSE RPC
+ LOG.warn("Attempting to unassign a region not in RegionStates"
+ + region.getRegionNameAsString() + ", offlined");
+ regionOffline(region);
return;
}
state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
@@ -2468,7 +1360,6 @@ public class AssignmentManager extends ZooKeeperListener {
if (state.isFailedClose()) {
state = regionStates.updateRegionState(region, State.PENDING_CLOSE);
}
- state.updateTimestampToNow();
} else {
LOG.debug("Attempting to unassign " +
region.getRegionNameAsString() + " but it is " +
@@ -2476,13 +1367,13 @@ public class AssignmentManager extends ZooKeeperListener {
return;
}
- unassign(region, state, versionOfClosingNode, dest, useZKForAssignment, null);
+ unassign(region, state, dest, null);
} finally {
lock.unlock();
// Region is expected to be reassigned afterwards
if (!replicasToClose.contains(region) && reassign && regionStates.isRegionOffline(region)) {
- assign(region, true);
+ assign(region);
}
}
}
@@ -2492,48 +1383,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
- * @param region regioninfo of znode to be deleted.
- */
- public void deleteClosingOrClosedNode(HRegionInfo region, ServerName sn) {
- String encodedName = region.getEncodedName();
- deleteNodeInStates(encodedName, "closing", sn, EventType.M_ZK_REGION_CLOSING,
- EventType.RS_ZK_REGION_CLOSED);
- }
-
- /**
- * @param path
- * @return True if znode is in SPLIT or SPLITTING or MERGED or MERGING state.
- * @throws KeeperException Can happen if the znode went away in meantime.
- * @throws DeserializationException
- */
- private boolean isSplitOrSplittingOrMergedOrMerging(final String path)
- throws KeeperException, DeserializationException {
- boolean result = false;
- // This may fail if the SPLIT or SPLITTING or MERGED or MERGING znode gets
- // cleaned up before we can get data from it.
- byte [] data = ZKAssign.getData(watcher, path);
- if (data == null) {
- LOG.info("Node " + path + " is gone");
- return false;
- }
- RegionTransition rt = RegionTransition.parseFrom(data);
- switch (rt.getEventType()) {
- case RS_ZK_REQUEST_REGION_SPLIT:
- case RS_ZK_REGION_SPLIT:
- case RS_ZK_REGION_SPLITTING:
- case RS_ZK_REQUEST_REGION_MERGE:
- case RS_ZK_REGION_MERGED:
- case RS_ZK_REGION_MERGING:
- result = true;
- break;
- default:
- LOG.info("Node " + path + " is in " + rt.getEventType());
- break;
- }
- return result;
- }
-
- /**
* Used by unit tests. Return the number of regions opened so far in the life
* of the master. Increases by one every time the master opens a region
* @return the counter value of the number of regions opened so far
@@ -2577,8 +1426,8 @@ public class AssignmentManager extends ZooKeeperListener {
* @throws KeeperException
*/
public void assignMeta() throws KeeperException {
- this.server.getMetaTableLocator().deleteMetaLocation(this.watcher);
- assign(HRegionInfo.FIRST_META_REGIONINFO, true);
+ this.server.getMetaTableLocator().deleteMetaLocation(this.server.getZooKeeper());
+ assign(HRegionInfo.FIRST_META_REGIONINFO);
}
/**
@@ -2735,30 +1584,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
/**
- * Wait until no regions in transition.
- * @param timeout How long to wait.
- * @return True if nothing in regions in transition.
- * @throws InterruptedException
- */
- boolean waitUntilNoRegionsInTransition(final long timeout)
- throws InterruptedException {
- // Blocks until there are no regions in transition. It is possible that
- // there
- // are regions in transition immediately after this returns but guarantees
- // that if it returns without an exception that there was a period of time
- // with no regions in transition from the point-of-view of the in-memory
- // state of the Master.
- final long endTime = System.currentTimeMillis() + timeout;
-
- while (!this.server.isStopped() && regionStates.isRegionsInTransition()
- && endTime > System.currentTimeMillis()) {
- regionStates.waitForUpdate(100);
- }
-
- return !regionStates.isRegionsInTransition();
- }
-
- /**
* Rebuild the list of user regions and assignment information.
* <p>
* Returns a set of servers that are not found to be online that hosted
@@ -2829,16 +1654,11 @@ public class AssignmentManager extends ZooKeeperListener {
if (!onlineServers.contains(regionLocation)) {
// Region is located on a server that isn't online
offlineServers.add(regionLocation);
- if (useZKForAssignment) {
- regionStates.regionOffline(regionInfo);
- }
} else if (!disabledOrEnablingTables.contains(tableName)) {
// Region is being served and on an active server
// add only if region not in disabled or enabling table
regionStates.regionOnline(regionInfo, regionLocation);
balancer.regionOnline(regionInfo, regionLocation);
- } else if (useZKForAssignment) {
- regionStates.regionOffline(regionInfo);
}
// need to enable the table if not disabled or disabling or enabling
// this will be used in rolling restarts
@@ -2911,21 +1731,12 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* Processes list of dead servers from result of hbase:meta scan and regions in RIT
- * <p>
- * This is used for failover to recover the lost regions that belonged to
- * RegionServers which failed while there was no active master or regions
- * that were in RIT.
- * <p>
- *
*
* @param deadServers
* The list of dead servers which failed while there was no active
* master. Can be null.
- * @throws IOException
- * @throws KeeperException
*/
- private void processDeadServersAndRecoverLostRegions(
- Set<ServerName> deadServers) throws IOException, KeeperException {
+ private void processDeadServers(Set<ServerName> deadServers) {
if (deadServers != null && !deadServers.isEmpty()) {
for (ServerName serverName: deadServers) {
if (!serverManager.isServerDead(serverName)) {
@@ -2934,36 +1745,27 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
- List<String> nodes = useZKForAssignment ?
- ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.assignmentZNode)
- : ZKUtil.listChildrenNoWatch(watcher, watcher.assignmentZNode);
- if (nodes != null && !nodes.isEmpty()) {
- for (String encodedRegionName : nodes) {
- processRegionInTransition(encodedRegionName, null);
+ // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
+ // in case the RPC call is not sent out yet before the master was shut down
+ // since we update the state before we send the RPC call. We can't update
+ // the state after the RPC call. Otherwise, we don't know what's happened
+ // to the region if the master dies right after the RPC call is out.
+ Map<String, RegionState> rits = regionStates.getRegionsInTransition();
+ for (RegionState regionState: rits.values()) {
+ if (!serverManager.isServerOnline(regionState.getServerName())) {
+ continue; // SSH will handle it
}
- } else if (!useZKForAssignment) {
- // We need to send RPC call again for PENDING_OPEN/PENDING_CLOSE regions
- // in case the RPC call is not sent out yet before the master was shut down
- // since we update the state before we send the RPC call. We can't update
- // the state after the RPC call. Otherwise, we don't know what's happened
- // to the region if the master dies right after the RPC call is out.
- Map<String, RegionState> rits = regionStates.getRegionsInTransition();
- for (RegionState regionState: rits.values()) {
- if (!serverManager.isServerOnline(regionState.getServerName())) {
- continue; // SSH will handle it
- }
- State state = regionState.getState();
- LOG.info("Processing " + regionState);
- switch (state) {
- case PENDING_OPEN:
- retrySendRegionOpen(regionState);
- break;
- case PENDING_CLOSE:
- retrySendRegionClose(regionState);
- break;
- default:
- // No process for other states
- }
+ State state = regionState.getState();
+ LOG.info("Processing " + regionState);
+ switch (state) {
+ case PENDING_OPEN:
+ retrySendRegionOpen(regionState);
+ break;
+ case PENDING_CLOSE:
+ retrySendRegionClose(regionState);
+ break;
+ default:
+ // No process for other states
}
}
}
@@ -2992,7 +1794,7 @@ public class AssignmentManager extends ZooKeeperListener {
favoredNodes = ((FavoredNodeLoadBalancer)balancer).getFavoredNodes(hri);
}
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(
- serverName, hri, -1, favoredNodes);
+ serverName, hri, favoredNodes);
if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
// Failed opening this region, this means the target server didn't get
@@ -3045,7 +1847,7 @@ public class AssignmentManager extends ZooKeeperListener {
while (serverManager.isServerOnline(serverName)
&& !server.isStopped() && !server.isAborted()) {
try {
- if (!serverManager.sendRegionClose(serverName, hri, -1, null, false)) {
+ if (!serverManager.sendRegionClose(serverName, hri, null)) {
// This means the region is still on the target server
LOG.debug("Got false in retry sendRegionClose for "
+ regionState + ", re-close it");
@@ -3179,43 +1981,26 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* Check if the shutdown server carries the specific region.
- * We have a bunch of places that store region location
- * Those values aren't consistent. There is a delay of notification.
- * The location from zookeeper unassigned node has the most recent data;
- * but the node could be deleted after the region is opened by AM.
- * The AM's info could be old when OpenedRegionHandler
- * processing hasn't finished yet when server shutdown occurs.
* @return whether the serverName currently hosts the region
*/
private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
- RegionTransition rt = null;
- try {
- byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
- // This call can legitimately come by null
- rt = data == null? null: RegionTransition.parseFrom(data);
- } catch (KeeperException e) {
- server.abort("Exception reading unassigned node for region=" + hri.getEncodedName(), e);
- } catch (DeserializationException e) {
- server.abort("Exception parsing unassigned node for region=" + hri.getEncodedName(), e);
- }
-
- ServerName addressFromZK = rt != null? rt.getServerName(): null;
- if (addressFromZK != null) {
- // if we get something from ZK, we will use the data
- boolean matchZK = addressFromZK.equals(serverName);
- LOG.debug("Checking region=" + hri.getRegionNameAsString() + ", zk server=" + addressFromZK +
- " current=" + serverName + ", matches=" + matchZK);
- return matchZK;
+ RegionState regionState = regionStates.getRegionTransitionState(hri);
+ ServerName transitionAddr = regionState != null? regionState.getServerName(): null;
+ if (transitionAddr != null) {
+ boolean matchTransitionAddr = transitionAddr.equals(serverName);
+ LOG.debug("Checking region=" + hri.getRegionNameAsString()
+ + ", transitioning on server=" + transitionAddr
+ + ", server being checked: " + serverName
+ + ", matches=" + matchTransitionAddr);
+ return matchTransitionAddr;
}
- ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
- boolean matchAM = (addressFromAM != null &&
- addressFromAM.equals(serverName));
- LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
- " is on server=" + (addressFromAM != null ? addressFromAM : "null") +
- " server being checked: " + serverName);
-
- return matchAM;
+ ServerName assignedAddr = regionStates.getRegionServerOfRegion(hri);
+ boolean matchAssignedAddr = serverName.equals(assignedAddr);
+ LOG.debug("based on AM, current region=" + hri.getRegionNameAsString()
+ + " is on server=" + assignedAddr + ", server being checked: "
+ + serverName);
+ return matchAssignedAddr;
}
/**
@@ -3237,7 +2022,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
- List<HRegionInfo> regions = regionStates.serverOffline(watcher, sn);
+ List<HRegionInfo> regions = regionStates.serverOffline(sn);
for (Iterator<HRegionInfo> it = regions.iterator(); it.hasNext(); ) {
HRegionInfo hri = it.next();
String encodedName = hri.getEncodedName();
@@ -3255,12 +2040,6 @@ public class AssignmentManager extends ZooKeeperListener {
+ " on the dead server any more: " + sn);
it.remove();
} else {
- try {
- // Delete the ZNode if exists
- ZKAssign.deleteNodeFailSilent(watcher, hri);
- } catch (KeeperException ke) {
- server.abort("Unexpected ZK exception deleting node " + hri, ke);
- }
if (tableStateManager.isTableState(hri.getTable(),
ZooKeeperProtos.Table.State.DISABLED, ZooKeeperProtos.Table.State.DISABLING)) {
regionStates.regionOffline(hri);
@@ -3317,12 +2096,7 @@ public class AssignmentManager extends ZooKeeperListener {
* Shutdown the threadpool executor service
*/
public void shutdown() {
- // It's an immediate shutdown, so we're clearing the remaining tasks.
- synchronized (zkEventWorkerWaitingList){
- zkEventWorkerWaitingList.clear();
- }
threadPoolExecutorService.shutdownNow();
- zkEventWorkers.shutdownNow();
regionStateStore.stop();
}
@@ -3339,65 +2113,6 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
- /**
- * Set region as OFFLINED up in zookeeper asynchronously.
- * @param state
- * @return True if we succeeded, false otherwise (State was incorrect or failed
- * updating zk).
- */
- private boolean asyncSetOfflineInZooKeeper(final RegionState state,
- final AsyncCallback.StringCallback cb, final ServerName destination) {
- if (!state.isClosed() && !state.isOffline()) {
- this.server.abort("Unexpected state trying to OFFLINE; " + state,
- new IllegalStateException());
- return false;
- }
- regionStates.updateRegionState(state.getRegion(), State.OFFLINE);
- try {
- ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
- destination, cb, state);
- } catch (KeeperException e) {
- if (e instanceof NodeExistsException) {
- LOG.warn("Node for " + state.getRegion() + " already exists");
- } else {
- server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
- }
- return false;
- }
- return true;
- }
-
- private boolean deleteNodeInStates(String encodedName,
- String desc, ServerName sn, EventType... types) {
- try {
- for (EventType et: types) {
- if (ZKAssign.deleteNode(watcher, encodedName, et, sn)) {
- return true;
- }
- }
- LOG.info("Failed to delete the " + desc + " node for "
- + encodedName + ". The node type may not match");
- } catch (NoNodeException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("The " + desc + " node for " + encodedName + " already deleted");
- }
- } catch (KeeperException ke) {
- server.abort("Unexpected ZK exception deleting " + desc
- + " node for the region " + encodedName, ke);
- }
- return false;
- }
-
- private void deleteMergingNode(String encodedName, ServerName sn) {
- deleteNodeInStates
<TRUNCATED>