You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/07/27 18:34:34 UTC
svn commit: r1366438 [1/3] - in /hbase/trunk/hbase-server/src:
main/jamon/org/apache/hadoop/hbase/tmpl/master/
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/h...
Author: jxiang
Date: Fri Jul 27 16:34:32 2012
New Revision: 1366438
URL: http://svn.apache.org/viewvc?rev=1366438&view=rev
Log:
HBASE-6272 In-memory region state is inconsistent
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionState.java (with props)
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (with props)
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java (with props)
Removed:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/NotifiableConcurrentSkipListMap.java
Modified:
hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MXBeanImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterDumpServlet.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsckRepair.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestDrainingServer.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestRegionRebalancing.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/Mocking.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMXBean.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterStatusServlet.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestOpenedRegionHandler.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
Modified: hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon (original)
+++ hbase/trunk/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon Fri Jul 27 16:34:32 2012
@@ -20,7 +20,7 @@ limitations under the License.
<%import>
org.apache.hadoop.hbase.HRegionInfo;
org.apache.hadoop.hbase.master.AssignmentManager;
-org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
+org.apache.hadoop.hbase.master.RegionState;
org.apache.hadoop.conf.Configuration;
org.apache.hadoop.hbase.HBaseConfiguration;
org.apache.hadoop.hbase.HConstants;
@@ -32,7 +32,8 @@ AssignmentManager assignmentManager;
int limit = 100;
</%args>
<%java>
-Map<String, RegionState> rit = assignmentManager.copyRegionsInTransition();
+Map<String, RegionState> rit = assignmentManager
+ .getRegionStates().getRegionsInTransition();
// process the map to find region in transition details
Configuration conf = HBaseConfiguration.create();
int ritThreshold = conf.getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ClusterStatus.java Fri Jul 27 16:34:32 2012
@@ -20,19 +20,16 @@
package org.apache.hadoop.hbase;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.HashSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.LiveServerInfo;
@@ -41,13 +38,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.RegionLoad;
-import org.apache.hadoop.hbase.master.AssignmentManager.RegionState;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.VersionedWritable;
import com.google.protobuf.ByteString;
@@ -83,7 +74,6 @@ public class ClusterStatus extends Versi
* <dt>3</dt> <dd>Added master and backupMasters</dd>
* </dl>
*/
- private static final byte VERSION_MASTER_BACKUPMASTERS = 2;
private static final byte VERSION = 2;
private String hbaseVersion;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/UnknownRegionException.java Fri Jul 27 16:34:32 2012
@@ -19,8 +19,6 @@
*/
package org.apache.hadoop.hbase;
-import java.io.IOException;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -29,7 +27,7 @@ import org.apache.hadoop.classification.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
-public class UnknownRegionException extends IOException {
+public class UnknownRegionException extends RegionException {
private static final long serialVersionUID = 1968858760475205392L;
public UnknownRegionException(String regionName) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Fri Jul 27 16:34:32 2012
@@ -1178,6 +1178,16 @@ public class HBaseAdmin implements Abort
}
/**
+ * Get all the online regions on a region server.
+ */
+ public List<HRegionInfo> getOnlineRegions(
+ final ServerName sn) throws IOException {
+ AdminProtocol admin =
+ this.connection.getAdmin(sn.getHostname(), sn.getPort());
+ return ProtobufUtil.getOnlineRegions(admin);
+ }
+
+ /**
* Flush a table or an individual region.
* Asynchronous operation.
*
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Fri Jul 27 16:34:32 2012
@@ -19,14 +19,11 @@
*/
package org.apache.hadoop.hbase.master;
-import java.io.DataInput;
-import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -34,13 +31,10 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
-import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -65,7 +59,6 @@ import org.apache.hadoop.hbase.executor.
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
-import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
@@ -73,11 +66,9 @@ import org.apache.hadoop.hbase.master.ha
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
-import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
-import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
@@ -125,6 +116,11 @@ public class AssignmentManager extends Z
final private KeyLocker<String> locker = new KeyLocker<String>();
/**
+ * Used for assignment only. TODO: revisit the assign lock scheme
+ */
+ final private KeyLocker<String> assignLocker = new KeyLocker<String>();
+
+ /**
* Map of regions to reopen after the schema of a table is changed. Key -
* encoded region name, value - HRegionInfo
*/
@@ -135,13 +131,6 @@ public class AssignmentManager extends Z
*/
private final int maximumAssignmentAttempts;
- /**
- * Regions currently in transition. Map of encoded region names to the master
- * in-memory state for that region.
- */
- final NotifiableConcurrentSkipListMap<String, RegionState> regionsInTransition =
- new NotifiableConcurrentSkipListMap<String, RegionState>();
-
/** Plans for region movement. Key is the encoded version of a region name*/
// TODO: When do plans get cleaned out? Ever? In server open and in server
// shutdown processing -- St.Ack
@@ -157,32 +146,12 @@ public class AssignmentManager extends Z
Set<String> enablingTables = new HashSet<String>();
/**
- * Server to regions assignment map.
- * Contains the set of regions currently assigned to a given server.
- * This Map and {@link #regions} are tied. Always update this in tandem
- * with the other under a lock on {@link #regions}.
- * @see #regions
- */
- private final NavigableMap<ServerName, Set<HRegionInfo>> servers =
- new TreeMap<ServerName, Set<HRegionInfo>>();
-
- /**
* Contains the server which need to update timer, these servers will be
* handled by {@link TimerUpdater}
*/
private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer =
new ConcurrentSkipListSet<ServerName>();
- /**
- * Region to server assignment map.
- * Contains the server a given region is currently assigned to.
- * This Map and {@link #servers} are tied. Always update this in tandem
- * with the other under a lock on {@link #regions}.
- * @see #servers
- */
- private final SortedMap<HRegionInfo, ServerName> regions =
- new TreeMap<HRegionInfo, ServerName>();
-
private final ExecutorService executorService;
//Thread pool executor service for timeout monitor
@@ -205,6 +174,8 @@ public class AssignmentManager extends Z
// metrics instance to send metrics for RITs
MasterMetrics masterMetrics;
+ private final RegionStates regionStates;
+
/**
* Constructs a new assignment manager.
*
@@ -217,8 +188,7 @@ public class AssignmentManager extends Z
*/
public AssignmentManager(Server master, ServerManager serverManager,
CatalogTracker catalogTracker, final LoadBalancer balancer,
- final ExecutorService service, MasterMetrics metrics)
- throws KeeperException, IOException {
+ final ExecutorService service, MasterMetrics metrics) throws KeeperException, IOException {
super(master.getZooKeeper());
this.master = master;
this.serverManager = serverManager;
@@ -241,6 +211,7 @@ public class AssignmentManager extends Z
this.balancer = balancer;
this.threadPoolExecutorService = Executors.newCachedThreadPool();
this.masterMetrics = metrics;// can be null only with tests.
+ this.regionStates = new RegionStates(master, serverManager);
}
void startTimeOutMonitor() {
@@ -249,26 +220,6 @@ public class AssignmentManager extends Z
}
/**
- * Compute the average load across all region servers.
- * Currently, this uses a very naive computation - just uses the number of
- * regions being served, ignoring stats about number of requests.
- * @return the average load
- */
- double getAverageLoad() {
- int totalLoad = 0;
- int numServers = 0;
- // Sync on this.regions because access to this.servers always synchronizes
- // in this order.
- synchronized (this.regions) {
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: servers.entrySet()) {
- numServers++;
- totalLoad += e.getValue().size();
- }
- }
- return (double)totalLoad / (double)numServers;
- }
-
- /**
* @return Instance of ZKTable.
*/
public ZKTable getZKTable() {
@@ -276,17 +227,19 @@ public class AssignmentManager extends Z
// sharing.
return this.zkTable;
}
+
/**
- * Returns the RegionServer to which hri is assigned.
+ * This SHOULD not be public. It is public now
+ * because of some unit tests.
*
- * @param hri
- * HRegion for which this function returns the region server
- * @return HServerInfo The region server to which hri belongs
+ * TODO: make it package private and keep RegionStates in the master package
*/
- public ServerName getRegionServerOfRegion(HRegionInfo hri) {
- synchronized (this.regions ) {
- return regions.get(hri);
- }
+ public RegionStates getRegionStates() {
+ return regionStates;
+ }
+
+ public RegionPlan getRegionReopenPlan(HRegionInfo hri) {
+ return new RegionPlan(hri, null, regionStates.getRegionServerOfRegion(hri));
}
/**
@@ -337,7 +290,8 @@ public class AssignmentManager extends Z
for (HRegionInfo hri : hris) {
String name = hri.getEncodedName();
// no lock concurrent access ok: sequential consistency respected.
- if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) {
+ if (regionsToReopen.containsKey(name)
+ || regionStates.isRegionInTransition(name)) {
pending++;
}
}
@@ -389,19 +343,6 @@ public class AssignmentManager extends Z
}
/**
- * Process all regions that are in transition up in zookeeper. Used by
- * master joining an already running cluster.
- * @throws KeeperException
- * @throws IOException
- * @throws InterruptedException
- */
- void processDeadServersAndRegionsInTransition()
- throws KeeperException, IOException, InterruptedException {
- // Pass null to signify no dead servers in this context.
- processDeadServersAndRegionsInTransition(null);
- }
-
- /**
* Process all regions that are in transition in zookeeper and also
* processes the list of dead servers by scanning the META.
* Used by master joining an cluster. If we figure this is a clean cluster
@@ -417,35 +358,34 @@ public class AssignmentManager extends Z
throws KeeperException, IOException, InterruptedException {
List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.assignmentZNode);
-
+
if (nodes == null) {
String errorMessage = "Failed to get the children from ZK";
master.abort(errorMessage, new IOException(errorMessage));
return;
}
-
+
// Run through all regions. If they are not assigned and not in RIT, then
// its a clean cluster startup, else its a failover.
- synchronized (this.regions) {
- for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
- if (!e.getKey().isMetaTable() && e.getValue() != null) {
- LOG.debug("Found " + e + " out on cluster");
- this.failover = true;
- break;
- }
- if (nodes.contains(e.getKey().getEncodedName())) {
- LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
- // Could be a meta region.
- this.failover = true;
- break;
- }
+ Map<HRegionInfo, ServerName> regions = regionStates.getRegionAssignments();
+ for (Map.Entry<HRegionInfo, ServerName> e: regions.entrySet()) {
+ if (!e.getKey().isMetaTable() && e.getValue() != null) {
+ LOG.debug("Found " + e + " out on cluster");
+ this.failover = true;
+ break;
+ }
+ if (nodes.contains(e.getKey().getEncodedName())) {
+ LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
+ // Could be a meta region.
+ this.failover = true;
+ break;
}
}
// Remove regions in RIT, they are possibly being processed by
// ServerShutdownHandler.
// no lock concurrent access ok: some threads may be adding/removing items but its java-valid
- nodes.removeAll(regionsInTransition.keySet());
+ nodes.removeAll(regionStates.getRegionsInTransition().keySet());
// If some dead servers are processed by ServerShutdownHandler, we shouldn't
// assign all user regions( some would be assigned by
@@ -488,11 +428,10 @@ public class AssignmentManager extends Z
if (!intransistion) return intransistion;
LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
while (!this.master.isStopped() &&
- // no lock concurrent access ok: sequentially consistent
- this.regionsInTransition.containsKey(hri.getEncodedName())) {
+ this.regionStates.isRegionInTransition(hri.getEncodedName())) {
// We put a timeout because we may have the region getting in just between the test
// and the waitForUpdate
- this.regionsInTransition.waitForUpdate(100);
+ this.regionStates.waitForUpdate(100);
}
return intransistion;
}
@@ -543,8 +482,8 @@ public class AssignmentManager extends Z
// is that we don't have two threads working on the same region.
Lock lock = locker.acquireLock(encodedRegionName);
try {
- RegionState regionState = regionsInTransition.get(encodedRegionName);
- if (regionState != null || failoverProcessedRegions.containsKey(encodedRegionName)) {
+ if (regionStates.isRegionInTransition(encodedRegionName)
+ || failoverProcessedRegions.containsKey(encodedRegionName)) {
// Just return
return;
}
@@ -552,15 +491,14 @@ public class AssignmentManager extends Z
case M_ZK_REGION_CLOSING:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
- if (isOnDeadServer(regionInfo, deadServers) && (sn == null || !isServerOnline(sn))) {
+ if (isOnDeadServer(regionInfo, deadServers) && !isServerOnline(sn)) {
// If was on dead server, its closed now. Force to OFFLINE and this
// will get it reassigned if appropriate
forceOffline(regionInfo, rt);
} else {
// Just insert region into RIT.
// If this never updates the timeout will trigger new assignment
- regionsInTransition.put(encodedRegionName,
- getRegionState(regionInfo, RegionState.State.CLOSING, rt));
+ regionStates.updateRegionState(rt, RegionState.State.CLOSING);
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
@@ -583,8 +521,7 @@ public class AssignmentManager extends Z
// RPC is not yet sent
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, rt);
} else {
- regionsInTransition.put(encodedRegionName,
- getRegionState(regionInfo, RegionState.State.PENDING_OPEN, rt));
+ regionStates.updateRegionState(rt, RegionState.State.PENDING_OPEN);
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
@@ -595,8 +532,7 @@ public class AssignmentManager extends Z
// Just insert region into RIT
// If this never updates the timeout will trigger new assignment
if (regionInfo.isMetaTable()) {
- regionsInTransition.put(encodedRegionName,
- getRegionState(regionInfo, RegionState.State.OPENING, rt));
+ regionStates.updateRegionState(rt, RegionState.State.OPENING);
// If ROOT or .META. table is waiting for timeout monitor to assign
// it may take lot of time when the assignment.timeout.period is
// the default value which may be very long. We will not be able
@@ -609,16 +545,14 @@ public class AssignmentManager extends Z
// it to a new RS. (HBASE-5882)
processOpeningState(regionInfo);
break;
- }
- regionsInTransition.put(encodedRegionName,
- getRegionState(regionInfo, RegionState.State.OPENING, rt));
+ }
+ regionStates.updateRegionState(rt, RegionState.State.OPENING);
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case RS_ZK_REGION_OPENED:
// Region is opened, insert into RIT and handle it
- regionsInTransition.put(encodedRegionName,
- getRegionState(regionInfo, RegionState.State.OPEN, rt));
+ regionStates.updateRegionState(rt, RegionState.State.OPEN);
// sn could be null if this server is no longer online. If
// that is the case, just let this RIT timeout; it'll be assigned
// to new server then.
@@ -679,25 +613,11 @@ public class AssignmentManager extends Z
*/
private void addToRITandCallClose(final HRegionInfo hri,
final RegionState.State state, final RegionTransition oldData) {
- // No lock concurrency: adding a synchronized here would not prevent to have two
- // entries as we don't check if the region is already there. This must be ensured by the
- // method callers.
- this.regionsInTransition.put(hri.getEncodedName(), getRegionState(hri, state, oldData));
+ regionStates.updateRegionState(oldData, state);
new ClosedRegionHandler(this.master, this, hri).process();
}
/**
- * @param hri
- * @param state
- * @param rt
- * @return A new {@link RegionState} instance made of the passed arguments
- */
- RegionState getRegionState(final HRegionInfo hri, final RegionState.State state,
- final RegionTransition rt) {
- return new RegionState(hri, state, rt.getCreateTime(), rt.getServerName());
- }
-
- /**
* When a region is closed, it should be removed from the regionsToReopen
* @param hri HRegionInfo of the region which was closed
*/
@@ -769,12 +689,12 @@ public class AssignmentManager extends Z
try {
// Printing if the event was created a long time ago helps debugging
boolean lateEvent = createTime < (System.currentTimeMillis() - 15000);
- RegionState regionState = regionsInTransition.get(encodedName);
+ RegionState regionState = regionStates.getRegionTransitionState(encodedName);
LOG.debug("Handling transition=" + rt.getEventType() +
", server=" + sn + ", region=" +
(prettyPrintedRegionName == null ? "null" : prettyPrintedRegionName) +
(lateEvent ? ", which is more than 15 seconds late" : "") +
- ", current state from RIT=" + regionState);
+ ", current state from region state map =" + regionState);
switch (rt.getEventType()) {
case M_ZK_REGION_OFFLINE:
// Nothing to do.
@@ -782,7 +702,7 @@ public class AssignmentManager extends Z
case RS_ZK_REGION_SPLITTING:
if (!isInStateForSplitting(regionState)) break;
- addSplittingToRIT(sn, encodedName);
+ regionStates.updateRegionState(rt, RegionState.State.SPLITTING);
break;
case RS_ZK_REGION_SPLIT:
@@ -790,7 +710,9 @@ public class AssignmentManager extends Z
if (!isInStateForSplitting(regionState)) break;
// If null, add SPLITTING state before going to SPLIT
if (regionState == null) {
- regionState = addSplittingToRIT(sn, encodedName);
+ regionState = regionStates.updateRegionState(rt,
+ RegionState.State.SPLITTING);
+
String message = "Received SPLIT for region " + prettyPrintedRegionName +
" from server " + sn;
// If still null, it means we cannot find it and it was already processed
@@ -826,8 +748,8 @@ public class AssignmentManager extends Z
case M_ZK_REGION_CLOSING:
hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.CLOSING, createTime, sn);
- regionsInTransition.put(encodedName, regionState);
+ regionState = regionStates.updateRegionState(
+ hri, RegionState.State.CLOSING, createTime, sn);
failoverProcessedRegions.put(encodedName, hri);
break;
}
@@ -842,14 +764,14 @@ public class AssignmentManager extends Z
return;
}
// Transition to CLOSING (or update stamp if already CLOSING)
- regionState.update(RegionState.State.CLOSING, createTime, sn);
+ regionStates.updateRegionState(rt, RegionState.State.CLOSING);
break;
case RS_ZK_REGION_CLOSED:
hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn);
- regionsInTransition.put(encodedName, regionState);
+ regionState = regionStates.updateRegionState(
+ hri, RegionState.State.CLOSED, createTime, sn);
removeClosedRegion(regionState.getRegion());
new ClosedRegionHandler(master, this, regionState.getRegion())
.process();
@@ -868,7 +790,7 @@ public class AssignmentManager extends Z
// Handle CLOSED by assigning elsewhere or stopping if a disable
// If we got here all is good. Need to update RegionState -- else
// what follows will fail because not in expected state.
- regionState.update(RegionState.State.CLOSED, createTime, sn);
+ regionStates.updateRegionState(rt, RegionState.State.CLOSED);
removeClosedRegion(regionState.getRegion());
this.executorService.submit(new ClosedRegionHandler(master,
this, regionState.getRegion()));
@@ -877,8 +799,8 @@ public class AssignmentManager extends Z
case RS_ZK_REGION_FAILED_OPEN:
hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.CLOSED, createTime, sn);
- regionsInTransition.put(encodedName, regionState);
+ regionState = regionStates.updateRegionState(
+ hri, RegionState.State.CLOSED, createTime, sn);
new ClosedRegionHandler(master, this, regionState.getRegion())
.process();
failoverProcessedRegions.put(encodedName, hri);
@@ -892,8 +814,8 @@ public class AssignmentManager extends Z
return;
}
// Handle this the same as if it were opened and then closed.
- regionState.update(RegionState.State.CLOSED, createTime, sn);
- // When there are more than one region server a new RS is selected as the
+ regionStates.updateRegionState(rt, RegionState.State.CLOSED);
+ // When there are more than one region server a new RS is selected as the
// destination and the same is updated in the regionplan. (HBASE-5546)
getRegionPlan(regionState, sn, true);
this.executorService.submit(new ClosedRegionHandler(master,
@@ -903,8 +825,8 @@ public class AssignmentManager extends Z
case RS_ZK_REGION_OPENING:
hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.OPENING, createTime, sn);
- regionsInTransition.put(encodedName, regionState);
+ regionState = regionStates.updateRegionState(
+ hri, RegionState.State.OPENING, createTime, sn);
failoverProcessedRegions.put(encodedName, hri);
break;
}
@@ -920,14 +842,14 @@ public class AssignmentManager extends Z
return;
}
// Transition to OPENING (or update stamp if already OPENING)
- regionState.update(RegionState.State.OPENING, createTime, sn);
+ regionStates.updateRegionState(rt, RegionState.State.OPENING);
break;
case RS_ZK_REGION_OPENED:
hri = checkIfInFailover(regionState, encodedName, regionName);
if (hri != null) {
- regionState = new RegionState(hri, RegionState.State.OPEN, createTime, sn);
- regionsInTransition.put(encodedName, regionState);
+ regionState = regionStates.updateRegionState(
+ hri, RegionState.State.OPEN, createTime, sn);
new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion).process();
failoverProcessedRegions.put(encodedName, hri);
break;
@@ -943,7 +865,7 @@ public class AssignmentManager extends Z
return;
}
// Handle OPENED by removing from transition and deleted zk node
- regionState.update(RegionState.State.OPEN, createTime, sn);
+ regionStates.updateRegionState(rt, RegionState.State.OPEN);
this.executorService.submit(
new OpenedRegionHandler(master, this, regionState.getRegion(), sn, expectedVersion));
break;
@@ -975,7 +897,7 @@ public class AssignmentManager extends Z
}
return null;
}
-
+
/**
* Gets the HRegionInfo from the META table
* @param regionName
@@ -1019,7 +941,8 @@ public class AssignmentManager extends Z
private boolean convertPendingCloseToSplitting(final RegionState rs) {
if (!rs.isPendingClose()) return false;
LOG.debug("Converting PENDING_CLOSE to SPLITING; rs=" + rs);
- rs.update(RegionState.State.SPLITTING);
+ regionStates.updateRegionState(
+ rs.getRegion(), RegionState.State.SPLITTING);
// Clean up existing state. Clear from region plans seems all we
// have to do here by way of clean up of PENDING_CLOSE.
clearRegionPlan(rs.getRegion());
@@ -1027,63 +950,6 @@ public class AssignmentManager extends Z
}
/**
- * @param serverName
- * @param encodedName
- * @return The SPLITTING RegionState we added to RIT for the passed region
- * <code>encodedName</code>
- */
- private RegionState addSplittingToRIT(final ServerName serverName,
- final String encodedName) {
- RegionState regionState = null;
- synchronized (this.regions) {
- regionState = findHRegionInfoThenAddToRIT(serverName, encodedName);
- if (regionState != null) {
- regionState.update(RegionState.State.SPLITTING,
- System.currentTimeMillis(), serverName);
- }
- }
- return regionState;
- }
-
- /**
- * Caller must hold lock on <code>this.regions</code>.
- * @param serverName
- * @param encodedName
- * @return The instance of RegionState that was added to RIT or null if error.
- */
- private RegionState findHRegionInfoThenAddToRIT(final ServerName serverName,
- final String encodedName) {
- HRegionInfo hri = findHRegionInfo(serverName, encodedName);
- if (hri == null) {
- LOG.warn("Region " + encodedName + " not found on server " + serverName +
- "; failed processing");
- return null;
- }
- // Add to regions in transition, then update state to SPLITTING.
- return addToRegionsInTransition(hri);
- }
-
- /**
- * Caller must hold lock on <code>this.regions</code>.
- * @param serverName
- * @param encodedName
- * @return Found HRegionInfo or null.
- */
- private HRegionInfo findHRegionInfo(final ServerName sn,
- final String encodedName) {
- if (!this.serverManager.isServerOnline(sn)) return null;
- Set<HRegionInfo> hris = this.servers.get(sn);
- HRegionInfo foundHri = null;
- for (HRegionInfo hri: hris) {
- if (hri.getEncodedName().equals(encodedName)) {
- foundHri = hri;
- break;
- }
- }
- return foundHri;
- }
-
- /**
* Handle a ZK unassigned node transition triggered by HBCK repair tool.
* <p>
* This is handled in a separate code path because it breaks the normal rules.
@@ -1094,7 +960,7 @@ public class AssignmentManager extends Z
LOG.info("Handling HBCK triggered transition=" + rt.getEventType() +
", server=" + rt.getServerName() + ", region=" +
HRegionInfo.prettyPrint(encodedName));
- RegionState regionState = regionsInTransition.get(encodedName);
+ RegionState regionState = regionStates.getRegionTransitionState(encodedName);
switch (rt.getEventType()) {
case M_ZK_REGION_OFFLINE:
HRegionInfo regionInfo = null;
@@ -1178,9 +1044,7 @@ public class AssignmentManager extends Z
public void nodeDeleted(final String path) {
if (path.startsWith(this.watcher.assignmentZNode)) {
String regionName = ZKAssign.getRegionName(this.master.getZooKeeper(), path);
- // no lock concurrency ok: sequentially consistent if someone adds/removes the region in
- // the same time
- RegionState rs = this.regionsInTransition.get(regionName);
+ RegionState rs = regionStates.getRegionTransitionState(regionName);
if (rs != null) {
HRegionInfo regionInfo = rs.getRegion();
if (rs.isSplit()) {
@@ -1191,26 +1055,22 @@ public class AssignmentManager extends Z
LOG.debug("The znode of region " + regionInfo.getRegionNameAsString()
+ " has been deleted.");
if (rs.isOpened()) {
- makeRegionOnline(rs, regionInfo);
+ ServerName serverName = rs.getServerName();
+ regionOnline(regionInfo, serverName);
+ LOG.info("The master has opened the region "
+ + regionInfo.getRegionNameAsString() + " that was online on "
+ + serverName);
+ if (this.getZKTable().isDisablingOrDisabledTable(
+ regionInfo.getTableNameAsString())) {
+ LOG.debug("Opened region "
+ + regionInfo.getRegionNameAsString() + " but "
+ + "this table is disabled, triggering close of region");
+ unassign(regionInfo);
+ }
}
}
}
}
-
- }
-
- private void makeRegionOnline(RegionState rs, HRegionInfo regionInfo) {
- regionOnline(regionInfo, rs.serverName);
- LOG.info("The master has opened the region "
- + regionInfo.getRegionNameAsString() + " that was online on "
- + rs.serverName);
- if (this.getZKTable().isDisablingOrDisabledTable(
- regionInfo.getTableNameAsString())) {
- LOG.debug("Opened region "
- + regionInfo.getRegionNameAsString() + " but "
- + "this table is disabled, triggering close of region");
- unassign(regionInfo);
- }
}
/**
@@ -1247,24 +1107,13 @@ public class AssignmentManager extends Z
* @param sn
*/
void regionOnline(HRegionInfo regionInfo, ServerName sn) {
- // no lock concurrency ok.
- this.regionsInTransition.remove(regionInfo.getEncodedName());
-
- synchronized (this.regions) {
- // Add check
- ServerName oldSn = this.regions.get(regionInfo);
- if (oldSn != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
- " on " + oldSn + " with " + sn);
-
- if (isServerOnline(sn)) {
- this.regions.put(regionInfo, sn);
- addToServers(sn, regionInfo);
- this.regions.notifyAll();
- } else {
- LOG.info("The server is not in online servers, ServerName=" +
- sn.getServerName() + ", region=" + regionInfo.getEncodedName());
- }
+ if (!isServerOnline(sn)) {
+ LOG.warn("A region was opened on a dead server, ServerName=" +
+ sn.getServerName() + ", region=" + regionInfo.getEncodedName());
}
+
+ regionStates.regionOnline(regionInfo, sn);
+
// Remove plan if one.
clearRegionPlan(regionInfo);
// Add the server to serversInUpdatingTimer
@@ -1307,11 +1156,9 @@ public class AssignmentManager extends Z
for (Map.Entry<String, RegionPlan> e : rps) {
if (e.getValue() != null && e.getKey() != null && sn.equals(e.getValue().getDestination())) {
- RegionState rs = this.regionsInTransition.get(e.getKey());
- if (rs != null) {
- // no lock concurrency ok: there is a write when we update the timestamp but it's ok
- // as it's an AtomicLong
- rs.updateTimestampToNow();
+ RegionState regionState = regionStates.getRegionTransitionState(e.getKey());
+ if (regionState != null) {
+ regionState.updateTimestampToNow();
}
}
}
@@ -1325,30 +1172,10 @@ public class AssignmentManager extends Z
* @param regionInfo
*/
public void regionOffline(final HRegionInfo regionInfo) {
- // no lock concurrency ok
- this.regionsInTransition.remove(regionInfo.getEncodedName());
+ regionStates.regionOffline(regionInfo);
// remove the region plan as well just in case.
clearRegionPlan(regionInfo);
- setOffline(regionInfo);
- }
-
- /**
- * Sets the region as offline by removing in-memory assignment information but
- * retaining transition information.
- * <p>
- * Used when a region has been closed but should be reassigned.
- * @param regionInfo
- */
- public void setOffline(HRegionInfo regionInfo) {
- synchronized (this.regions) {
- ServerName sn = this.regions.remove(regionInfo);
- if (sn == null) return;
- Set<HRegionInfo> serverRegions = this.servers.get(sn);
- if (!serverRegions.remove(regionInfo)) {
- LOG.warn("No " + regionInfo + " on " + sn);
- }
- }
}
public void offlineDisabledRegion(HRegionInfo regionInfo) {
@@ -1417,10 +1244,18 @@ public class AssignmentManager extends Z
region.getRegionNameAsString());
return;
}
- RegionState state = addToRegionsInTransition(region,
+ RegionState state = forceRegionStateToOffline(region,
hijack);
- synchronized (state) {
+ // TODO: we can't synchronized on state any more since it could
+ // be an new instance. We need to reconsider how to avoid
+ // double/multiple assignments.
+ // This is to prevent double assignments? Does it work?
+ String encodedName = region.getEncodedName();
+ Lock lock = assignLocker.acquireLock(encodedName);
+ try {
assign(region, state, setOfflineInZK, forceNewPlan, hijack);
+ } finally {
+ lock.unlock();
}
}
@@ -1455,7 +1290,7 @@ public class AssignmentManager extends Z
// time; i.e. handlers on backend won't be trying to set it to OPEN, etc.
AtomicInteger counter = new AtomicInteger(0);
CreateUnassignedAsyncCallback cb =
- new CreateUnassignedAsyncCallback(this.watcher, destination, counter);
+ new CreateUnassignedAsyncCallback(regionStates, this.watcher, destination, counter);
for (RegionState state: states) {
if (!asyncSetOfflineInZooKeeper(state, cb, state)) {
return false;
@@ -1579,12 +1414,15 @@ public class AssignmentManager extends Z
*/
static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
+ private final RegionStates regionStates;
private final ZooKeeperWatcher zkw;
private final ServerName destination;
private final AtomicInteger counter;
- CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
- final ServerName destination, final AtomicInteger counter) {
+ CreateUnassignedAsyncCallback(final RegionStates regionStates,
+ final ZooKeeperWatcher zkw, final ServerName destination,
+ final AtomicInteger counter) {
+ this.regionStates = regionStates;
this.zkw = zkw;
this.destination = destination;
this.counter = counter;
@@ -1604,7 +1442,7 @@ public class AssignmentManager extends Z
// Async exists to set a watcher so we'll get triggered when
// unassigned node changes.
this.zkw.getRecoverableZooKeeper().getZooKeeper().exists(path, this.zkw,
- new ExistsUnassignedAsyncCallback(this.counter, destination), ctx);
+ new ExistsUnassignedAsyncCallback(regionStates, counter, destination), ctx);
}
}
@@ -1614,10 +1452,13 @@ public class AssignmentManager extends Z
*/
static class ExistsUnassignedAsyncCallback implements AsyncCallback.StatCallback {
private final Log LOG = LogFactory.getLog(ExistsUnassignedAsyncCallback.class);
+ private final RegionStates regionStates;
private final AtomicInteger counter;
private ServerName destination;
- ExistsUnassignedAsyncCallback(final AtomicInteger counter, ServerName destination) {
+ ExistsUnassignedAsyncCallback(final RegionStates regionStates,
+ final AtomicInteger counter, ServerName destination) {
+ this.regionStates = regionStates;
this.counter = counter;
this.destination = destination;
}
@@ -1625,7 +1466,7 @@ public class AssignmentManager extends Z
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (rc != 0) {
- // Thisis resultcode. If non-zero, need to resubmit.
+ // This is result code. If non-zero, need to resubmit.
LOG.warn("rc != 0 for " + path + " -- retryable connectionloss -- " +
"FIX see http://wiki.apache.org/hadoop/ZooKeeper/FAQ#A2");
return;
@@ -1637,28 +1478,14 @@ public class AssignmentManager extends Z
// yet sent out the actual open but putting this state change after the
// call to open risks our writing PENDING_OPEN after state has been moved
// to OPENING by the regionserver.
- state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(), destination);
+ regionStates.updateRegionState(state.getRegion(),
+ RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
+ destination);
this.counter.addAndGet(1);
}
}
/**
- * @param region
- * @return The current RegionState
- */
- private RegionState addToRegionsInTransition(final HRegionInfo region) {
- return addToRegionsInTransition(region, false);
- }
- /**
- * @param region
- * @param hijack
- * @return The current RegionState
- */
- private RegionState addToRegionsInTransition(final HRegionInfo region,
- boolean hijack) {
- return forceRegionStateToOffline(region, hijack);
- }
- /**
* Sets regions {@link RegionState} to {@link RegionState.State#OFFLINE}.
* @param region
* @return Amended RegionState.
@@ -1679,10 +1506,10 @@ public class AssignmentManager extends Z
Lock lock = locker.acquireLock(encodedName);
try {
- RegionState state = this.regionsInTransition.get(encodedName);
+ RegionState state = regionStates.getRegionTransitionState(encodedName);
if (state == null) {
- state = new RegionState(region, RegionState.State.OFFLINE);
- this.regionsInTransition.put(encodedName, state);
+ state = regionStates.updateRegionState(
+ region, RegionState.State.OFFLINE);
} else {
// If we are reassigning the node do not force in-memory state to OFFLINE.
// Based on the znode state we will decide if to change in-memory state to
@@ -1692,7 +1519,8 @@ public class AssignmentManager extends Z
// assign on its tail as part of the handling of a region close.
if (!hijack) {
LOG.debug("Forcing OFFLINE; was=" + state);
- state.update(RegionState.State.OFFLINE);
+ state = regionStates.updateRegionState(
+ region, RegionState.State.OFFLINE);
}
}
return state;
@@ -1751,8 +1579,9 @@ public class AssignmentManager extends Z
LOG.info("Assigning region " + state.getRegion().getRegionNameAsString() +
" to " + plan.getDestination().toString());
// Transition RegionState to PENDING_OPEN
- state.update(RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
- plan.getDestination());
+ regionStates.updateRegionState(state.getRegion(),
+ RegionState.State.PENDING_OPEN, System.currentTimeMillis(),
+ plan.getDestination());
// Send OPEN RPC. This can fail if the server on other end is is not up.
// Pass the version that was obtained while setting the node to OFFLINE.
RegionOpeningState regionOpenState = serverManager.sendRegionOpen(plan
@@ -1781,7 +1610,8 @@ public class AssignmentManager extends Z
// Clean out plan we failed execute and one that doesn't look like it'll
// succeed anyways; we need a new plan!
// Transition back to OFFLINE
- state.update(RegionState.State.OFFLINE);
+ regionStates.updateRegionState(
+ state.getRegion(), RegionState.State.OFFLINE);
// Force a new plan and reassign. Will return null if no servers.
if (getRegionPlan(state, plan.getDestination(), true) == null) {
this.timeoutMonitor.setAllRegionServersOffline(true);
@@ -1794,7 +1624,6 @@ public class AssignmentManager extends Z
}
private void processAlreadyOpenedRegion(HRegionInfo region, ServerName sn) {
-
// Remove region from in-memory transition and unassigned node from ZK
// While trying to enable the table the regions of the table were
// already enabled.
@@ -1813,14 +1642,8 @@ public class AssignmentManager extends Z
"Error deleting OFFLINED node in ZK for transition ZK node ("
+ encodedRegionName + ")", e);
}
- // no lock concurrent ok -> sequentially consistent
- this.regionsInTransition.remove(region.getEncodedName());
-
- synchronized (this.regions) {
- this.regions.put(region, sn);
- addToServers(sn, region);
- }
+ regionStates.regionOnline(region, sn);
}
private boolean isDisabledorDisablingRegionInRIT(final HRegionInfo region) {
@@ -1864,10 +1687,12 @@ public class AssignmentManager extends Z
if (hijack &&
(state.getState().equals(RegionState.State.PENDING_OPEN) ||
state.getState().equals(RegionState.State.OPENING))) {
- state.update(RegionState.State.PENDING_OPEN);
+ regionStates.updateRegionState(state.getRegion(),
+ RegionState.State.PENDING_OPEN);
allowZNodeCreation = false;
} else {
- state.update(RegionState.State.OFFLINE);
+ regionStates.updateRegionState(state.getRegion(),
+ RegionState.State.OFFLINE);
allowZNodeCreation = true;
}
int versionOfOfflineNode = -1;
@@ -1901,7 +1726,8 @@ public class AssignmentManager extends Z
new IllegalStateException());
return false;
}
- state.update(RegionState.State.OFFLINE);
+ regionStates.updateRegionState(
+ state.getRegion(), RegionState.State.OFFLINE);
try {
ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
this.master.getServerName(), cb, ctx);
@@ -2009,10 +1835,10 @@ public class AssignmentManager extends Z
int waitTime = this.master.getConfiguration().getInt(
"hbase.bulk.waitbetween.reopen", 0);
for (HRegionInfo region : regions) {
- if (isRegionInTransition(region) != null)
+ if (regionStates.isRegionInTransition(region))
continue;
unassign(region, false);
- while (isRegionInTransition(region) != null) {
+ while (regionStates.isRegionInTransition(region)) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
@@ -2065,24 +1891,21 @@ public class AssignmentManager extends Z
LOG.debug("Starting unassignment of region " +
region.getRegionNameAsString() + " (offlining)");
- synchronized (this.regions) {
- // Check if this region is currently assigned
- if (!regions.containsKey(region)) {
- LOG.debug("Attempted to unassign region " +
- region.getRegionNameAsString() + " but it is not " +
- "currently assigned anywhere");
- return;
- }
+ // Check if this region is currently assigned
+ if (!regionStates.isRegionAssigned(region)) {
+ LOG.debug("Attempted to unassign region " +
+ region.getRegionNameAsString() + " but it is not " +
+ "currently assigned anywhere");
+ return;
}
String encodedName = region.getEncodedName();
// Grab the state of this region and synchronize on it
- RegionState state;
int versionOfClosingNode = -1;
// We need a lock here as we're going to do a put later and we don't want multiple states
// creation
ReentrantLock lock = locker.acquireLock(encodedName);
+ RegionState state = regionStates.getRegionTransitionState(encodedName);
try {
- state = regionsInTransition.get(encodedName);
if (state == null) {
// Create the znode in CLOSING state
try {
@@ -2124,13 +1947,12 @@ public class AssignmentManager extends Z
master.abort("Unexpected ZK exception creating node CLOSING", e);
return;
}
- state = new RegionState(region, RegionState.State.PENDING_CLOSE);
- regionsInTransition.put(encodedName, state);
+ state = regionStates.updateRegionState(region, RegionState.State.PENDING_CLOSE);
} else if (force && (state.isPendingClose() || state.isClosing())) {
LOG.debug("Attempting to unassign region " + region.getRegionNameAsString() +
" which is already " + state.getState() +
" but forcing to send a CLOSE RPC again ");
- state.update(state.getState());
+ state.updateTimestampToNow();
} else {
LOG.debug("Attempting to unassign region " +
region.getRegionNameAsString() + " but it is " +
@@ -2142,27 +1964,9 @@ public class AssignmentManager extends Z
}
// Send CLOSE RPC
- ServerName server = null;
- synchronized (this.regions) {
- server = regions.get(region);
- }
+ ServerName server = state.getServerName();
// ClosedRegionhandler can remove the server from this.regions
if (server == null) {
- // Possibility of disable flow removing from RIT.
- lock = locker.acquireLock(encodedName);
- try {
- state = regionsInTransition.get(encodedName);
- if (state != null) {
- // remove only if the state is PENDING_CLOSE or CLOSING
- RegionState.State presentState = state.getState();
- if (presentState == RegionState.State.PENDING_CLOSE
- || presentState == RegionState.State.CLOSING) {
- this.regionsInTransition.remove(encodedName);
- }
- }
- } finally {
- lock.unlock();
- }
// delete the node. if no node exists need not bother.
deleteClosingOrClosedNode(region);
return;
@@ -2194,22 +1998,13 @@ public class AssignmentManager extends Z
+ region.getTableNameAsString()
+ " to DISABLED state the region " + region
+ " was offlined but the table was in DISABLING state");
- this.regionsInTransition.remove(region.getEncodedName());
-
- // Remove from the regionsMap
- synchronized (this.regions) {
- this.regions.remove(region);
- Set<HRegionInfo> serverRegions = this.servers.get(server);
- if (!serverRegions.remove(region)) {
- LOG.warn("No " + region + " on " + server);
- }
- }
+ regionStates.regionOffline(region);
deleteClosingOrClosedNode(region);
}
} else if (t instanceof RegionAlreadyInTransitionException) {
// RS is already processing this region, only need to update the timestamp
LOG.debug("update " + state + " the timestamp.");
- state.update(state.getState());
+ state.updateTimestampToNow();
}
LOG.info("Server " + server + " returned " + t + " for " +
region.getRegionNameAsString(), t);
@@ -2284,13 +2079,12 @@ public class AssignmentManager extends Z
*/
public void waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
- synchronized(regions) {
- while(!this.master.isStopped() && !regions.containsKey(regionInfo)) {
- // We should receive a notification, but it's
- // better to have a timeout to recheck the condition here:
- // it lowers the impact of a race condition if any
- regions.wait(100);
- }
+ while(!this.master.isStopped() &&
+ !regionStates.isRegionAssigned(regionInfo)) {
+ // We should receive a notification, but it's
+ // better to have a timeout to recheck the condition here:
+ // it lowers the impact of a race condition if any
+ regionStates.waitForUpdate(100);
}
}
@@ -2556,12 +2350,12 @@ public class AssignmentManager extends Z
// state of the Master.
final long endTime = System.currentTimeMillis() + timeout;
- while (!this.master.isStopped() && !regionsInTransition.isEmpty() &&
- endTime > System.currentTimeMillis()) {
- regionsInTransition.waitForUpdate(100);
+ while (!this.master.isStopped() && regionStates.isRegionsInTransition()
+ && endTime > System.currentTimeMillis()) {
+ regionStates.waitForUpdate(100);
}
- return regionsInTransition.isEmpty();
+ return !regionStates.isRegionsInTransition();
}
/**
@@ -2580,13 +2374,13 @@ public class AssignmentManager extends Z
Iterator<HRegionInfo> regionInfoIterator = regions.iterator();
while (regionInfoIterator.hasNext()) {
HRegionInfo hri = regionInfoIterator.next();
- if (!regionsInTransition.containsKey(hri.getEncodedName())) {
+ if (!regionStates.isRegionInTransition(hri)) {
regionInfoIterator.remove();
}
}
if (!regions.isEmpty()) {
- regionsInTransition.waitForUpdate(100);
+ regionStates.waitForUpdate(100);
}
}
@@ -2620,6 +2414,7 @@ public class AssignmentManager extends Z
HRegionInfo regionInfo = region.getFirst();
ServerName regionLocation = region.getSecond();
if (regionInfo == null) continue;
+ regionStates.createRegionState(regionInfo);
String tableName = regionInfo.getTableNameAsString();
if (regionLocation == null) {
// regionLocation could be null if createTable didn't finish properly.
@@ -2664,19 +2459,16 @@ public class AssignmentManager extends Z
byte[] data = ZKUtil.getDataNoWatch(this.watcher, node, stat);
// If znode does not exist dont consider this region
if (data == null) {
- LOG.debug("Region " + regionInfo.getRegionNameAsString()
- + " split is completed. Hence need not add to regions list");
+ LOG.debug("Region " + regionInfo.getRegionNameAsString()
+ + " split is completed. Hence need not add to regions list");
continue;
}
}
// Region is being served and on an active server
// add only if region not in disabled and enabling table
- if (false == checkIfRegionBelongsToDisabled(regionInfo)
- && false == checkIfRegionsBelongsToEnabling(regionInfo)) {
- synchronized (this.regions) {
- regions.put(regionInfo, regionLocation);
- addToServers(regionLocation, regionInfo);
- }
+ if (!checkIfRegionBelongsToDisabled(regionInfo)
+ && !checkIfRegionsBelongsToEnabling(regionInfo)) {
+ regionStates.regionOnline(regionInfo, regionLocation);
}
disablingOrEnabling = addTheTablesInPartialState(this.disablingTables,
this.enablingTables, regionInfo, tableName);
@@ -2876,32 +2668,6 @@ public class AssignmentManager extends Z
}
}
- /*
- * Presumes caller has taken care of necessary locking modifying servers Map.
- * @param hsi
- * @param hri
- */
- private void addToServers(final ServerName sn, final HRegionInfo hri) {
- Set<HRegionInfo> hris = servers.get(sn);
- if (hris == null) {
- hris = new ConcurrentSkipListSet<HRegionInfo>();
- servers.put(sn, hris);
- }
- if (!hris.contains(hri)) hris.add(hri);
- }
-
- /**
- * @return A copy of the Map of regions currently in transition.
- */
- public NavigableMap<String, RegionState> copyRegionsInTransition() {
- // no lock concurrent access ok
- return regionsInTransition.copyMap();
- }
-
- NotifiableConcurrentSkipListMap<String, RegionState> getRegionsInTransition() {
- return regionsInTransition;
- }
-
/**
* Set Regions in transitions metrics.
* This takes an iterator on the RegionInTransition map (CLSM), and is not synchronized.
@@ -2916,9 +2682,9 @@ public class AssignmentManager extends Z
long oldestRITTime = 0;
int ritThreshold = this.master.getConfiguration().
getInt(HConstants.METRICS_RIT_STUCK_WARNING_THRESHOLD, 60000);
- for (Map.Entry<String, RegionState> e : this.regionsInTransition.copyEntrySet()) {
+ for (RegionState state: regionStates.getRegionsInTransition().values()) {
totalRITs++;
- long ritTime = currentTime - e.getValue().getStamp();
+ long ritTime = currentTime - state.getStamp();
if (ritTime > ritThreshold) { // more than the threshold
totalRITsOverThreshold++;
}
@@ -2934,26 +2700,6 @@ public class AssignmentManager extends Z
}
/**
- * @return True if regions in transition.
- */
- public boolean isRegionsInTransition() {
- // no lock concurrent access ok: we could imagine that someone is currently going to remove
- // it or add a region, but it's sequentially consistent.
- return !(this.regionsInTransition.isEmpty());
- }
-
- /**
- * @param hri Region to check.
- * @return Returns null if passed region is not in transition else the current
- * RegionState
- */
- public RegionState isRegionInTransition(final HRegionInfo hri) {
- // no lock concurrent access ok: we could imagine that someone is currently going to remove
- // it or add it, but it's sequentially consistent.
- return this.regionsInTransition.get(hri.getEncodedName());
- }
-
- /**
* @param region Region whose plan we are to clear.
*/
void clearRegionPlan(final HRegionInfo region) {
@@ -2968,14 +2714,14 @@ public class AssignmentManager extends Z
* @throws IOException
*/
public void waitOnRegionToClearRegionsInTransition(final HRegionInfo hri)
- throws IOException {
- if (isRegionInTransition(hri) == null) return;
+ throws IOException, InterruptedException {
+ if (!regionStates.isRegionInTransition(hri)) return;
RegionState rs = null;
// There is already a timeout monitor on regions in transition so I
// should not have to have one here too?
- while(!this.master.isStopped() && (rs = isRegionInTransition(hri)) != null) {
- Threads.sleep(1000);
+ while(!this.master.isStopped() && regionStates.isRegionInTransition(hri)) {
LOG.info("Waiting on " + rs + " to clear regions-in-transition");
+ regionStates.waitForUpdate(1000);
}
if (this.master.isStopped()) {
LOG.info("Giving up wait on regions in " +
@@ -2983,35 +2729,6 @@ public class AssignmentManager extends Z
}
}
-
- /**
- * Gets the online regions of the specified table.
- * This method looks at the in-memory state. It does not go to <code>.META.</code>.
- * Only returns <em>online</em> regions. If a region on this table has been
- * closed during a disable, etc., it will be included in the returned list.
- * So, the returned list may not necessarily be ALL regions in this table, its
- * all the ONLINE regions in the table.
- * @param tableName
- * @return Online regions from <code>tableName</code>
- */
- public List<HRegionInfo> getRegionsOfTable(byte[] tableName) {
- List<HRegionInfo> tableRegions = new ArrayList<HRegionInfo>();
- // boundary needs to have table's name but regionID 0 so that it is sorted
- // before all table's regions.
- HRegionInfo boundary =
- new HRegionInfo(tableName, null, null, false, 0L);
- synchronized (this.regions) {
- for (HRegionInfo regionInfo: this.regions.tailMap(boundary).keySet()) {
- if(Bytes.equals(regionInfo.getTableName(), tableName)) {
- tableRegions.add(regionInfo);
- } else {
- break;
- }
- }
- }
- return tableRegions;
- }
-
/**
* Update timers for all regions in transition going against the server in the
* serversInUpdatingTimer.
@@ -3093,7 +2810,7 @@ public class AssignmentManager extends Z
long now = System.currentTimeMillis();
// no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
// a copy while another thread is adding/removing items
- for (RegionState regionState : regionsInTransition.copyValues()) {
+ for (RegionState regionState : regionStates.getRegionsInTransition().values()) {
if (regionState.getStamp() + timeout <= now ||
(this.allRegionServersOffline && !noRSAvailable)) {
//decide on action upon timeout or, if some RSs just came back online, we can start the
@@ -3132,8 +2849,15 @@ public class AssignmentManager extends Z
case OPEN:
LOG.error("Region has been OPEN for too long, " +
"we don't know where region was opened so can't do anything");
- synchronized (regionState) {
+ // TODO: do we need synchronization here?
+ // could not synchronized on regionState since it can be
+ // an new instance
+ String encodedName = regionState.getRegion().getEncodedName();
+ Lock lock = assignLocker.acquireLock(encodedName);
+ try {
regionState.updateTimestampToNow();
+ } finally {
+ lock.unlock();
}
break;
@@ -3242,7 +2966,7 @@ public class AssignmentManager extends Z
return matchZK;
}
- ServerName addressFromAM = getRegionServerOfRegion(hri);
+ ServerName addressFromAM = regionStates.getRegionServerOfRegion(hri);
boolean matchAM = (addressFromAM != null &&
addressFromAM.equals(serverName));
LOG.debug("based on AM, current region=" + hri.getRegionNameAsString() +
@@ -3271,33 +2995,7 @@ public class AssignmentManager extends Z
}
}
}
- // TODO: Do we want to sync on RIT here?
- // Remove this server from map of servers to regions, and remove all regions
- // of this server from online map of regions.
- Set<HRegionInfo> deadRegions = null;
- List<RegionState> rits = new ArrayList<RegionState>();
- synchronized (this.regions) {
- Set<HRegionInfo> assignedRegions = this.servers.remove(sn);
- if (assignedRegions == null || assignedRegions.isEmpty()) {
- // No regions on this server, we are done, return empty list of RITs
- return rits;
- }
- deadRegions = new TreeSet<HRegionInfo>(assignedRegions);
- for (HRegionInfo region : deadRegions) {
- this.regions.remove(region);
- }
- }
- // See if any of the regions that were online on this server were in RIT
- // If they are, normal timeouts will deal with them appropriately so
- // let's skip a manual re-assignment.
- // no lock concurrent access ok: we will be working on a copy, and it's java-valid to do
- // a copy while another thread is adding/removing items
- for (RegionState region : this.regionsInTransition.copyValues()) {
- if (deadRegions.remove(region.getRegion())) {
- rits.add(region);
- }
- }
- return rits;
+ return regionStates.serverOffline(sn);
}
/**
@@ -3325,92 +3023,6 @@ public class AssignmentManager extends Z
}
/**
- * This is an EXPENSIVE clone. Cloning though is the safest thing to do.
- * Can't let out original since it can change and at least the loadbalancer
- * wants to iterate this exported list. We need to synchronize on regions
- * since all access to this.servers is under a lock on this.regions.
- *
- * @return A clone of current assignments by table.
- */
- Map<String, Map<ServerName, List<HRegionInfo>>> getAssignmentsByTable() {
- Map<String, Map<ServerName, List<HRegionInfo>>> result = null;
- synchronized (this.regions) {
- result = new HashMap<String, Map<ServerName,List<HRegionInfo>>>();
- if (!this.master.getConfiguration().
- getBoolean("hbase.master.loadbalance.bytable", true)) {
- result.put("ensemble", getAssignments());
- } else {
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
- for (HRegionInfo hri : e.getValue()) {
- if (hri.isMetaRegion() || hri.isRootRegion()) continue;
- String tablename = hri.getTableNameAsString();
- Map<ServerName, List<HRegionInfo>> svrToRegions = result.get(tablename);
- if (svrToRegions == null) {
- svrToRegions = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
- result.put(tablename, svrToRegions);
- }
- List<HRegionInfo> regions = null;
- if (!svrToRegions.containsKey(e.getKey())) {
- regions = new ArrayList<HRegionInfo>();
- svrToRegions.put(e.getKey(), regions);
- } else {
- regions = svrToRegions.get(e.getKey());
- }
- regions.add(hri);
- }
- }
- }
- }
- Map<ServerName, ServerLoad> onlineSvrs = this.serverManager.getOnlineServers();
- // Take care of servers w/o assignments.
- for (Map<ServerName,List<HRegionInfo>> map : result.values()) {
- for (Map.Entry<ServerName, ServerLoad> svrEntry: onlineSvrs.entrySet()) {
- if (!map.containsKey(svrEntry.getKey())) {
- map.put(svrEntry.getKey(), new ArrayList<HRegionInfo>());
- }
- }
- }
- return result;
- }
-
- /**
- * @return A clone of current assignments. Note, this is assignments only.
- * If a new server has come in and it has no regions, it will not be included
- * in the returned Map.
- */
- Map<ServerName, List<HRegionInfo>> getAssignments() {
- // This is an EXPENSIVE clone. Cloning though is the safest thing to do.
- // Can't let out original since it can change and at least the loadbalancer
- // wants to iterate this exported list. We need to synchronize on regions
- // since all access to this.servers is under a lock on this.regions.
- Map<ServerName, List<HRegionInfo>> result = null;
- synchronized (this.regions) {
- result = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
- result.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
- }
- }
- return result;
- }
-
- /**
- * @param encodedRegionName Region encoded name.
- * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
- * and the hosting servers {@link ServerName}.
- */
- Pair<HRegionInfo, ServerName> getAssignment(final byte [] encodedRegionName) {
- String name = Bytes.toString(encodedRegionName);
- synchronized(this.regions) {
- for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
- if (e.getKey().getEncodedName().equals(name)) {
- return new Pair<HRegionInfo, ServerName>(e.getKey(), e.getValue());
- }
- }
- }
- return null;
- }
-
- /**
* @param plan Plan to execute.
*/
void balance(final RegionPlan plan) {
@@ -3420,265 +3032,6 @@ public class AssignmentManager extends Z
unassign(plan.getRegionInfo(), false, plan.getDestination());
}
- /**
- * Run through remaining regionservers and unassign all catalog regions.
- */
- void unassignCatalogRegions() {
- synchronized (this.regions) {
- for (Map.Entry<ServerName, Set<HRegionInfo>> e: this.servers.entrySet()) {
- Set<HRegionInfo> regions = e.getValue();
- if (regions == null || regions.isEmpty()) continue;
- for (HRegionInfo hri: regions) {
- if (hri.isMetaRegion()) {
- unassign(hri);
- }
- }
- }
- }
- }
-
- /**
- * State of a Region while undergoing transitions.
- */
- public static class RegionState implements org.apache.hadoop.io.Writable {
- private HRegionInfo region;
-
- public enum State {
- OFFLINE, // region is in an offline state
- PENDING_OPEN, // sent rpc to server to open but has not begun
- OPENING, // server has begun to open but not yet done
- OPEN, // server opened region and updated meta
- PENDING_CLOSE, // sent rpc to server to close but has not begun
- CLOSING, // server has begun to close but not yet done
- CLOSED, // server closed region and updated meta
- SPLITTING, // server started split of a region
- SPLIT // server completed split of a region
- }
-
- private State state;
- // Many threads can update the state at the stamp at the same time
- private final AtomicLong stamp;
- private ServerName serverName;
-
- public RegionState() {
- this.stamp = new AtomicLong(System.currentTimeMillis());
- }
-
- RegionState(HRegionInfo region, State state) {
- this(region, state, System.currentTimeMillis(), null);
- }
-
- RegionState(HRegionInfo region, State state, long stamp, ServerName serverName) {
- this.region = region;
- this.state = state;
- this.stamp = new AtomicLong(stamp);
- this.serverName = serverName;
- }
-
- public void update(State state, long stamp, ServerName serverName) {
- this.state = state;
- updateTimestamp(stamp);
- this.serverName = serverName;
- }
-
- public void update(State state) {
- this.state = state;
- updateTimestampToNow();
- this.serverName = null;
- }
-
- public void updateTimestamp(long stamp) {
- this.stamp.set(stamp);
- }
-
- public void updateTimestampToNow() {
- this.stamp.set(System.currentTimeMillis());
- }
-
- public State getState() {
- return state;
- }
-
- public long getStamp() {
- return stamp.get();
- }
-
- public HRegionInfo getRegion() {
- return region;
- }
-
- public ServerName getServerName() {
- return serverName;
- }
-
- public boolean isClosing() {
- return state == State.CLOSING;
- }
-
- public boolean isClosed() {
- return state == State.CLOSED;
- }
-
- public boolean isPendingClose() {
- return state == State.PENDING_CLOSE;
- }
-
- public boolean isOpening() {
- return state == State.OPENING;
- }
-
- public boolean isOpened() {
- return state == State.OPEN;
- }
-
- public boolean isPendingOpen() {
- return state == State.PENDING_OPEN;
- }
-
- public boolean isOffline() {
- return state == State.OFFLINE;
- }
-
- public boolean isSplitting() {
- return state == State.SPLITTING;
- }
-
- public boolean isSplit() {
- return state == State.SPLIT;
- }
-
- @Override
- public String toString() {
- return region.getRegionNameAsString()
- + " state=" + state
- + ", ts=" + stamp
- + ", server=" + serverName;
- }
-
- /**
- * A slower (but more easy-to-read) stringification
- */
- public String toDescriptiveString() {
- long lstamp = stamp.get();
- long relTime = System.currentTimeMillis() - lstamp;
-
- return region.getRegionNameAsString()
- + " state=" + state
- + ", ts=" + new Date(lstamp) + " (" + (relTime/1000) + "s ago)"
- + ", server=" + serverName;
- }
-
- /**
- * Convert a RegionState to an HBaseProtos.RegionState
- *
- * @return the converted HBaseProtos.RegionState
- */
- public ClusterStatusProtos.RegionState convert() {
- ClusterStatusProtos.RegionState.Builder regionState = ClusterStatusProtos.RegionState.newBuilder();
- ClusterStatusProtos.RegionState.State rs;
- switch (regionState.getState()) {
- case OFFLINE:
- rs = ClusterStatusProtos.RegionState.State.OFFLINE;
- break;
- case PENDING_OPEN:
- rs = ClusterStatusProtos.RegionState.State.PENDING_OPEN;
- break;
- case OPENING:
- rs = ClusterStatusProtos.RegionState.State.OPENING;
- break;
- case OPEN:
- rs = ClusterStatusProtos.RegionState.State.OPEN;
- break;
- case PENDING_CLOSE:
- rs = ClusterStatusProtos.RegionState.State.PENDING_CLOSE;
- break;
- case CLOSING:
- rs = ClusterStatusProtos.RegionState.State.CLOSING;
- break;
- case CLOSED:
- rs = ClusterStatusProtos.RegionState.State.CLOSED;
- break;
- case SPLITTING:
- rs = ClusterStatusProtos.RegionState.State.SPLITTING;
- break;
- case SPLIT:
- rs = ClusterStatusProtos.RegionState.State.SPLIT;
- break;
- default:
- throw new IllegalStateException("");
- }
- regionState.setRegionInfo(HRegionInfo.convert(region));
- regionState.setState(rs);
- regionState.setStamp(getStamp());
- return regionState.build();
- }
-
- /**
- * Convert a protobuf HBaseProtos.RegionState to a RegionState
- *
- * @return the RegionState
- */
- public static RegionState convert(ClusterStatusProtos.RegionState proto) {
- RegionState.State state;
- switch (proto.getState()) {
- case OFFLINE:
- state = State.OFFLINE;
- break;
- case PENDING_OPEN:
- state = State.PENDING_OPEN;
- break;
- case OPENING:
- state = State.OPENING;
- break;
- case OPEN:
- state = State.OPEN;
- break;
- case PENDING_CLOSE:
- state = State.PENDING_CLOSE;
- break;
- case CLOSING:
- state = State.CLOSING;
- break;
- case CLOSED:
- state = State.CLOSED;
- break;
- case SPLITTING:
- state = State.SPLITTING;
- break;
- case SPLIT:
- state = State.SPLIT;
- break;
- default:
- throw new IllegalStateException("");
- }
-
- return new RegionState(HRegionInfo.convert(proto.getRegionInfo()),state,proto.getStamp(),null);
- }
-
- /**
- * @deprecated Writables are going away
- */
- @Deprecated
- @Override
- public void readFields(DataInput in) throws IOException {
- region = new HRegionInfo();
- region.readFields(in);
- state = State.valueOf(in.readUTF());
- stamp.set(in.readLong());
- }
-
- /**
- * @deprecated Writables are going away
- */
- @Deprecated
- @Override
- public void write(DataOutput out) throws IOException {
- region.write(out);
- out.writeUTF(state.name());
- out.writeLong(stamp.get());
- }
- }
-
public void stop() {
this.timeoutMonitor.interrupt();
this.timerUpdater.interrupt();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java?rev=1366438&r1=1366437&r2=1366438&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/BulkReOpen.java Fri Jul 27 16:34:32 2012
@@ -62,8 +62,7 @@ public class BulkReOpen extends BulkAssi
// add plans for the regions that need to be reopened
Map<String, RegionPlan> plans = new HashMap<String, RegionPlan>();
for (HRegionInfo hri : hris) {
- RegionPlan reOpenPlan = new RegionPlan(hri, null,
- assignmentManager.getRegionServerOfRegion(hri));
+ RegionPlan reOpenPlan = assignmentManager.getRegionReopenPlan(hri);
plans.put(hri.getEncodedName(), reOpenPlan);
}
assignmentManager.addPlans(plans);