You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/10/20 05:57:50 UTC
svn commit: r1400358 [1/4] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/master/handler/
main/java/org/apache/hadoop/hbase/protobuf/ main/java/org/apac...
Author: jxiang
Date: Sat Oct 20 03:57:49 2012
New Revision: 1400358
URL: http://svn.apache.org/viewvc?rev=1400358&view=rev
Log:
HBASE-6611 Forcing region state offline cause double assignment
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/OfflineCallback.java (with props)
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionState.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ResponseConverter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ZooKeeperProtos.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/OpenRegionHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKAssign.java
hbase/trunk/hbase-server/src/main/protobuf/Admin.proto
hbase/trunk/hbase-server/src/main/protobuf/ZooKeeper.proto
hbase/trunk/hbase-server/src/main/ruby/shell/commands/assign.rb
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestCloseRegionHandler.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/handler/TestOpenRegionHandler.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/RegionTransition.java Sat Oct 20 03:57:49 2012
@@ -54,7 +54,7 @@ public class RegionTransition {
}
public ServerName getServerName() {
- return ProtobufUtil.toServerName(this.rt.getOriginServerName());
+ return ProtobufUtil.toServerName(this.rt.getServerName());
}
public long getCreateTime() {
@@ -105,7 +105,7 @@ public class RegionTransition {
setHostName(sn.getHostname()).setPort(sn.getPort()).setStartCode(sn.getStartcode()).build();
ZooKeeperProtos.RegionTransition.Builder builder = ZooKeeperProtos.RegionTransition.newBuilder().
setEventTypeCode(type.getCode()).setRegionName(ByteString.copyFrom(regionName)).
- setOriginServerName(pbsn);
+ setServerName(pbsn);
builder.setCreateTime(System.currentTimeMillis());
if (payload != null) builder.setPayload(ByteString.copyFrom(payload));
return new RegionTransition(builder.build());
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignCallable.java Sat Oct 20 03:57:49 2012
@@ -42,7 +42,7 @@ public class AssignCallable implements C
@Override
public Object call() throws Exception {
- assignmentManager.assign(hri, true, true, true);
+ assignmentManager.assign(hri, true, true);
return null;
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Sat Oct 20 03:57:49 2012
@@ -23,12 +23,14 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -117,9 +119,10 @@ public class AssignmentManager extends Z
private final Map <String, HRegionInfo> regionsToReopen;
/*
- * Maximum times we recurse an assignment. See below in {@link #assign()}.
+ * Maximum times we recurse an assignment/unassignment.
+ * See below in {@link #assign()} and {@link #unassign()}.
*/
- private final int maximumAssignmentAttempts;
+ private final int maximumAttempts;
/** Plans for region movement. Key is the encoded version of a region name*/
// TODO: When do plans get cleaned out? Ever? In server open and in server
@@ -158,6 +161,18 @@ public class AssignmentManager extends Z
*/
final AtomicBoolean failoverCleanupDone = new AtomicBoolean(false);
+ // A temp ZK watcher for bulk assigner to avoid deadlock,
+ // will be removed in HBASE-6977
+ //
+ // A separate ZK watcher used for async ZK node offline.
+ // We can't use the existing one because it could lead to
+ // deadlocks if its event thread asks for a locker held by a bulk
+ // assigner thread. This watcher is just for async ZK node offline.
+ // In HBASE-6977, we are going to process assignment ZK events
+ // outside of ZK event thread, so there won't be deadlock
+ // threat anymore. That's when this watcher will be removed.
+ private final ZooKeeperWatcher asyncOfflineZKWatcher;
+
/**
* Constructs a new assignment manager.
*
@@ -180,20 +195,24 @@ public class AssignmentManager extends Z
(new HashMap<String, HRegionInfo> ());
Configuration conf = server.getConfiguration();
this.timeoutMonitor = new TimeoutMonitor(
- conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
+ conf.getInt("hbase.master.assignment.timeoutmonitor.period", 60000),
server, serverManager,
- conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
+ conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1200000));
this.timerUpdater = new TimerUpdater(conf.getInt(
- "hbase.master.assignment.timerupdater.period", 10000), server);
+ "hbase.master.assignment.timerupdater.period", 10000), server);
Threads.setDaemonThreadRunning(timerUpdater.getThread(),
- server.getServerName() + ".timerUpdater");
+ server.getServerName() + ".timerUpdater");
this.zkTable = new ZKTable(this.watcher);
- this.maximumAssignmentAttempts =
+ this.maximumAttempts =
this.server.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = balancer;
this.threadPoolExecutorService = Executors.newCachedThreadPool();
this.masterMetrics = metrics;// can be null only with tests.
this.regionStates = new RegionStates(server, serverManager);
+ // A temp ZK watcher for bulk assigner to avoid deadlock,
+ // will be removed in HBASE-6977
+ asyncOfflineZKWatcher = new ZooKeeperWatcher(conf,
+ "async offline ZK watcher", server);
}
void startTimeOutMonitor() {
@@ -265,7 +284,7 @@ public class AssignmentManager extends Z
* @throws IOException
*/
public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
- throws IOException {
+ throws IOException {
List <HRegionInfo> hris =
MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName);
Integer pending = 0;
@@ -397,9 +416,8 @@ public class AssignmentManager extends Z
* @throws IOException
*/
boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
- throws InterruptedException, KeeperException, IOException {
- boolean intransistion =
- processRegionInTransition(hri.getEncodedName(), hri, null);
+ throws InterruptedException, KeeperException, IOException {
+ boolean intransistion = processRegionInTransition(hri.getEncodedName(), hri);
if (!intransistion) return intransistion;
LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
while (!this.server.isStopped() &&
@@ -416,14 +434,12 @@ public class AssignmentManager extends Z
* up in zookeeper.
* @param encodedRegionName Region to process failover for.
* @param regionInfo If null we'll go get it from meta table.
- * @param deadServers Can be null
* @return True if we processed <code>regionInfo</code> as a RIT.
* @throws KeeperException
* @throws IOException
*/
boolean processRegionInTransition(final String encodedRegionName,
- final HRegionInfo regionInfo, final Map<ServerName, List<HRegionInfo>> deadServers)
- throws KeeperException, IOException {
+ final HRegionInfo regionInfo) throws KeeperException, IOException {
// We need a lock here to ensure that we will not put the same region twice
// It has no reason to be a lock shared with the other operations.
// We can do the lock on the region only, instead of a global lock: what we want to ensure
@@ -445,7 +461,7 @@ public class AssignmentManager extends Z
hri = regionStates.getRegionInfo(rt.getRegionName());
if (hri == null) return false;
}
- processRegionsInTransition(rt, hri, deadServers, stat.getVersion());
+ processRegionsInTransition(rt, hri, stat.getVersion());
return true;
} finally {
lock.unlock();
@@ -453,16 +469,17 @@ public class AssignmentManager extends Z
}
/**
- * This call is invoked only during failover mode, zk assignment node processing.
+ * This call is invoked only (1) when the master assigns root and meta;
+ * (2) during failover mode startup, zk assignment node processing.
* The locker is set in the caller.
*
* It should be private but it is used by some test too.
*/
- void processRegionsInTransition(final RegionTransition rt, final HRegionInfo regionInfo,
- final Map<ServerName, List<HRegionInfo>> deadServers, int expectedVersion)
- throws KeeperException {
+ void processRegionsInTransition(
+ final RegionTransition rt, final HRegionInfo regionInfo,
+ int expectedVersion) throws KeeperException {
EventType et = rt.getEventType();
- // Get ServerName. Could be null.
+ // Get ServerName. It cannot be null.
ServerName sn = rt.getServerName();
String encodedRegionName = regionInfo.getEncodedName();
LOG.info("Processing region " + regionInfo.getRegionNameAsString() + " in state " + et);
@@ -475,9 +492,8 @@ public class AssignmentManager extends Z
case M_ZK_REGION_CLOSING:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers)
- && !serverManager.isServerOnline(sn)) {
- // If was on dead server, its closed now. Force to OFFLINE and this
+ if (!serverManager.isServerOnline(sn)) {
+ // If the server is not online, the region is closed now. Force to OFFLINE and this
// will get it reassigned if appropriate
forceOffline(regionInfo, rt);
} else {
@@ -496,58 +512,42 @@ public class AssignmentManager extends Z
case M_ZK_REGION_OFFLINE:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers)
- && (sn == null || !serverManager.isServerOnline(sn))) {
+ if (!serverManager.isServerOnline(sn)) {
// Region is offline, insert into RIT and handle it like a closed
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
- } else if (sn != null && !serverManager.isServerOnline(sn)) {
- // to handle cases where offline node is created but sendRegionOpen
- // RPC is not yet sent
- addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
} else {
+ // Just insert region into RIT.
+ // If this never updates the timeout will trigger new assignment
regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
}
break;
case RS_ZK_REGION_OPENING:
- // TODO: Could check if it was on deadServers. If it was, then we could
- // do what happens in TimeoutMonitor when it sees this condition.
- // Just insert region into RIT
- // If this never updates the timeout will trigger new assignment
- if (regionInfo.isMetaTable()) {
+ if (regionInfo.isMetaTable() || !serverManager.isServerOnline(sn)) {
regionStates.updateRegionState(rt, RegionState.State.OPENING);
// If ROOT or .META. table is waiting for timeout monitor to assign
// it may take lot of time when the assignment.timeout.period is
// the default value which may be very long. We will not be able
// to serve any request during this time.
// So we will assign the ROOT and .META. region immediately.
+ // For a user region, if the server is not online, it takes
+ // some time for timeout monitor to kick in. We know the region
+ // won't open. So we will assign the opening
+ // region immediately too.
processOpeningState(regionInfo);
- break;
- } else if (deadServers != null
- && deadServers.keySet().contains(sn)) {
- // if the region is found on a dead server, we can assign
- // it to a new RS. (HBASE-5882)
- processOpeningState(regionInfo);
- break;
+ } else {
+ // Just insert region into RIT.
+ // If this never updates the timeout will trigger new assignment
+ regionStates.updateRegionState(rt, RegionState.State.OPENING);
}
- regionStates.updateRegionState(rt, RegionState.State.OPENING);
break;
case RS_ZK_REGION_OPENED:
- // Region is opened, insert into RIT and handle it
- regionStates.updateRegionState(rt, RegionState.State.OPEN);
- // sn could be null if this server is no longer online. If
- // that is the case, just let this RIT timeout; it'll be assigned
- // to new server then.
- if (sn == null) {
- LOG.warn("Region in transition " + regionInfo.getEncodedName() +
- " references a null server; letting RIT timeout so will be " +
- "assigned elsewhere");
- } else if (!serverManager.isServerOnline(sn)
- && (isOnDeadServer(regionInfo, deadServers)
- || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
+ if (!serverManager.isServerOnline(sn)) {
forceOffline(regionInfo, rt);
} else {
+ // Region is opened, insert into RIT and handle it
+ regionStates.updateRegionState(rt, RegionState.State.OPEN);
new OpenedRegionHandler(server, this, regionInfo, sn, expectedVersion).process();
}
break;
@@ -562,7 +562,6 @@ public class AssignmentManager extends Z
}
}
-
/**
* Put the region <code>hri</code> into an offline state up in zk.
*
@@ -573,13 +572,12 @@ public class AssignmentManager extends Z
* @throws KeeperException
*/
private void forceOffline(final HRegionInfo hri, final RegionTransition oldRt)
- throws KeeperException {
+ throws KeeperException {
// If was on dead server, its closed now. Force to OFFLINE and then
// handle it like a close; this will get it reassigned if appropriate
LOG.debug("RIT " + hri.getEncodedName() + " in state=" + oldRt.getEventType() +
" was on deadserver; forcing offline");
- ZKAssign.createOrForceNodeOffline(this.watcher, hri,
- this.server.getServerName());
+ ZKAssign.createOrForceNodeOffline(this.watcher, hri, oldRt.getServerName());
addToRITandCallClose(hri, RegionState.State.OFFLINE, oldRt);
}
@@ -591,7 +589,7 @@ public class AssignmentManager extends Z
* @param oldData
*/
private void addToRITandCallClose(final HRegionInfo hri,
- final RegionState.State state, final RegionTransition oldData) {
+ final RegionState.State state, final RegionTransition oldData) {
regionStates.updateRegionState(oldData, state);
new ClosedRegionHandler(this.server, this, hri).process();
}
@@ -607,23 +605,6 @@ public class AssignmentManager extends Z
}
/**
- * @param regionInfo
- * @param deadServers Map of deadServers and the regions they were carrying;
- * can be null.
- * @return True if the passed regionInfo in the passed map of deadServers?
- */
- private boolean isOnDeadServer(final HRegionInfo regionInfo,
- final Map<ServerName, List<HRegionInfo>> deadServers) {
- if (deadServers == null) return false;
- for (List<HRegionInfo> deadRegions: deadServers.values()) {
- if (deadRegions.contains(regionInfo)) {
- return true;
- }
- }
- return false;
- }
-
- /**
* Handles various states an unassigned node can be in.
* <p>
* Method is called when a state change is suspected for an unassigned node.
@@ -639,10 +620,6 @@ public class AssignmentManager extends Z
return;
}
final ServerName sn = rt.getServerName();
- if (sn == null) {
- LOG.warn("Null servername: " + rt);
- return;
- }
// Check if this is a special HBCK transition
if (sn.equals(HBCK_CODE_SERVERNAME)) {
handleHBCK(rt);
@@ -653,30 +630,48 @@ public class AssignmentManager extends Z
String encodedName = HRegionInfo.encodeRegionName(regionName);
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
// Verify this is a known server
- if (!serverManager.isServerOnline(sn) &&
- !this.server.getServerName().equals(sn)
+ if (!serverManager.isServerOnline(sn)
&& !ignoreStatesRSOffline.contains(rt.getEventType())) {
LOG.warn("Attempted to handle region transition for server but " +
"server is not online: " + prettyPrintedRegionName);
return;
}
- // We need a lock on the region as we could update it
- Lock lock = locker.acquireLock(encodedName);
- try {
- // Printing if the event was created a long time ago helps debugging
- boolean lateEvent = createTime < (System.currentTimeMillis() - 15000);
- RegionState regionState = regionStates.getRegionTransitionState(encodedName);
+ RegionState regionState =
+ regionStates.getRegionTransitionState(encodedName);
+ long startTime = System.currentTimeMillis();
+ if (LOG.isDebugEnabled()) {
+ boolean lateEvent = createTime < (startTime - 15000);
LOG.debug("Handling transition=" + rt.getEventType() +
", server=" + sn + ", region=" +
(prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
(lateEvent ? ", which is more than 15 seconds late" : "") +
", current state from region state map =" + regionState);
- switch (rt.getEventType()) {
- case M_ZK_REGION_OFFLINE:
- // Nothing to do.
- break;
+ }
+ // We don't do anything for this event,
+ // so separate it out, no need to lock/unlock anything
+ if (rt.getEventType() == EventType.M_ZK_REGION_OFFLINE) {
+ return;
+ }
+ // We need a lock on the region as we could update it
+ Lock lock = locker.acquireLock(encodedName);
+ try {
+ RegionState latestState =
+ regionStates.getRegionTransitionState(encodedName);
+ if ((regionState == null && latestState != null)
+ || (regionState != null && latestState == null)
+ || (regionState != null && latestState != null
+ && latestState.getState() != regionState.getState())) {
+ LOG.warn("Region state changed from " + regionState + " to "
+ + latestState + ", while acquiring lock");
+ }
+ long waitedTime = System.currentTimeMillis() - startTime;
+ if (waitedTime > 5000) {
+ LOG.warn("Took " + waitedTime + "ms to acquire the lock");
+ }
+ regionState = latestState;
+ switch (rt.getEventType()) {
case RS_ZK_REGION_SPLITTING:
if (!isInStateForSplitting(regionState)) break;
regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
@@ -725,12 +720,12 @@ public class AssignmentManager extends Z
case M_ZK_REGION_CLOSING:
// Should see CLOSING after we have asked it to CLOSE or additional
// times after already being in state of CLOSING
- if (regionState != null &&
- (!regionState.isPendingClose() && !regionState.isClosing())) {
- LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
- " from server " + sn + " but region was in " +
- " the state " + regionState + " and not " +
- "in expected PENDING_CLOSE or CLOSING states");
+ if (regionState != null
+ && !regionState.isPendingCloseOrClosingOnServer(sn)) {
+ LOG.warn("Received CLOSING for region " + prettyPrintedRegionName
+ + " from server " + sn + " but region was in the state " + regionState
+ + " and not in expected PENDING_CLOSE or CLOSING states,"
+ + " or not on the expected server");
return;
}
// Transition to CLOSING (or update stamp if already CLOSING)
@@ -739,12 +734,12 @@ public class AssignmentManager extends Z
case RS_ZK_REGION_CLOSED:
// Should see CLOSED after CLOSING but possible after PENDING_CLOSE
- if (regionState != null &&
- (!regionState.isPendingClose() && !regionState.isClosing())) {
- LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
- " from server " + sn + " but region was in " +
- " the state " + regionState + " and not " +
- "in expected PENDING_CLOSE or CLOSING states");
+ if (regionState != null
+ && !regionState.isPendingCloseOrClosingOnServer(sn)) {
+ LOG.warn("Received CLOSED for region " + prettyPrintedRegionName
+ + " from server " + sn + " but region was in the state " + regionState
+ + " and not in expected PENDING_CLOSE or CLOSING states,"
+ + " or not on the expected server");
return;
}
// Handle CLOSED by assigning elsewhere or stopping if a disable
@@ -759,11 +754,12 @@ public class AssignmentManager extends Z
break;
case RS_ZK_REGION_FAILED_OPEN:
- if (regionState != null &&
- (!regionState.isPendingOpen() && !regionState.isOpening())) {
- LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName +
- " from server " + sn + " but region was in " +
- " the state " + regionState + " and not in PENDING_OPEN or OPENING");
+ if (regionState != null
+ && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+ LOG.warn("Received FAILED_OPEN for region " + prettyPrintedRegionName
+ + " from server " + sn + " but region was in the state " + regionState
+ + " and not in expected PENDING_OPEN or OPENING states,"
+ + " or not on the expected server");
return;
}
// Handle this the same as if it were opened and then closed.
@@ -771,7 +767,7 @@ public class AssignmentManager extends Z
// When there are more than one region server a new RS is selected as the
// destination and the same is updated in the regionplan. (HBASE-5546)
if (regionState != null) {
- getRegionPlan(regionState, sn, true);
+ getRegionPlan(regionState.getRegion(), sn, true);
this.executorService.submit(new ClosedRegionHandler(server,
this, regionState.getRegion()));
}
@@ -780,13 +776,12 @@ public class AssignmentManager extends Z
case RS_ZK_REGION_OPENING:
// Should see OPENING after we have asked it to OPEN or additional
// times after already being in state of OPENING
- if (regionState != null &&
- (!regionState.isPendingOpen() && !regionState.isOpening())) {
- LOG.warn("Received OPENING for region " +
- prettyPrintedRegionName +
- " from server " + sn + " but region was in " +
- " the state " + regionState + " and not " +
- "in expected PENDING_OPEN or OPENING states");
+ if (regionState != null
+ && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+ LOG.warn("Received OPENING for region " + prettyPrintedRegionName
+ + " from server " + sn + " but region was in the state " + regionState
+ + " and not in expected PENDING_OPEN or OPENING states,"
+ + " or not on the expected server");
return;
}
// Transition to OPENING (or update stamp if already OPENING)
@@ -795,13 +790,12 @@ public class AssignmentManager extends Z
case RS_ZK_REGION_OPENED:
// Should see OPENED after OPENING but possible after PENDING_OPEN
- if (regionState != null &&
- (!regionState.isPendingOpen() && !regionState.isOpening())) {
- LOG.warn("Received OPENED for region " +
- prettyPrintedRegionName +
- " from server " + sn + " but region was in " +
- " the state " + regionState + " and not " +
- "in expected PENDING_OPEN or OPENING states");
+ if (regionState != null
+ && !regionState.isPendingOpenOrOpeningOnServer(sn)) {
+ LOG.warn("Received OPENED for region " + prettyPrintedRegionName
+ + " from server " + sn + " but region was in the state " + regionState
+ + " and not in expected PENDING_OPEN or OPENING states,"
+ + " or not on the expected server");
return;
}
// Handle OPENED by removing from transition and deleted zk node
@@ -948,31 +942,36 @@ public class AssignmentManager extends Z
public void nodeDeleted(final String path) {
if (path.startsWith(this.watcher.assignmentZNode)) {
String regionName = ZKAssign.getRegionName(this.watcher, path);
- RegionState rs = regionStates.getRegionTransitionState(regionName);
- if (rs != null) {
- HRegionInfo regionInfo = rs.getRegion();
- if (rs.isSplit()) {
- LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
- "clearing from RIT; rs=" + rs);
- regionOffline(rs.getRegion());
- } else {
- LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
- + " has been deleted.");
- if (rs.isOpened()) {
- ServerName serverName = rs.getServerName();
- regionOnline(regionInfo, serverName);
- LOG.info("The master has opened the region "
- + regionInfo.getRegionNameAsString() + " that was online on "
- + serverName);
- if (this.getZKTable().isDisablingOrDisabledTable(
- regionInfo.getTableNameAsString())) {
- LOG.debug("Opened region "
- + regionInfo.getRegionNameAsString() + " but "
- + "this table is disabled, triggering close of region");
- unassign(regionInfo);
+ Lock lock = locker.acquireLock(regionName);
+ try {
+ RegionState rs = regionStates.getRegionTransitionState(regionName);
+ if (rs != null) {
+ HRegionInfo regionInfo = rs.getRegion();
+ if (rs.isSplit()) {
+ LOG.debug("Ephemeral node deleted, regionserver crashed?, " +
+ "clearing from RIT; rs=" + rs);
+ regionOffline(rs.getRegion());
+ } else {
+ LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
+ + " has been deleted.");
+ if (rs.isOpened()) {
+ ServerName serverName = rs.getServerName();
+ regionOnline(regionInfo, serverName);
+ LOG.info("The master has opened the region "
+ + regionInfo.getRegionNameAsString() + " that was online on "
+ + serverName);
+ if (this.getZKTable().isDisablingOrDisabledTable(
+ regionInfo.getTableNameAsString())) {
+ LOG.debug("Opened region "
+ + regionInfo.getRegionNameAsString() + " but "
+ + "this table is disabled, triggering close of region");
+ unassign(regionInfo);
+ }
}
}
}
+ } finally {
+ lock.unlock();
}
}
}
@@ -1125,22 +1124,12 @@ public class AssignmentManager extends Z
assign(region, setOfflineInZK, false);
}
- public void assign(HRegionInfo region, boolean setOfflineInZK,
- boolean forceNewPlan) {
- assign(region, setOfflineInZK, forceNewPlan, false);
- }
-
/**
- * @param region
- * @param setOfflineInZK
- * @param forceNewPlan
- * @param hijack True if new assignment is needed, false otherwise
+ * Use care with forceNewPlan. It could cause double assignment.
*/
- public void assign(HRegionInfo region, boolean setOfflineInZK,
- boolean forceNewPlan, boolean hijack) {
- // If hijack is true do not call disableRegionIfInRIT as
- // we have not yet moved the znode to OFFLINE state.
- if (!hijack && isDisabledorDisablingRegionInRIT(region)) {
+ public void assign(HRegionInfo region,
+ boolean setOfflineInZK, boolean forceNewPlan) {
+ if (!setOfflineInZK && isDisabledorDisablingRegionInRIT(region)) {
return;
}
if (this.serverManager.isClusterShutdown()) {
@@ -1148,11 +1137,13 @@ public class AssignmentManager extends Z
region.getRegionNameAsString());
return;
}
- RegionState state = forceRegionStateToOffline(region, hijack);
String encodedName = region.getEncodedName();
Lock lock = locker.acquireLock(encodedName);
try {
- assign(region, state, setOfflineInZK, forceNewPlan, hijack);
+ RegionState state = forceRegionStateToOffline(region, forceNewPlan);
+ if (state != null) {
+ assign(state, setOfflineInZK, forceNewPlan);
+ }
} finally {
lock.unlock();
}
@@ -1166,226 +1157,251 @@ public class AssignmentManager extends Z
*/
boolean assign(final ServerName destination,
final List<HRegionInfo> regions) {
- if (regions.size() == 0) {
+ int regionCount = regions.size();
+ if (regionCount == 0) {
return true;
}
- LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
+ LOG.debug("Bulk assigning " + regionCount + " region(s) to " +
destination.toString());
- List<RegionState> states = new ArrayList<RegionState>(regions.size());
+ Set<String> encodedNames = new HashSet<String>(regionCount);
for (HRegionInfo region : regions) {
- states.add(forceRegionStateToOffline(region));
+ encodedNames.add(region.getEncodedName());
}
- // Add region plans, so we can updateTimers when one region is opened so
- // that unnecessary timeout on RIT is reduced.
- Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
- for (HRegionInfo region : regions) {
- plans.put(region.getEncodedName(), new RegionPlan(region, null,
- destination));
- }
- this.addPlans(plans);
- // Presumption is that only this thread will be updating the state at this
- // time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
- AtomicInteger counter = new AtomicInteger(0);
- CreateUnassignedAsyncCallback cb =
- new CreateUnassignedAsyncCallback(regionStates, this.watcher, destination, counter);
- for (RegionState state: states) {
- if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
- return false;
+ List<HRegionInfo> failedToOpenRegions = new ArrayList<HRegionInfo>();
+ Map<String, Lock> locks = locker.acquireLocks(encodedNames);
+ try {
+ AtomicInteger counter = new AtomicInteger(0);
+ Map<String, Integer> offlineNodesVersions = new ConcurrentHashMap<String, Integer>();
+ OfflineCallback cb = new OfflineCallback(
+ regionStates, asyncOfflineZKWatcher, destination, counter, offlineNodesVersions);
+ Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>(regions.size());
+ List<RegionState> states = new ArrayList<RegionState>(regions.size());
+ for (HRegionInfo region : regions) {
+ String encodedRegionName = region.getEncodedName();
+ RegionState state = forceRegionStateToOffline(region, true);
+ if (state != null && asyncSetOfflineInZooKeeper(
+ state, asyncOfflineZKWatcher, cb, destination)) {
+ RegionPlan plan = new RegionPlan(region, state.getServerName(), destination);
+ plans.put(encodedRegionName, plan);
+ states.add(state);
+ } else {
+ LOG.warn("failed to force region state to offline or "
+ + "failed to set it offline in ZK, will reassign later: " + region);
+ failedToOpenRegions.add(region); // assign individually later
+ Lock lock = locks.remove(encodedRegionName);
+ lock.unlock();
+ }
}
- }
- // Wait until all unassigned nodes have been put up and watchers set.
- int total = regions.size();
- for (int oldCounter = 0; !server.isStopped();) {
- int count = counter.get();
- if (oldCounter != count) {
- LOG.info(destination.toString() + " unassigned znodes=" + count +
- " of total=" + total);
- oldCounter = count;
+
+ // Wait until all unassigned nodes have been put up and watchers set.
+ int total = states.size();
+ for (int oldCounter = 0; !server.isStopped();) {
+ int count = counter.get();
+ if (oldCounter != count) {
+ LOG.info(destination.toString() + " unassigned znodes=" + count +
+ " of total=" + total);
+ oldCounter = count;
+ }
+ if (count >= total) break;
+ Threads.sleep(5);
}
- if (count == total) break;
- Threads.sleep(10);
- }
- if (server.isStopped()) {
- return false;
- }
- // Move on to open regions.
- try {
- // Send OPEN RPC. If it fails on a IOE or RemoteException, the
- // TimeoutMonitor will pick up the pieces.
- long maxWaitTime = System.currentTimeMillis() +
- this.server.getConfiguration().
- getLong("hbase.regionserver.rpc.startup.waittime", 60000);
- while (!this.server.isStopped()) {
- try {
- List<RegionOpeningState> regionOpeningStateList = this.serverManager
- .sendRegionOpen(destination, regions);
- if (regionOpeningStateList == null) {
- // Failed getting RPC connection to this server
+ if (server.isStopped()) {
+ return false;
+ }
+
+ // Add region plans, so we can updateTimers when one region is opened so
+ // that unnecessary timeout on RIT is reduced.
+ this.addPlans(plans);
+
+ List<Pair<HRegionInfo, Integer>> regionOpenInfos =
+ new ArrayList<Pair<HRegionInfo, Integer>>(states.size());
+ for (RegionState state: states) {
+ HRegionInfo region = state.getRegion();
+ String encodedRegionName = region.getEncodedName();
+ Integer nodeVersion = offlineNodesVersions.get(encodedRegionName);
+ if (nodeVersion == null || nodeVersion.intValue() == -1) {
+ LOG.warn("failed to offline in zookeeper: " + region);
+ failedToOpenRegions.add(region); // assign individually later
+ Lock lock = locks.remove(encodedRegionName);
+ lock.unlock();
+ } else {
+ try { // Set the ZK watcher explicitly
+ ZKAssign.getData(this.watcher, encodedRegionName);
+ } catch (KeeperException e) {
+ server.abort("Unexpected exception watching ZKAssign node", e);
return false;
}
- for (int i = 0; i < regionOpeningStateList.size(); i++) {
- if (regionOpeningStateList.get(i) == RegionOpeningState.ALREADY_OPENED) {
- processAlreadyOpenedRegion(regions.get(i), destination);
- } else if (regionOpeningStateList.get(i) == RegionOpeningState.FAILED_OPENING) {
- // Failed opening this region, reassign it
- assign(regions.get(i), true, true);
+ regionStates.updateRegionState(region,
+ RegionState.State.PENDING_OPEN, destination);
+ regionOpenInfos.add(new Pair<HRegionInfo, Integer>(
+ region, nodeVersion));
+ }
+ }
+
+ // Move on to open regions.
+ try {
+ // Send OPEN RPC. If it fails on a IOE or RemoteException, the
+ // TimeoutMonitor will pick up the pieces.
+ long maxWaitTime = System.currentTimeMillis() +
+ this.server.getConfiguration().
+ getLong("hbase.regionserver.rpc.startup.waittime", 60000);
+ while (!this.server.isStopped()) {
+ try {
+ List<RegionOpeningState> regionOpeningStateList = serverManager
+ .sendRegionOpen(destination, regionOpenInfos);
+ if (regionOpeningStateList == null) {
+ // Failed getting RPC connection to this server
+ return false;
}
+ for (int i = 0, n = regionOpeningStateList.size(); i < n; i++) {
+ RegionOpeningState openingState = regionOpeningStateList.get(i);
+ if (openingState != RegionOpeningState.OPENED) {
+ HRegionInfo region = regionOpenInfos.get(i).getFirst();
+ if (openingState == RegionOpeningState.ALREADY_OPENED) {
+ processAlreadyOpenedRegion(region, destination);
+ } else if (openingState == RegionOpeningState.FAILED_OPENING) {
+ // Failed opening this region, reassign it later
+ failedToOpenRegions.add(region);
+ } else {
+ LOG.warn("THIS SHOULD NOT HAPPEN: unknown opening state "
+ + openingState + " in assigning region " + region);
+ }
+ }
+ }
+ break;
+ } catch (IOException e) {
+ if (e instanceof RemoteException) {
+ e = ((RemoteException)e).unwrapRemoteException();
+ }
+ if (e instanceof RegionServerStoppedException) {
+ LOG.warn("The region server was shut down, ", e);
+ // No need to retry, the region server is a goner.
+ return false;
+ } else if (e instanceof ServerNotRunningYetException) {
+ // This is the one exception to retry. For all else we should just fail
+ // the startup.
+ long now = System.currentTimeMillis();
+ if (now < maxWaitTime) {
+ LOG.debug("Server is not yet up; waiting up to " +
+ (maxWaitTime - now) + "ms", e);
+ Thread.sleep(100);
+ continue;
+ }
+ }
+ throw e;
}
- break;
- } catch (RemoteException e) {
- IOException decodedException = e.unwrapRemoteException();
- if (decodedException instanceof RegionServerStoppedException) {
- LOG.warn("The region server was shut down, ", decodedException);
- // No need to retry, the region server is a goner.
- return false;
- } else if (decodedException instanceof ServerNotRunningYetException) {
- // This is the one exception to retry. For all else we should just fail
- // the startup.
- long now = System.currentTimeMillis();
- if (now > maxWaitTime) throw e;
- LOG.debug("Server is not yet up; waiting up to " +
- (maxWaitTime - now) + "ms", e);
- Thread.sleep(100);
- }
-
- throw decodedException;
}
- }
- } catch (IOException e) {
- // Can be a socket timeout, EOF, NoRouteToHost, etc
- LOG.info("Unable to communicate with the region server in order" +
+ } catch (IOException e) {
+ // Can be a socket timeout, EOF, NoRouteToHost, etc
+ LOG.info("Unable to communicate with the region server in order" +
" to assign regions", e);
- return false;
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- LOG.debug("Bulk assigning done for " + destination.toString());
- return true;
- }
-
- /**
- * Callback handler for create unassigned znodes used during bulk assign.
- */
- static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
- private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
- private final RegionStates regionStates;
- private final ZooKeeperWatcher zkw;
- private final ServerName destination;
- private final AtomicInteger counter;
-
- CreateUnassignedAsyncCallback(final RegionStates regionStates,
- final ZooKeeperWatcher zkw, final ServerName destination,
- final AtomicInteger counter) {
- this.regionStates = regionStates;
- this.zkw = zkw;
- this.destination = destination;
- this.counter = counter;
+ return false;
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } finally {
+ for (Lock lock : locks.values()) {
+ lock.unlock();
+ }
}
- @Override
- public void processResult(int rc, String path, Object ctx, String name) {
- if (rc == KeeperException.Code.NODEEXISTS.intValue()) {
- LOG.warn("Node for " + path + " already exists");
- } else if (rc != 0) {
- // This is result code. If non-zero, need to resubmit.
- LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
- "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
- this.zkw.abort("Connectionloss writing unassigned at " + path +
- ", rc=" + rc, null);
- return;
+ if (!failedToOpenRegions.isEmpty()) {
+ for (HRegionInfo region : failedToOpenRegions) {
+ invokeAssign(region);
}
- LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
- // Async exists to set a watcher so we'll get triggered when
- // unassigned node changes.
- this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
- new ExistsUnassignedAsyncCallback(regionStates, counter, destination), ctx);
}
+ LOG.debug("Bulk assigning done for " + destination.toString());
+ return true;
}
/**
- * Callback handler for the exists call that sets watcher on unassigned znodes.
- * Used during bulk assign on startup.
+ * Send CLOSE RPC if the server is online, otherwise, offline the region
*/
- static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
- private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
- private final RegionStates regionStates;
- private final AtomicInteger counter;
- private ServerName destination;
-
- ExistsUnassignedAsyncCallback(final RegionStates regionStates,
- final AtomicInteger counter, ServerName destination) {
- this.regionStates = regionStates;
- this.counter = counter;
- this.destination = destination;
+ private void unassign(final HRegionInfo region,
+ final RegionState state, final int versionOfClosingNode,
+ final ServerName dest, final boolean transitionInZK) {
+ // Send CLOSE RPC
+ ServerName server = state.getServerName();
+ // ClosedRegionHandler can remove the server from this.regions
+ if (!serverManager.isServerOnline(server)) {
+ // delete the node. if no node exists need not bother.
+ deleteClosingOrClosedNode(region);
+ regionOffline(region);
+ return;
}
- @Override
- public void processResult(int rc, String path, Object ctx, Stat stat) {
- if (rc != 0) {
- // This is result code. If non-zero, need to resubmit.
- LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
- "FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
- return;
+ for (int i = 1; i <= this.maximumAttempts; i++) {
+ try {
+ if (serverManager.sendRegionClose(server, region,
+ versionOfClosingNode, dest, transitionInZK)) {
+ LOG.debug("Sent CLOSE to " + server + " for region " +
+ region.getRegionNameAsString());
+ return;
+ }
+ // This never happens. Currently regionserver close always returns true.
+ LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
+ region.getRegionNameAsString());
+ } catch (Throwable t) {
+ if (t instanceof RemoteException) {
+ t = ((RemoteException)t).unwrapRemoteException();
+ }
+ if (t instanceof NotServingRegionException) {
+ deleteClosingOrClosedNode(region);
+ regionOffline(region);
+ return;
+ } else if (t instanceof RegionAlreadyInTransitionException) {
+ // RS is already processing this region, only need to update the timestamp
+ LOG.debug("update " + state + " the timestamp.");
+ state.updateTimestampToNow();
+ }
+ LOG.info("Server " + server + " returned " + t + " for "
+ + region.getRegionNameAsString() + ", try=" + i
+ + " of " + this.maximumAttempts, t);
+ // Presume retry or server will expire.
}
- RegionState state = (RegionState)ctx;
- LOG.debug("rs=" + state);
- // Transition RegionState to PENDING_OPEN here in master; means we've
- // sent the open. We're a little ahead of ourselves here since we've not
- // yet sent out the actual open but putting this state change after the
- // call to open risks our writing PENDING_OPEN after state has been moved
- // to OPENING by the regionserver.
- regionStates.updateRegionState(state.getRegion(),
- RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
- destination);
- this.counter.addAndGet(1);
}
}
/**
- * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
- * @param region
- * @return Amended RegionState.
- */
- private RegionState forceRegionStateToOffline(final HRegionInfo region) {
- return forceRegionStateToOffline(region, false);
- }
-
- /**
- * Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
- * @param region
- * @param hijack
- * @return Amended RegionState.
+ * Set region to OFFLINE unless it is opening and forceNewPlan is false.
*/
- private RegionState forceRegionStateToOffline(final HRegionInfo region,
- boolean hijack) {
- String encodedName = region.getEncodedName();
-
- Lock lock = locker.acquireLock(encodedName);
- try {
- RegionState state = regionStates.getRegionTransitionState(encodedName);
- if (state == null) {
- state = regionStates.updateRegionState(
- region, RegionState.State.OFFLINE);
- } else {
- // If we are reassigning the node do not force in-memory state to OFFLINE.
- // Based on the znode state we will decide if to change in-memory state to
- // OFFLINE or not. It will be done before setting znode to OFFLINE state.
-
- // We often get here with state == CLOSED because ClosedRegionHandler will
- // assign on its tail as part of the handling of a region close.
- if (!hijack) {
+ private RegionState forceRegionStateToOffline(
+ final HRegionInfo region, final boolean forceNewPlan) {
+ RegionState state = regionStates.getRegionState(region);
+ if (state == null) {
+ LOG.warn("Assigning a region not in region states: " + region);
+ state = regionStates.createRegionState(region);
+ } else {
+ switch (state.getState()) {
+ case OPEN:
+ case OPENING:
+ case PENDING_OPEN:
+ if (!forceNewPlan) {
+ LOG.debug("Attempting to assign region " +
+ region + " but it is already in transition: " + state);
+ return null;
+ }
+ case CLOSING:
+ case PENDING_CLOSE:
+ unassign(region, state, -1, null, false);
+ case CLOSED:
+ if (!state.isOffline()) {
LOG.debug("Forcing OFFLINE; was=" + state);
state = regionStates.updateRegionState(
region, RegionState.State.OFFLINE);
}
+ case OFFLINE:
+ break;
+ default:
+ LOG.error("Trying to assign region " + region
+ + ", which is in state " + state);
+ return null;
}
- return state;
- } finally {
- lock.unlock();
}
+ return state;
}
/**
@@ -1393,35 +1409,41 @@ public class AssignmentManager extends Z
* @param state
* @param setOfflineInZK
* @param forceNewPlan
- * @param hijack
*/
- private void assign(final HRegionInfo region, final RegionState state,
- final boolean setOfflineInZK, final boolean forceNewPlan,
- boolean hijack) {
- boolean regionAlreadyInTransitionException = false;
- boolean serverNotRunningYet = false;
+ private void assign(RegionState state,
+ final boolean setOfflineInZK, final boolean forceNewPlan) {
RegionState currentState = state;
+ int versionOfOfflineNode = -1;
+ RegionPlan plan = null;
long maxRegionServerStartupWaitTime = -1;
- for (int i = 0; i < this.maximumAssignmentAttempts; i++) {
- int versionOfOfflineNode = -1;
- if (setOfflineInZK) {
+ HRegionInfo region = state.getRegion();
+ for (int i = 1; i <= this.maximumAttempts; i++) {
+ if (plan == null) { // Get a server for the region at first
+ plan = getRegionPlan(region, forceNewPlan);
+ }
+ if (plan == null) {
+ LOG.debug("Unable to determine a plan to assign " + region);
+ this.timeoutMonitor.setAllRegionServersOffline(true);
+ return; // Should get reassigned later when RIT times out.
+ }
+ if (setOfflineInZK && versionOfOfflineNode == -1) {
// get the version of the znode after setting it to OFFLINE.
// versionOfOfflineNode will be -1 if the znode was not set to OFFLINE
- versionOfOfflineNode = setOfflineInZooKeeper(currentState, hijack);
+ versionOfOfflineNode = setOfflineInZooKeeper(currentState, plan.getDestination());
if (versionOfOfflineNode != -1) {
if (isDisabledorDisablingRegionInRIT(region)) {
return;
}
// In case of assignment from EnableTableHandler table state is ENABLING. Any how
// EnableTableHandler will set ENABLED after assigning all the table regions. If we
- // try to set to ENABLED directly then client api may think table is enabled.
+ // try to set to ENABLED directly then client API may think table is enabled.
// When we have a case such as all the regions are added directly into .META. and we call
// assignRegion then we need to make the table ENABLED. Hence in such case the table
// will not be in ENABLING or ENABLED state.
String tableName = region.getTableNameAsString();
if (!zkTable.isEnablingTable(tableName) && !zkTable.isEnabledTable(tableName)) {
LOG.debug("Setting table " + tableName + " to ENABLED state.");
- setEnabledTable(region);
+ setEnabledTable(tableName);
}
}
}
@@ -1429,29 +1451,21 @@ public class AssignmentManager extends Z
return;
}
if (this.server.isStopped()) {
- LOG.debug("Server stopped; skipping assign of " + state);
+ LOG.debug("Server stopped; skipping assign of " + region);
return;
}
- RegionPlan plan = getRegionPlan(state,
- !regionAlreadyInTransitionException && !serverNotRunningYet && forceNewPlan);
- if (plan == null) {
- LOG.debug("Unable to determine a plan to assign " + state);
- this.timeoutMonitor.setAllRegionServersOffline(true);
- return; // Should get reassigned later when RIT times out.
- }
try {
- LOG.info("Assigning region " + state.getRegion().getRegionNameAsString() +
+ LOG.info("Assigning region " + region.getRegionNameAsString() +
" to " + plan.getDestination().toString());
// Transition RegionState to PENDING_OPEN
- currentState = regionStates.updateRegionState(state.getRegion(),
- RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
- plan.getDestination());
+ currentState = regionStates.updateRegionState(region,
+ RegionState.State.PENDING_OPEN, plan.getDestination());
// Send OPEN RPC. This can fail if the server on other end is is not up.
// Pass the version that was obtained while setting the node to OFFLINE.
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
- .getDestination(), state.getRegion(), versionOfOfflineNode);
+ .getDestination(), region, versionOfOfflineNode);
if (regionOpenState == RegionOpeningState.ALREADY_OPENED) {
- processAlreadyOpenedRegion(state.getRegion(), plan.getDestination());
+ processAlreadyOpenedRegion(region, plan.getDestination());
} else if (regionOpenState == RegionOpeningState.FAILED_OPENING) {
// Failed opening this region
throw new Exception("Get regionOpeningState=" + regionOpenState);
@@ -1461,13 +1475,14 @@ public class AssignmentManager extends Z
if (t instanceof RemoteException) {
t = ((RemoteException) t).unwrapRemoteException();
}
- regionAlreadyInTransitionException = false;
- serverNotRunningYet = false;
+ boolean regionAlreadyInTransitionException = false;
+ boolean serverNotRunningYet = false;
+ boolean socketTimedOut = false;
if (t instanceof RegionAlreadyInTransitionException) {
regionAlreadyInTransitionException = true;
if (LOG.isDebugEnabled()) {
LOG.debug("Failed assignment in: " + plan.getDestination() + " due to "
- + t.getMessage());
+ + t.getMessage());
}
} else if (t instanceof ServerNotRunningYetException) {
if (maxRegionServerStartupWaitTime < 0) {
@@ -1488,52 +1503,72 @@ public class AssignmentManager extends Z
}
} catch (InterruptedException ie) {
LOG.warn("Failed to assign "
- + state.getRegion().getRegionNameAsString() + " since interrupted", ie);
+ + region.getRegionNameAsString() + " since interrupted", ie);
Thread.currentThread().interrupt();
return;
}
- }
- if (t instanceof java.net.SocketTimeoutException
+ } else if (t instanceof java.net.SocketTimeoutException
&& this.serverManager.isServerOnline(plan.getDestination())) {
- LOG.warn("Call openRegion() to " + plan.getDestination()
+ // In case the socket timed out and the region server is still online,
+ // the openRegion RPC could have been accepted by the server and
+ // only the response did not get through. So we will retry to
+ // open the region on the same server to avoid possible
+ // double assignment.
+ socketTimedOut = true;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Call openRegion() to " + plan.getDestination()
+ " has timed out when trying to assign "
+ region.getRegionNameAsString()
+ ", but the region might already be opened on "
+ plan.getDestination() + ".", t);
- return;
+ }
}
+
LOG.warn("Failed assignment of "
- + state.getRegion().getRegionNameAsString()
+ + region.getRegionNameAsString()
+ " to "
+ plan.getDestination()
+ ", trying to assign "
- + (regionAlreadyInTransitionException || serverNotRunningYet
- ? "to the same region server because of "
- + "RegionAlreadyInTransitionException/ServerNotRunningYetException;"
- : "elsewhere instead; ")
- + "retry=" + i, t);
- // Clean out plan we failed execute and one that doesn't look like it'll
- // succeed anyways; we need a new plan!
- // Transition back to OFFLINE
- currentState = regionStates.updateRegionState(
- state.getRegion(), RegionState.State.OFFLINE);
+ + (regionAlreadyInTransitionException || serverNotRunningYet || socketTimedOut
+ ? "to the same region server because of RegionAlreadyInTransitionException"
+ + "/ServerNotRunningYetException/SocketTimeoutException;"
+ : "elsewhere instead; ")
+ + "try=" + i + " of " + this.maximumAttempts, t);
+
+ if (i == this.maximumAttempts) {
+ // Don't reset the region state or get a new plan any more.
+ // This is the last try.
+ continue;
+ }
+
// If region opened on destination of present plan, reassigning to new
// RS may cause double assignments. In case of RegionAlreadyInTransitionException
// reassigning to same RS.
RegionPlan newPlan = plan;
- if (!regionAlreadyInTransitionException && !serverNotRunningYet) {
+ if (!(regionAlreadyInTransitionException
+ || serverNotRunningYet || socketTimedOut)) {
// Force a new plan and reassign. Will return null if no servers.
// The new plan could be the same as the existing plan since we don't
// exclude the server of the original plan, which should not be
// excluded since it could be the only server up now.
- newPlan = getRegionPlan(state, true);
+ newPlan = getRegionPlan(region, true);
}
if (newPlan == null) {
this.timeoutMonitor.setAllRegionServersOffline(true);
LOG.warn("Unable to find a viable location to assign region " +
- state.getRegion().getRegionNameAsString());
+ region.getRegionNameAsString());
return;
}
+ if (plan != newPlan
+ && !plan.getDestination().equals(newPlan.getDestination())) {
+ // Clean out the plan we failed to execute, one that doesn't look like it'll
+ // succeed anyway; we need a new plan!
+ // Transition back to OFFLINE
+ currentState = regionStates.updateRegionState(
+ region, RegionState.State.OFFLINE);
+ versionOfOfflineNode = -1;
+ plan = newPlan;
+ }
}
}
}
@@ -1577,44 +1612,22 @@ public class AssignmentManager extends Z
* Set region as OFFLINED up in zookeeper
*
* @param state
- * @param hijack
- * - true if needs to be hijacked and reassigned, false otherwise.
* @return the version of the offline node if setting of the OFFLINE node was
* successful, -1 otherwise.
*/
- int setOfflineInZooKeeper(final RegionState state, boolean hijack) {
- // In case of reassignment the current state in memory need not be
- // OFFLINE.
- if (!hijack && !state.isClosed() && !state.isOffline()) {
+ private int setOfflineInZooKeeper(final RegionState state, final ServerName destination) {
+ if (!state.isClosed() && !state.isOffline()) {
String msg = "Unexpected state : " + state + " .. Cannot transit it to OFFLINE.";
this.server.abort(msg, new IllegalStateException(msg));
return -1;
}
- boolean allowZNodeCreation = false;
- // Under reassignment if the current state is PENDING_OPEN
- // or OPENING then refresh the in-memory state to PENDING_OPEN. This is
- // important because if the region was in
- // RS_OPENING state for a long time the master will try to force the znode
- // to OFFLINE state meanwhile the RS could have opened the corresponding
- // region and the state in znode will be RS_ZK_REGION_OPENED.
- // For all other cases we can change the in-memory state to OFFLINE.
- if (hijack &&
- (state.getState().equals(RegionState.State.PENDING_OPEN) ||
- state.getState().equals(RegionState.State.OPENING))) {
- regionStates.updateRegionState(state.getRegion(),
- RegionState.State.PENDING_OPEN);
- allowZNodeCreation = false;
- } else {
- regionStates.updateRegionState(state.getRegion(),
- RegionState.State.OFFLINE);
- allowZNodeCreation = true;
- }
+ regionStates.updateRegionState(state.getRegion(),
+ RegionState.State.OFFLINE);
int versionOfOfflineNode = -1;
try {
// get the version after setting the znode to OFFLINE
versionOfOfflineNode = ZKAssign.createOrForceNodeOffline(watcher,
- state.getRegion(), this.server.getServerName(),
- hijack, allowZNodeCreation);
+ state.getRegion(), destination);
if (versionOfOfflineNode == -1) {
LOG.warn("Attempted to create/force node into OFFLINE state before "
+ "completing assignment but failed to do so for " + state);
@@ -1628,57 +1641,28 @@ public class AssignmentManager extends Z
}
/**
- * Set region as OFFLINED up in zookeeper asynchronously.
- * @param state
- * @return True if we succeeded, false otherwise (State was incorrect or failed
- * updating zk).
- */
- boolean asyncSetOfflineInZooKeeper(final RegionState state,
- final AsyncCallback.StringCallback cb, final Object ctx) {
- if (!state.isClosed() && !state.isOffline()) {
- this.server.abort("Unexpected state trying to OFFLINE; " + state,
- new IllegalStateException());
- return false;
- }
- regionStates.updateRegionState(
- state.getRegion(), RegionState.State.OFFLINE);
- try {
- ZKAssign.asyncCreateNodeOffline(watcher, state.getRegion(),
- this.server.getServerName(), cb, ctx);
- } catch (KeeperException e) {
- if (e instanceof NodeExistsException) {
- LOG.warn("Node for " + state.getRegion() + " already exists");
- } else {
- server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
- }
- return false;
- }
- return true;
- }
-
- /**
- * @param state
- * @return Plan for passed <code>state</code> (If none currently, it creates one or
+ * @param region the region to assign
+ * @return Plan for passed <code>region</code> (If none currently, it creates one or
* if no servers to assign, it returns null).
*/
- RegionPlan getRegionPlan(final RegionState state,
+ private RegionPlan getRegionPlan(final HRegionInfo region,
final boolean forceNewPlan) {
- return getRegionPlan(state, null, forceNewPlan);
+ return getRegionPlan(region, null, forceNewPlan);
}
/**
- * @param state
+ * @param region the region to assign
* @param serverToExclude Server to exclude (we know its bad). Pass null if
* all servers are thought to be assignable.
* @param forceNewPlan If true, then if an existing plan exists, a new plan
* will be generated.
- * @return Plan for passed <code>state</code> (If none currently, it creates one or
+ * @return Plan for passed <code>region</code> (If none currently, it creates one or
* if no servers to assign, it returns null).
*/
- RegionPlan getRegionPlan(final RegionState state,
+ private RegionPlan getRegionPlan(final HRegionInfo region,
final ServerName serverToExclude, final boolean forceNewPlan) {
// Pickup existing plan or make a new one
- final String encodedName = state.getRegion().getEncodedName();
+ final String encodedName = region.getEncodedName();
final List<ServerName> destServers =
serverManager.createDestinationServersList(serverToExclude);
@@ -1696,9 +1680,8 @@ public class AssignmentManager extends Z
existingPlan = this.regionPlans.get(encodedName);
if (existingPlan != null && existingPlan.getDestination() != null) {
- LOG.debug("Found an existing plan for " +
- state.getRegion().getRegionNameAsString() +
- " destination server is " + existingPlan.getDestination().toString());
+ LOG.debug("Found an existing plan for " + region.getRegionNameAsString()
+ + " destination server is " + existingPlan.getDestination());
}
if (forceNewPlan
@@ -1706,15 +1689,15 @@ public class AssignmentManager extends Z
|| existingPlan.getDestination() == null
|| !destServers.contains(existingPlan.getDestination())) {
newPlan = true;
- randomPlan = new RegionPlan(state.getRegion(), null,
- balancer.randomAssignment(state.getRegion(), destServers));
+ randomPlan = new RegionPlan(region, null,
+ balancer.randomAssignment(region, destServers));
this.regionPlans.put(encodedName, randomPlan);
}
}
if (newPlan) {
LOG.debug("No previous transition plan was found (or we are ignoring " +
- "an existing plan) for " + state.getRegion().getRegionNameAsString() +
+ "an existing plan) for " + region.getRegionNameAsString() +
" so generated a random one; " + randomPlan + "; " +
serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() +
@@ -1722,8 +1705,8 @@ public class AssignmentManager extends Z
return randomPlan;
}
LOG.debug("Using pre-existing plan for region " +
- state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
- return existingPlan;
+ region.getRegionNameAsString() + "; plan=" + existingPlan);
+ return existingPlan;
}
/**
@@ -1794,13 +1777,6 @@ public class AssignmentManager extends Z
LOG.debug("Starting unassignment of region " +
region.getRegionNameAsString() + " (offlining)");
- // Check if this region is currently assigned
- if (!regionStates.isRegionAssigned(region)) {
- LOG.debug("Attempted to unassign region " +
- region.getRegionNameAsString() + " but it is not " +
- "currently assigned anywhere");
- return;
- }
String encodedName = region.getEncodedName();
// Grab the state of this region and synchronize on it
int versionOfClosingNode = -1;
@@ -1812,8 +1788,15 @@ public class AssignmentManager extends Z
if (state == null) {
// Create the znode in CLOSING state
try {
+ state = regionStates.getRegionState(region);
+ if (state == null || state.getServerName() == null) {
+ // We don't know where the region is, offline it.
+ // No need to send CLOSE RPC
+ regionOffline(region);
+ return;
+ }
versionOfClosingNode = ZKAssign.createNodeClosing(
- watcher, region, server.getServerName());
+ watcher, region, state.getServerName());
if (versionOfClosingNode == -1) {
LOG.debug("Attempting to unassign region " +
region.getRegionNameAsString() + " but ZK closing node "
@@ -1862,57 +1845,11 @@ public class AssignmentManager extends Z
"already in transition (" + state.getState() + ", force=" + force + ")");
return;
}
+
+ unassign(region, state, versionOfClosingNode, dest, true);
} finally {
lock.unlock();
}
-
- // Send CLOSE RPC
- ServerName server = state.getServerName();
- // ClosedRegionhandler can remove the server from this.regions
- if (server == null) {
- // delete the node. if no node exists need not bother.
- deleteClosingOrClosedNode(region);
- return;
- }
-
- try {
- // TODO: We should consider making this look more like it does for the
- // region open where we catch all throwables and never abort
- if (serverManager.sendRegionClose(server, state.getRegion(),
- versionOfClosingNode, dest)) {
- LOG.debug("Sent CLOSE to " + server + " for region " +
- region.getRegionNameAsString());
- return;
- }
- // This never happens. Currently regionserver close always return true.
- LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
- region.getRegionNameAsString());
- } catch (Throwable t) {
- if (t instanceof RemoteException) {
- t = ((RemoteException)t).unwrapRemoteException();
- }
- if (t instanceof NotServingRegionException) {
- // Presume that master has stale data. Presume remote side just split.
- // Presume that the split message when it comes in will fix up the master's
- // in memory cluster state.
- if (getZKTable().isDisablingTable(region.getTableNameAsString())) {
- // Remove from the regionsinTransition map
- LOG.info("While trying to recover the table "
- + region.getTableNameAsString()
- + " to DISABLED state the region " + region
- + " was offlined but the table was in DISABLING state");
- regionStates.regionOffline(region);
- deleteClosingOrClosedNode(region);
- }
- } else if (t instanceof RegionAlreadyInTransitionException) {
- // RS is already processing this region, only need to update the timestamp
- LOG.debug("update " + state + " the timestamp.");
- state.updateTimestampToNow();
- }
- LOG.info("Server " + server + " returned " + t + " for " +
- region.getRegionNameAsString(), t);
- // Presume retry or server will expire.
- }
}
public void unassign(HRegionInfo region, boolean force){
@@ -1954,7 +1891,7 @@ public class AssignmentManager extends Z
* @throws DeserializationException
*/
private boolean isSplitOrSplitting(final String path)
- throws KeeperException, DeserializationException {
+ throws KeeperException, DeserializationException {
boolean result = false;
// This may fail if the SPLIT or SPLITTING znode gets cleaned up before we
// can get data from it.
@@ -1981,7 +1918,7 @@ public class AssignmentManager extends Z
* @throws InterruptedException
*/
public void waitForAssignment(HRegionInfo regionInfo)
- throws InterruptedException {
+ throws InterruptedException {
while(!this.server.isStopped() &&
!regionStates.isRegionAssigned(regionInfo)) {
// We should receive a notification, but it's
@@ -2020,6 +1957,35 @@ public class AssignmentManager extends Z
}
/**
+ * Assigns specified regions retaining assignments, if any. This is a
+ * synchronous call and will return once every region has been assigned.
+ * Returns immediately, doing nothing, if <code>regions</code> is null or empty.
+ * @param regions regions to assign; the map is handed to the balancer's retainAssignment
+ * @throws InterruptedException presumably if the bulk assignment is interrupted
+ * @throws IOException if no destination server is available
+ */
+ public void assign(Map<HRegionInfo, ServerName> regions)
+ throws IOException, InterruptedException {
+ if (regions == null || regions.isEmpty()) {
+ return;
+ }
+ List<ServerName> servers = serverManager.createDestinationServersList();
+ if (servers == null || servers.isEmpty()) {
+ throw new IOException("Found no destination server to assign region(s)");
+ }
+
+ // Reuse existing assignment info: the balancer turns the map into a per-server plan
+ Map<ServerName, List<HRegionInfo>> bulkPlan =
+ balancer.retainAssignment(regions, servers);
+
+ LOG.info("Bulk assigning " + regions.size() + " region(s) across " +
+ servers.size() + " server(s), retainAssignment=true");
+ BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
+ ba.bulkAssign();
+ LOG.info("Bulk assigning done");
+ }
+
+ /**
* Assigns specified regions round robin, if any.
* <p>
* This is a synchronous call and will return once every region has been
@@ -2050,18 +2016,6 @@ public class AssignmentManager extends Z
LOG.info("Bulk assigning done");
}
- // TODO: This method seems way wrong. Why would we mark a table enabled based
- // off a single region? We seem to call this on bulk assign on startup which
- // isn't too bad but then its also called in assign. It makes the enabled
- // flag up in zk meaningless. St.Ack
- private void setEnabledTable(HRegionInfo hri) {
- String tableName = hri.getTableNameAsString();
- boolean isTableEnabled = this.zkTable.isEnabledTable(tableName);
- if (!isTableEnabled) {
- setEnabledTable(tableName);
- }
- }
-
/**
* Assigns all user regions, if any exist. Used during cluster startup.
* <p>
@@ -2095,27 +2049,17 @@ public class AssignmentManager extends Z
getBoolean("hbase.master.startup.retainassign", true);
if (retainAssignment) {
- List<ServerName> servers = serverManager.createDestinationServersList();
- if (servers == null || servers.isEmpty()) {
- throw new IOException("Found no destination server to assign region(s)");
- }
-
- // Reuse existing assignment info
- Map<ServerName, List<HRegionInfo>> bulkPlan =
- balancer.retainAssignment(allRegions, servers);
-
- LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
- servers.size() + " server(s), retainAssignment=true");
- BulkAssigner ba = new GeneralBulkAssigner(this.server, bulkPlan, this);
- ba.bulkAssign();
- LOG.info("Bulk assigning done");
+ assign(allRegions);
} else {
List<HRegionInfo> regions = new ArrayList<HRegionInfo>(allRegions.keySet());
assign(regions);
}
for (HRegionInfo hri : allRegions.keySet()) {
- setEnabledTable(hri);
+ String tableName = hri.getTableNameAsString();
+ if (!zkTable.isEnabledTable(tableName)) {
+ setEnabledTable(tableName);
+ }
}
}
@@ -2126,7 +2070,7 @@ public class AssignmentManager extends Z
* @throws InterruptedException
*/
boolean waitUntilNoRegionsInTransition(final long timeout)
- throws InterruptedException {
+ throws InterruptedException {
// Blocks until there are no regions in transition. It is possible that
// there
// are regions in transition immediately after this returns but guarantees
@@ -2301,7 +2245,7 @@ public class AssignmentManager extends Z
*/
private void processDeadServersAndRecoverLostRegions(
Map<ServerName, List<HRegionInfo>> deadServers, List<String> nodes)
- throws IOException, KeeperException {
+ throws IOException, KeeperException {
if (deadServers != null) {
for (Map.Entry<ServerName, List<HRegionInfo>> server: deadServers.entrySet()) {
ServerName serverName = server.getKey();
@@ -2314,7 +2258,7 @@ public class AssignmentManager extends Z
this.watcher, this.watcher.assignmentZNode);
if (!nodes.isEmpty()) {
for (String encodedRegionName : nodes) {
- processRegionInTransition(encodedRegionName, null, deadServers);
+ processRegionInTransition(encodedRegionName, null);
}
}
@@ -2419,10 +2363,9 @@ public class AssignmentManager extends Z
* Monitor to check for time outs on region transition operations
*/
public class TimeoutMonitor extends Chore {
- private final int timeout;
- private boolean bulkAssign = false;
private boolean allRegionServersOffline = false;
private ServerManager serverManager;
+ private final int timeout;
/**
* Creates a periodic monitor to check for time outs on region transition
@@ -2441,17 +2384,6 @@ public class AssignmentManager extends Z
this.serverManager = serverManager;
}
- /**
- * @param bulkAssign If true, we'll suspend checking regions in transition
- * up in zookeeper. If false, will reenable check.
- * @return Old setting for bulkAssign.
- */
- public boolean bulkAssign(final boolean bulkAssign) {
- boolean result = this.bulkAssign;
- this.bulkAssign = bulkAssign;
- return result;
- }
-
private synchronized void setAllRegionServersOffline(
boolean allRegionServersOffline) {
this.allRegionServersOffline = allRegionServersOffline;
@@ -2459,21 +2391,21 @@ public class AssignmentManager extends Z
@Override
protected void chore() {
- // If bulkAssign in progress, suspend checks
- if (this.bulkAssign) return;
boolean noRSAvailable = this.serverManager.createDestinationServersList().isEmpty();
// Iterate all regions in transition checking for time outs
long now = System.currentTimeMillis();
// no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
// a copy while another thread is adding/removing items
- for (RegionState regionState : regionStates.getRegionsInTransition().values()) {
+ for (String regionName : regionStates.getRegionsInTransition().keySet()) {
+ RegionState regionState = regionStates.getRegionTransitionState(regionName);
+ if (regionState == null) continue;
+
if (regionState.getStamp() + timeout <= now) {
// decide on action upon timeout
actOnTimeOut(regionState);
} else if (this.allRegionServersOffline && !noRSAvailable) {
- RegionPlan existingPlan = regionPlans.get(regionState.getRegion()
- .getEncodedName());
+ RegionPlan existingPlan = regionPlans.get(regionName);
if (existingPlan == null
|| !this.serverManager.isServerOnline(existingPlan
.getDestination())) {
@@ -2541,7 +2473,7 @@ public class AssignmentManager extends Z
}
private void processOpeningState(HRegionInfo regionInfo) {
- LOG.info("Region has been OPENING for too " + "long, reassigning region="
+ LOG.info("Region has been OPENING for too long, reassigning region="
+ regionInfo.getRegionNameAsString());
// Should have a ZK node in OPENING state
try {
@@ -2573,7 +2505,7 @@ public class AssignmentManager extends Z
return;
}
- private void invokeAssign(HRegionInfo regionInfo) {
+ void invokeAssign(HRegionInfo regionInfo) {
threadPoolExecutorService.submit(new AssignCallable(this, regionInfo));
}
@@ -2581,11 +2513,11 @@ public class AssignmentManager extends Z
threadPoolExecutorService.submit(new UnAssignCallable(this, regionInfo));
}
- public boolean isCarryingRoot(ServerName serverName) {
+ boolean isCarryingRoot(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.ROOT_REGIONINFO);
}
- public boolean isCarryingMeta(ServerName serverName) {
+ boolean isCarryingMeta(ServerName serverName) {
return isCarryingRegion(serverName, HRegionInfo.FIRST_META_REGIONINFO);
}
@@ -2599,7 +2531,7 @@ public class AssignmentManager extends Z
* processing hasn't finished yet when server shutdown occurs.
* @return whether the serverName currently hosts the region
*/
- public boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
+ private boolean isCarryingRegion(ServerName serverName, HRegionInfo hri) {
RegionTransition rt = null;
try {
byte [] data = ZKAssign.getData(watcher, hri.getEncodedName());
@@ -2713,4 +2645,34 @@ public class AssignmentManager extends Z
this.server.abort(errorMsg, e);
}
}
+
+ /**
+ * Set region as OFFLINED up in zookeeper asynchronously, using zkw and callback cb.
+ * @param state region state; the server is aborted unless it is closed or offline
+ * @return True if the create request was issued, false otherwise (State was
+ * incorrect or a KeeperException was thrown while issuing the request).
+ */
+ private boolean asyncSetOfflineInZooKeeper(final RegionState state,
+ final ZooKeeperWatcher zkw, final AsyncCallback.StringCallback cb,
+ final ServerName destination) {
+ if (!state.isClosed() && !state.isOffline()) {
+ this.server.abort("Unexpected state trying to OFFLINE; " + state,
+ new IllegalStateException());
+ return false;
+ }
+ regionStates.updateRegionState( // in-memory state goes OFFLINE before the zk request
+ state.getRegion(), RegionState.State.OFFLINE);
+ try {
+ ZKAssign.asyncCreateNodeOffline(zkw, state.getRegion(),
+ destination, cb, state); // NOTE(review): state looks like the callback context - confirm
+ } catch (KeeperException e) {
+ if (e instanceof NodeExistsException) { // existing node: warn only, do not abort
+ LOG.warn("Node for " + state.getRegion() + " already exists");
+ } else {
+ server.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
+ }
+ return false;
+ }
+ return true; // request issued; presumably the outcome is reported through cb
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Sat Oct 20 03:57:49 2012
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.master;
-import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
@@ -61,26 +60,13 @@ public class GeneralBulkAssigner extends
}
@Override
- public boolean bulkAssign(boolean sync) throws InterruptedException,
- IOException {
- // Disable timing out regions in transition up in zk while bulk assigning.
- this.assignmentManager.timeoutMonitor.bulkAssign(true);
- try {
- return super.bulkAssign(sync);
- } finally {
- // Re-enable timing out regions in transition up in zk.
- this.assignmentManager.timeoutMonitor.bulkAssign(false);
- }
- }
-
- @Override
protected String getThreadNamePrefix() {
return this.server.getServerName() + "-GeneralBulkAssigner";
}
@Override
protected void populatePool(ExecutorService pool) {
- this.pool = pool; // shut it down later in case some assigner hangs
+ this.pool = pool; // shut it down later in case some assigner hangs
for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
this.assignmentManager, this.failedPlans));
@@ -204,7 +190,7 @@ public class GeneralBulkAssigner extends
reassigningRegions.addAll(failedPlans.remove(e.getKey()));
}
for (HRegionInfo region : reassigningRegions) {
- assignmentManager.assign(region, true, true);
+ assignmentManager.invokeAssign(region);
}
return reassigningRegions.size();
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1400358&r1=1400357&r2=1400358&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Sat Oct 20 03:57:49 2012
@@ -673,6 +673,7 @@ Server {
if (!masterRecovery) {
this.assignmentManager.startTimeOutMonitor();
}
+
// TODO: Should do this in background rather than block master startup
status.setStatus("Splitting logs after master startup");
splitLogAfterStartup(this.fileSystemManager);
@@ -2136,12 +2137,12 @@ Server {
return arr;
}
}
- assignRegion(regionInfo);
- if (cpHost != null) {
- cpHost.postAssign(regionInfo);
- }
+ assignmentManager.assign(regionInfo, true, true);
+ if (cpHost != null) {
+ cpHost.postAssign(regionInfo);
+ }
- return arr;
+ return arr;
} catch (IOException ioe) {
throw new ServiceException(ioe);
}