You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2012/09/24 22:33:20 UTC
svn commit: r1389561 [2/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/master/handler/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/zookeeper/ test...
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java?rev=1389561&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java Mon Sep 24 20:33:19 2012
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Run bulk assign. Does one RCP per regionserver passing a
+ * batch of regions using {@link SingleServerBulkAssigner}.
+ */
+@InterfaceAudience.Private
+public class GeneralBulkAssigner extends BulkAssigner {
+ private static final Log LOG = LogFactory.getLog(GeneralBulkAssigner.class);
+
+ private Map<ServerName, List<HRegionInfo>> failedPlans
+ = new ConcurrentHashMap<ServerName, List<HRegionInfo>>();
+ private ExecutorService pool;
+
+ final Map<ServerName, List<HRegionInfo>> bulkPlan;
+ final AssignmentManager assignmentManager;
+
+ GeneralBulkAssigner(final Server server,
+ final Map<ServerName, List<HRegionInfo>> bulkPlan,
+ final AssignmentManager am) {
+ super(server);
+ this.bulkPlan = bulkPlan;
+ this.assignmentManager = am;
+ }
+
+ @Override
+ public boolean bulkAssign(boolean sync) throws InterruptedException,
+ IOException {
+ // Disable timing out regions in transition up in zk while bulk assigning.
+ this.assignmentManager.timeoutMonitor.bulkAssign(true);
+ try {
+ return super.bulkAssign(sync);
+ } finally {
+ // Re-enable timing out regions in transition up in zk.
+ this.assignmentManager.timeoutMonitor.bulkAssign(false);
+ }
+ }
+
+ @Override
+ protected String getThreadNamePrefix() {
+ return this.server.getServerName() + "-GeneralBulkAssigner";
+ }
+
+ @Override
+ protected void populatePool(ExecutorService pool) {
+ this.pool = pool; // shut it down later in case some assigner hangs
+ for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
+ pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
+ this.assignmentManager, this.failedPlans));
+ }
+ }
+
+ /**
+ *
+ * @param timeout How long to wait.
+ * @return true if done.
+ */
+ @Override
+ protected boolean waitUntilDone(final long timeout)
+ throws InterruptedException {
+ Set<HRegionInfo> regionSet = new HashSet<HRegionInfo>();
+ for (List<HRegionInfo> regionList : bulkPlan.values()) {
+ regionSet.addAll(regionList);
+ }
+
+ pool.shutdown(); // no more task allowed
+ int serverCount = bulkPlan.size();
+ int regionCount = regionSet.size();
+ long startTime = System.currentTimeMillis();
+ long rpcWaitTime = startTime + timeout;
+ while (!server.isStopped() && !pool.isTerminated()
+ && rpcWaitTime > System.currentTimeMillis()) {
+ if (failedPlans.isEmpty()) {
+ pool.awaitTermination(100, TimeUnit.MILLISECONDS);
+ } else {
+ reassignFailedPlans();
+ }
+ }
+ if (!pool.isTerminated()) {
+ LOG.warn("bulk assigner is still running after "
+ + (System.currentTimeMillis() - startTime) + "ms, shut it down now");
+ // some assigner hangs, can't wait any more, shutdown the pool now
+ List<Runnable> notStarted = pool.shutdownNow();
+ if (notStarted != null && !notStarted.isEmpty()) {
+ server.abort("some single server assigner hasn't started yet"
+ + " when the bulk assigner timed out", null);
+ return false;
+ }
+ }
+
+ int reassigningRegions = 0;
+ if (!failedPlans.isEmpty() && !server.isStopped()) {
+ reassigningRegions = reassignFailedPlans();
+ }
+
+ Configuration conf = server.getConfiguration();
+ long perRegionOpenTimeGuesstimate =
+ conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
+ long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
+ + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+ RegionStates regionStates = assignmentManager.getRegionStates();
+ // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+ while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
+ Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+ while (regionInfoIterator.hasNext()) {
+ HRegionInfo hri = regionInfoIterator.next();
+ RegionState state = regionStates.getRegionState(hri);
+ if ((!regionStates.isRegionInTransition(hri) && regionStates.isRegionAssigned(hri))
+ || state.isSplit() || state.isSplitting()) {
+ regionInfoIterator.remove();
+ }
+ }
+ if (!regionSet.isEmpty()) {
+ regionStates.waitForUpdate(100);
+ }
+ }
+
+ if (LOG.isDebugEnabled()) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ String status = "successfully";
+ if (!regionSet.isEmpty()) {
+ status = "with " + regionSet.size() + " regions still not assigned yet";
+ }
+ LOG.debug("bulk assigning total " + regionCount + " regions to "
+ + serverCount + " servers, took " + elapsedTime + "ms, " + status);
+ }
+ return regionSet.isEmpty();
+ }
+
+ @Override
+ protected long getTimeoutOnRIT() {
+ // Guess timeout. Multiply the max number of regions on a server
+ // by how long we think one region takes opening.
+ Configuration conf = server.getConfiguration();
+ long perRegionOpenTimeGuesstimate =
+ conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
+ int maxRegionsPerServer = 1;
+ for (List<HRegionInfo> regionList : bulkPlan.values()) {
+ int size = regionList.size();
+ if (size > maxRegionsPerServer) {
+ maxRegionsPerServer = size;
+ }
+ }
+ long timeout = perRegionOpenTimeGuesstimate * maxRegionsPerServer
+ + conf.getLong("hbase.regionserver.rpc.startup.waittime", 60000)
+ + conf.getLong("hbase.bulk.assignment.perregionserver.rpc.waittime",
+ 30000) * bulkPlan.size();
+ LOG.debug("Timeout-on-RIT=" + timeout);
+ return timeout;
+ }
+
+ @Override
+ protected UncaughtExceptionHandler getUncaughtExceptionHandler() {
+ return new UncaughtExceptionHandler() {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ LOG.warn("Assigning regions in " + t.getName(), e);
+ }
+ };
+ }
+
+ private int reassignFailedPlans() {
+ List<HRegionInfo> reassigningRegions = new ArrayList<HRegionInfo>();
+ for (Map.Entry<ServerName, List<HRegionInfo>> e : failedPlans.entrySet()) {
+ LOG.info("Failed assigning " + e.getValue().size()
+ + " regions to server " + e.getKey() + ", reassigning them");
+ reassigningRegions.addAll(failedPlans.remove(e.getKey()));
+ }
+ for (HRegionInfo region : reassigningRegions) {
+ assignmentManager.assign(region, true, true);
+ }
+ return reassigningRegions.size();
+ }
+
+ /**
+ * Manage bulk assigning to a server.
+ */
+ static class SingleServerBulkAssigner implements Runnable {
+ private final ServerName regionserver;
+ private final List<HRegionInfo> regions;
+ private final AssignmentManager assignmentManager;
+ private final Map<ServerName, List<HRegionInfo>> failedPlans;
+
+ SingleServerBulkAssigner(final ServerName regionserver,
+ final List<HRegionInfo> regions, final AssignmentManager am,
+ final Map<ServerName, List<HRegionInfo>> failedPlans) {
+ this.regionserver = regionserver;
+ this.regions = regions;
+ this.assignmentManager = am;
+ this.failedPlans = failedPlans;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (!assignmentManager.assign(regionserver, regions)) {
+ failedPlans.put(regionserver, regions);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Failed bulking assigning " + regions.size()
+ + " region(s) to " + regionserver.getServerName()
+ + ", and continue to bulk assign others", t);
+ failedPlans.put(regionserver, regions);
+ }
+ }
+ }
+}
Propchange: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Mon Sep 24 20:33:19 2012
@@ -30,11 +30,8 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import java.util.SortedMap;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -61,7 +58,6 @@ import org.apache.hadoop.hbase.MasterMon
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.PleaseHoldException;
-import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerLoad;
@@ -661,7 +657,7 @@ Server {
// Wait for region servers to report in.
this.serverManager.waitForRegionServers(status);
- // Check zk for regionservers that are up but didn't register
+ // Check zk for region servers that are up but didn't register
for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
if (!this.serverManager.isServerOnline(sn)) {
// Not registered; add it.
@@ -690,7 +686,7 @@ Server {
.updateRootAndMetaIfNecessary(this);
this.balancer.setMasterServices(this);
- // Fixup assignment manager status
+ // Fix up assignment manager status
status.setStatus("Starting assignment manager");
this.assignmentManager.joinCluster();
@@ -765,12 +761,11 @@ Server {
/**
* If ServerShutdownHandler is disabled, we enable it and expire those dead
* but not expired servers.
- * @throws IOException
*/
- private void enableServerShutdownHandler() throws IOException {
+ private void enableServerShutdownHandler() {
if (!serverShutdownHandlerEnabled) {
serverShutdownHandlerEnabled = true;
- this.serverManager.expireDeadNotExpiredServers();
+ this.serverManager.processQueuedDeadServers();
}
}
@@ -845,7 +840,7 @@ Server {
enableSSHandWaitForMeta();
assigned++;
} else {
- // Region already assigned. We didnt' assign it. Add to in-memory state.
+ // Region already assigned. We didn't assign it. Add to in-memory state.
this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
this.catalogTracker.getMetaLocation());
}
@@ -911,8 +906,11 @@ Server {
// Now work on our list of found parents. See if any we can clean up.
int fixups = 0;
for (Map.Entry<HRegionInfo, Result> e : offlineSplitParents.entrySet()) {
- fixups += ServerShutdownHandler.fixupDaughters(
+ ServerName sn = HRegionInfo.getServerName(e.getValue());
+ if (!serverManager.isServerDead(sn)) { // Otherwise, let SSH take care of it
+ fixups += ServerShutdownHandler.fixupDaughters(
e.getValue(), assignmentManager, catalogTracker);
+ }
}
if (fixups != 0) {
LOG.info("Scanned the catalog and fixed up " + fixups +
@@ -1484,7 +1482,7 @@ Server {
}
this.executorService.submit(new CreateTableHandler(this,
- this.fileSystemManager, this.serverManager, hTableDescriptor, conf,
+ this.fileSystemManager, hTableDescriptor, conf,
newRegions, catalogTracker, assignmentManager));
if (cpHost != null) {
cpHost.postCreateTable(hTableDescriptor, newRegions);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionStates.java Mon Sep 24 20:33:19 2012
@@ -133,6 +133,13 @@ public class RegionStates {
}
/**
+ * @return the server the specified region assigned to; null if not assigned.
+ */
+ public synchronized ServerName getAssignedServer(final HRegionInfo hri) {
+ return regionAssignments.get(hri);
+ }
+
+ /**
* Wait for the state map to be updated by assignment manager.
*/
public synchronized void waitForUpdate(
@@ -519,7 +526,11 @@ public class RegionStates {
try {
Pair<HRegionInfo, ServerName> p =
MetaReader.getRegion(server.getCatalogTracker(), regionName);
- return p == null ? null : p.getFirst();
+ HRegionInfo hri = p == null ? null : p.getFirst();
+ if (hri != null) {
+ createRegionState(hri);
+ }
+ return hri;
} catch (IOException e) {
server.abort("Aborting because error occoured while reading " +
Bytes.toStringBinary(regionName) + " from .META.", e);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Mon Sep 24 20:33:19 2012
@@ -39,7 +39,6 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClockOutOfSyncException;
import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.Server;
@@ -59,11 +58,9 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdResponse;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
-import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
@@ -78,6 +75,15 @@ import com.google.protobuf.ServiceExcept
* (hostname and port) as well as the startcode (timestamp from when the server
* was started). This is used to differentiate a restarted instance of a given
* server from the original instance.
+ * <p>
+ * If a server is known not to be running any more, it is called dead. The dead
+ * server needs to be handled by a ServerShutdownHandler. If the handler is not
+ * enabled yet, the server can't be handled right away so it is queued up.
+ * After the handler is enabled, the server will be submitted to a handler to handle.
+ * However, the handler may be just partially enabled. If so,
+ * the server cannot be fully processed, and be queued up for further processing.
+ * A server is fully processed only after the handler is fully enabled
+ * and has completed the handling.
*/
@InterfaceAudience.Private
public class ServerManager {
@@ -117,12 +123,39 @@ public class ServerManager {
private final long warningSkew;
/**
- * Set of region servers which are dead but not expired immediately. If one
+ * Set of region servers which are dead but not processed immediately. If one
* server died before master enables ServerShutdownHandler, the server will be
- * added to set and will be expired through calling
- * {@link ServerManager#expireDeadNotExpiredServers()} by master.
+ * added to this set and will be processed through calling
+ * {@link ServerManager#processQueuedDeadServers()} by master.
+ * <p>
+ * A dead server is a server instance known to be dead, not listed in the /hbase/rs
+ * znode any more. It may have not been submitted to ServerShutdownHandler yet
+ * because the handler is not enabled.
+ * <p>
+ * A dead server, which has been submitted to ServerShutdownHandler while the
+ * handler is not enabled, is queued up.
+ * <p>
+ * So this is a set of region servers known to be dead but not submitted to
+ * ServerShutdownHandler for processing yet.
+ */
+ private Set<ServerName> queuedDeadServers = new HashSet<ServerName>();
+
+ /**
+ * Set of region servers which are dead and submitted to ServerShutdownHandler to
+ * process but not fully processed immediately.
+ * <p>
+ * If one server died before assignment manager finished the failover cleanup, the server
+ * will be added to this set and will be processed through calling
+ * {@link ServerManager#processQueuedDeadServers()} by assignment manager.
+ * <p>
+ * For all the region servers in this set, HLog split is already completed.
+ * <p>
+ * ServerShutdownHandler processes a dead server submitted to the handler after
+ * the handler is enabled. It may not be able to complete the processing because root/meta
+ * is not yet online or master is currently in startup mode. In this case, the dead
+ * server will be parked in this set temporarily.
*/
- private Set<ServerName> deadNotExpiredServers = new HashSet<ServerName>();
+ private Set<ServerName> requeuedDeadServers = new HashSet<ServerName>();
/**
* Constructor.
@@ -326,18 +359,6 @@ public class ServerManager {
}
/**
- * @param address
- * @return ServerLoad if serverName is known else null
- * @deprecated Use {@link #getLoad(HServerAddress)}
- */
- public ServerLoad getLoad(final HServerAddress address) {
- ServerName sn = new ServerName(address.toString(), ServerName.NON_STARTCODE);
- ServerName actual =
- ServerName.findServerWithSameHostnamePort(this.getOnlineServersList(), sn);
- return actual == null? null: getLoad(actual);
- }
-
- /**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
* regions being served, ignoring stats about number of requests.
@@ -410,20 +431,19 @@ public class ServerManager {
}
/*
- * Expire the passed server. Add it to list of deadservers and queue a
+ * Expire the passed server. Add it to list of dead servers and queue a
* shutdown processing.
*/
public synchronized void expireServer(final ServerName serverName) {
if (!services.isServerShutdownHandlerEnabled()) {
LOG.info("Master doesn't enable ServerShutdownHandler during initialization, "
+ "delay expiring server " + serverName);
- this.deadNotExpiredServers.add(serverName);
+ this.queuedDeadServers.add(serverName);
return;
}
if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Received expiration of " + serverName +
" but server is not currently online");
- return;
}
if (this.deadservers.contains(serverName)) {
// TODO: Can this happen? It shouldn't be online in this case?
@@ -465,20 +485,47 @@ public class ServerManager {
carryingRoot + ", meta=" + carryingMeta);
}
+ public synchronized void processDeadServer(final ServerName serverName) {
+ // When assignment manager is cleaning up the zookeeper nodes and rebuilding the
+ // in-memory region states, region servers could be down. Root/meta table can and
+ // should be re-assigned, log splitting can be done too. However, it is better to
+ // wait till the cleanup is done before re-assigning user regions.
+ //
+ // We should not wait in the server shutdown handler thread since it can clog
+ // the handler threads and root/meta table could not be re-assigned in case
+ // the corresponding server is down. So we queue them up here instead.
+ if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+ requeuedDeadServers.add(serverName);
+ return;
+ }
+
+ this.deadservers.add(serverName);
+ this.services.getExecutorService().submit(new ServerShutdownHandler(
+ this.master, this.services, this.deadservers, serverName, false));
+ }
+
/**
- * Expire the servers which died during master's initialization. It will be
- * called after HMaster#assignRootAndMeta.
- * @throws IOException
+ * Process the servers which died during master's initialization. It will be
+ * called after HMaster#assignRootAndMeta and AssignmentManager#joinCluster.
* */
- synchronized void expireDeadNotExpiredServers() throws IOException {
+ synchronized void processQueuedDeadServers() {
if (!services.isServerShutdownHandlerEnabled()) {
- throw new IOException("Master hasn't enabled ServerShutdownHandler ");
+ LOG.info("Master hasn't enabled ServerShutdownHandler");
}
- Iterator<ServerName> serverIterator = deadNotExpiredServers.iterator();
+ Iterator<ServerName> serverIterator = queuedDeadServers.iterator();
while (serverIterator.hasNext()) {
expireServer(serverIterator.next());
serverIterator.remove();
}
+
+ if (!services.getAssignmentManager().isFailoverCleanupDone()) {
+ LOG.info("AssignmentManager hasn't finished failover cleanup");
+ }
+ serverIterator = requeuedDeadServers.iterator();
+ while (serverIterator.hasNext()) {
+ processDeadServer(serverIterator.next());
+ serverIterator.remove();
+ }
}
/*
@@ -713,11 +760,23 @@ public class ServerManager {
* @return A copy of the internal set of deadNotExpired servers.
*/
Set<ServerName> getDeadNotExpiredServers() {
- return new HashSet<ServerName>(this.deadNotExpiredServers);
+ return new HashSet<ServerName>(this.queuedDeadServers);
}
public boolean isServerOnline(ServerName serverName) {
- return onlineServers.containsKey(serverName);
+ return serverName != null && onlineServers.containsKey(serverName);
+ }
+
+ /**
+ * Check if a server is known to be dead. A server can be online,
+ * or known to be dead, or unknown to this manager (i.e., not online,
+ * not known to be dead either; it is simply not tracked by the
+ * master any more, for example, a very old previous instance).
+ */
+ public synchronized boolean isServerDead(ServerName serverName) {
+ return serverName == null || deadservers.isDeadServer(serverName)
+ || queuedDeadServers.contains(serverName)
+ || requeuedDeadServers.contains(serverName);
}
public void shutdownCluster() {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java Mon Sep 24 20:33:19 2012
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
@@ -41,9 +40,7 @@ import org.apache.hadoop.hbase.master.As
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
-import org.apache.hadoop.hbase.master.ServerManager;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.zookeeper.KeeperException;
@@ -58,19 +55,15 @@ public class CreateTableHandler extends
private Configuration conf;
private final AssignmentManager assignmentManager;
private final CatalogTracker catalogTracker;
- private final ServerManager serverManager;
private final HRegionInfo [] newRegions;
public CreateTableHandler(Server server, MasterFileSystem fileSystemManager,
- ServerManager serverManager, HTableDescriptor hTableDescriptor,
- Configuration conf, HRegionInfo [] newRegions,
- CatalogTracker catalogTracker, AssignmentManager assignmentManager)
- throws NotAllMetaRegionsOnlineException, TableExistsException,
- IOException {
+ HTableDescriptor hTableDescriptor, Configuration conf, HRegionInfo [] newRegions,
+ CatalogTracker catalogTracker, AssignmentManager assignmentManager)
+ throws NotAllMetaRegionsOnlineException, TableExistsException, IOException {
super(server, EventType.C_M_CREATE_TABLE);
this.fileSystemManager = fileSystemManager;
- this.serverManager = serverManager;
this.hTableDescriptor = hTableDescriptor;
this.conf = conf;
this.newRegions = newRegions;
@@ -173,11 +166,10 @@ public class CreateTableHandler extends
}
// 4. Trigger immediate assignment of the regions in round-robin fashion
- List<ServerName> servers = serverManager.createDestinationServersList();
try {
List<HRegionInfo> regions = Arrays.asList(newRegions);
assignmentManager.getRegionStates().createRegionStates(regions);
- assignmentManager.assignUserRegions(regions, servers);
+ assignmentManager.assign(regions);
} catch (InterruptedException ie) {
LOG.error("Caught " + ie + " during round-robin assignment");
throw new IOException(ie);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java Mon Sep 24 20:33:19 2012
@@ -208,7 +208,7 @@ public class EnableTableHandler extends
}
} else {
try {
- assignmentManager.assignUserRegionsToOnlineServers(regions);
+ assignmentManager.assign(regions);
} catch (InterruptedException e) {
LOG.warn("Assignment was interrupted");
Thread.currentThread().interrupt();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java Mon Sep 24 20:33:19 2012
@@ -222,10 +222,13 @@ public class ServerShutdownHandler exten
// The solution here is to resubmit a ServerShutdownHandler request to process
// user regions on that server so that MetaServerShutdownHandler
// executor pool is always available.
- if (isCarryingRoot() || isCarryingMeta()) { // -ROOT- or .META.
- this.services.getExecutorService().submit(new ServerShutdownHandler(
- this.server, this.services, this.deadServers, serverName, false));
- this.deadServers.add(serverName);
+ //
+ // If AssignmentManager hasn't finished rebuilding user regions,
+ // we are not ready to assign dead regions either. So we re-queue up
+ // the dead server for further processing too.
+ if (isCarryingRoot() || isCarryingMeta() // -ROOT- or .META.
+ || !services.getAssignmentManager().isFailoverCleanupDone()) {
+ this.services.getServerManager().processDeadServer(serverName);
return;
}
@@ -267,6 +270,9 @@ public class ServerShutdownHandler exten
serverName + ", retrying META read", ioe);
}
}
+ if (this.server.isStopped()) {
+ throw new IOException("Server is stopped");
+ }
// Skip regions that were in transition unless CLOSING or PENDING_CLOSE
for (RegionState rit : regionsInTransition) {
@@ -347,11 +353,12 @@ public class ServerShutdownHandler exten
toAssignRegions.remove(hri);
}
}
- // Get all available servers
- List<ServerName> availableServers = services.getServerManager()
- .createDestinationServersList();
- this.services.getAssignmentManager().assign(toAssignRegions,
- availableServers);
+ try {
+ this.services.getAssignmentManager().assign(toAssignRegions);
+ } catch (InterruptedException ie) {
+ LOG.error("Caught " + ie + " during round-robin assignment");
+ throw new IOException(ie);
+ }
}
} finally {
this.deadServers.finish(serverName);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Mon Sep 24 20:33:19 2012
@@ -49,7 +49,6 @@ import java.util.concurrent.ConcurrentMa
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.management.ObjectName;
@@ -68,14 +67,15 @@ import org.apache.hadoop.hbase.DoNotRetr
import org.apache.hadoop.hbase.FailedSanityCheckException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionMovedException;
+import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableDescriptors;
@@ -167,11 +167,20 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.UnlockRowResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameBytesPair;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionInfo;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RpcRequestBody;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -214,23 +223,12 @@ import org.apache.hadoop.util.Reflection
import org.apache.hadoop.util.StringUtils;
import org.apache.zookeeper.KeeperException;
import org.codehaus.jackson.map.ObjectMapper;
-import com.google.protobuf.ServiceException;
-import org.apache.hadoop.hbase.ServerLoad;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.Coprocessor;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionLoad;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupResponse;
-import org.apache.hadoop.hbase.RegionServerStatusProtocol;
import com.google.common.base.Function;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import static org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKTable.java Mon Sep 24 20:33:19 2012
@@ -27,14 +27,11 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.DeserializationException;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.zookeeper.KeeperException;
-import com.google.protobuf.InvalidProtocolBufferException;
-
/**
* Helper class for table state tracking for use by {@link AssignmentManager}.
* Reads, caches and sets state up in zookeeper. If multiple read/write
@@ -305,4 +302,67 @@ public class ZKTable {
}
return disabledTables;
}
+
+ /**
+ * Gets a list of all the tables set as disabled in zookeeper.
+ * @return Set of disabled tables, empty Set if none
+ * @throws KeeperException
+ */
+ public static Set<String> getDisabledTables(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED);
+ }
+
+ /**
+ * Gets a list of all the tables set as disabling in zookeeper.
+ * @return Set of disabling tables, empty Set if none
+ * @throws KeeperException
+ */
+ public static Set<String> getDisablingTables(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLING);
+ }
+
+ /**
+ * Gets a list of all the tables set as enabling in zookeeper.
+ * @return Set of enabling tables, empty Set if none
+ * @throws KeeperException
+ */
+ public static Set<String> getEnablingTables(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ return getAllTables(zkw, ZooKeeperProtos.Table.State.ENABLING);
+ }
+
+ /**
+ * Gets a list of all the tables set as disabled or disabling in zookeeper.
+ * @return Set of disabled or disabling tables, empty Set if none
+ * @throws KeeperException
+ */
+ public static Set<String> getDisabledOrDisablingTables(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ return getAllTables(zkw, ZooKeeperProtos.Table.State.DISABLED,
+ ZooKeeperProtos.Table.State.DISABLING);
+ }
+
+ /**
+ * Gets a list of all the tables of specified states in zookeeper.
+ * @return Set of tables of specified states, empty Set if none
+ * @throws KeeperException
+ */
+ static Set<String> getAllTables(final ZooKeeperWatcher zkw,
+ final ZooKeeperProtos.Table.State... states) throws KeeperException {
+ Set<String> allTables = new HashSet<String>();
+ List<String> children =
+ ZKUtil.listChildrenNoWatch(zkw, zkw.tableZNode);
+ for (String child: children) {
+ ZooKeeperProtos.Table.State state = ZKTableReadOnly.getTableState(zkw, child);
+ for (ZooKeeperProtos.Table.State expectedState: states) {
+ if (state == expectedState) {
+ allTables.add(child);
+ break;
+ }
+ }
+ }
+ return allTables;
+ }
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/TestZooKeeper.java Mon Sep 24 20:33:19 2012
@@ -397,12 +397,13 @@ public class TestZooKeeper {
zk.close();
ZKUtil.createAndFailSilent(zk2, aclZnode);
}
-
- @Test
+
/**
* Test should not fail with NPE when getChildDataAndWatchForNewChildren
* invoked with wrongNode
*/
+ @Test
+ @SuppressWarnings("deprecation")
public void testGetChildDataAndWatchForNewChildrenShouldNotThrowNPE()
throws Exception {
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
@@ -443,7 +444,7 @@ public class TestZooKeeper {
* Tests whether the logs are split when master recovers from a expired zookeeper session and an
* RS goes down.
*/
- @Test(timeout = 60000)
+ @Test(timeout = 180000)
public void testLogSplittingAfterMasterRecoveryDueToZKExpiry() throws IOException,
KeeperException, InterruptedException {
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Mon Sep 24 20:33:19 2012
@@ -155,7 +155,7 @@ public class TestHCM {
table.put(put2);
assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
- // We can wait for all regions to be onlines, that makes log reading easier when debugging
+ // We can wait for all regions to be online, that makes log reading easier when debugging
while (TEST_UTIL.getMiniHBaseCluster().getMaster().
getAssignmentManager().getRegionStates().isRegionsInTransition()) {
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerCoprocessorExceptionWithAbort.java Mon Sep 24 20:33:19 2012
@@ -20,7 +20,6 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
-import java.io.InterruptedIOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
@@ -32,9 +31,6 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManager.java Mon Sep 24 20:33:19 2012
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.Table;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -337,6 +336,7 @@ public class TestAssignmentManager {
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
+ am.failoverCleanupDone.set(true);
try {
// Make sure our new AM gets callbacks; once registered, can't unregister.
// Thats ok because we make a new zk watcher for each test.
@@ -451,9 +451,10 @@ public class TestAssignmentManager {
// Create and startup an executor. This is used by AssignmentManager
// handling zk callbacks.
ExecutorService executor = startupMasterExecutor("testSSHWhenSplitRegionInProgress");
-
// We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
+ ZKAssign.deleteAllNodes(this.watcher);
+
// Create an AM.
AssignmentManagerWithExtrasForTesting am = setUpMockedAssignmentManager(
this.server, this.serverManager);
@@ -501,6 +502,8 @@ public class TestAssignmentManager {
// We need a mocked catalog tracker.
CatalogTracker ct = Mockito.mock(CatalogTracker.class);
LoadBalancer balancer = LoadBalancerFactory.getLoadBalancer(server.getConfiguration());
+ ZKAssign.deleteAllNodes(this.watcher);
+
// Create an AM.
AssignmentManager am = new AssignmentManager(this.server,
this.serverManager, ct, balancer, executor, null);
@@ -521,6 +524,7 @@ public class TestAssignmentManager {
String node = ZKAssign.getNodeName(this.watcher, REGIONINFO.getEncodedName());
// create znode in M_ZK_REGION_CLOSING state.
ZKUtil.createAndWatch(this.watcher, node, data.toByteArray());
+
try {
processServerShutdownHandler(ct, am, false);
// check znode deleted or not.
@@ -541,7 +545,7 @@ public class TestAssignmentManager {
ZKAssign.deleteAllNodes(this.watcher);
}
}
-
+
private void processServerShutdownHandler(CatalogTracker ct, AssignmentManager am, boolean splitRegion)
throws IOException, ServiceException {
// Make sure our new AM gets callbacks; once registered, can't unregister.
@@ -588,6 +592,7 @@ public class TestAssignmentManager {
Mockito.when(services.getZooKeeper()).thenReturn(this.watcher);
ServerShutdownHandler handler = new ServerShutdownHandler(this.server,
services, deadServers, SERVERNAME_A, false);
+ am.failoverCleanupDone.set(true);
handler.process();
// The region in r will have been assigned. It'll be up in zk as unassigned.
}
@@ -667,7 +672,7 @@ public class TestAssignmentManager {
};
((ZooKeeperWatcher) zkw).registerListener(am);
Mockito.doThrow(new InterruptedException()).when(recoverableZk)
- .getChildren("/hbase/unassigned", zkw);
+ .getChildren("/hbase/unassigned", null);
am.setWatcher((ZooKeeperWatcher) zkw);
try {
am.processDeadServersAndRegionsInTransition(null);
@@ -748,7 +753,7 @@ public class TestAssignmentManager {
am.shutdown();
}
}
-
+
/**
* Mocked load balancer class used in the testcase to make sure that the testcase waits until
* random assignment is called and the gate variable is set to true.
@@ -774,7 +779,7 @@ public class TestAssignmentManager {
return super.retainAssignment(regions, servers);
}
}
-
+
/**
* Test the scenario when the master is in failover and trying to process a
* region which is in Opening state on a dead RS. Master should immediately
@@ -791,8 +796,8 @@ public class TestAssignmentManager {
EventType.RS_ZK_REGION_OPENING, version);
RegionTransition rt = RegionTransition.createRegionTransition(EventType.RS_ZK_REGION_OPENING,
REGIONINFO.getRegionName(), SERVERNAME_A, HConstants.EMPTY_BYTE_ARRAY);
- Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers =
- new HashMap<ServerName, List<Pair<HRegionInfo, Result>>>();
+ Map<ServerName, List<HRegionInfo>> deadServers =
+ new HashMap<ServerName, List<HRegionInfo>>();
deadServers.put(SERVERNAME_A, null);
version = ZKAssign.getVersion(this.watcher, REGIONINFO);
am.gate.set(false);
@@ -804,7 +809,7 @@ public class TestAssignmentManager {
assertTrue("The region should be assigned immediately.", null != am.regionPlans.get(REGIONINFO
.getEncodedName()));
}
-
+
/**
* Test verifies whether assignment is skipped for regions of tables in DISABLING state during
* clean cluster startup. See HBASE-6281.
@@ -969,20 +974,12 @@ public class TestAssignmentManager {
@Override
boolean processRegionInTransition(String encodedRegionName,
HRegionInfo regionInfo,
- Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
+ Map<ServerName, List<HRegionInfo>> deadServers)
throws KeeperException, IOException {
this.processRITInvoked = true;
return super.processRegionInTransition(encodedRegionName, regionInfo,
deadServers);
}
- @Override
- void processRegionsInTransition(final RegionTransition rt,
- final HRegionInfo regionInfo,
- final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
- final int expectedVersion) throws KeeperException {
- while (this.gate.get()) Threads.sleep(1);
- super.processRegionsInTransition(rt, regionInfo, deadServers, expectedVersion);
- }
@Override
public void assign(HRegionInfo region, boolean setOfflineInZK, boolean forceNewPlan,
@@ -992,16 +989,16 @@ public class TestAssignmentManager {
}
@Override
- public void assign(java.util.List<HRegionInfo> regions, java.util.List<ServerName> servers)
- {
+ public void assign(List<HRegionInfo> regions)
+ throws IOException, InterruptedException {
assignInvoked = true;
- };
-
+ }
+
/** reset the watcher */
void setWatcher(ZooKeeperWatcher watcher) {
this.watcher = watcher;
}
-
+
/**
* @return ExecutorService used by this instance.
*/
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterFailover.java Mon Sep 24 20:33:19 2012
@@ -781,34 +781,14 @@ public class TestMasterFailover {
region = enabledRegions.remove(0);
regionsThatShouldBeOnline.add(region);
master.getAssignmentManager().getRegionStates().updateRegionState(
- region, RegionState.State.PENDING_OPEN, null);
+ region, RegionState.State.PENDING_OPEN);
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// PENDING_OPEN and disabled
region = disabledRegions.remove(0);
regionsThatShouldBeOffline.add(region);
master.getAssignmentManager().getRegionStates().updateRegionState(
- region, RegionState.State.PENDING_OPEN, null);
+ region, RegionState.State.PENDING_OPEN);
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
- // This test is bad. It puts up a PENDING_CLOSE but doesn't say what
- // server we were PENDING_CLOSE against -- i.e. an entry in
- // AssignmentManager#regions. W/o a server, we NPE trying to resend close.
- // In past, there was wonky logic that had us reassign region if no server
- // at tail of the unassign. This was removed. Commenting out for now.
- // TODO: Remove completely.
- /*
- // PENDING_CLOSE and enabled
- region = enabledRegions.remove(0);
- LOG.info("Setting PENDING_CLOSE enabled " + region.getEncodedName());
- regionsThatShouldBeOnline.add(region);
- master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
- new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
- // PENDING_CLOSE and disabled
- region = disabledRegions.remove(0);
- LOG.info("Setting PENDING_CLOSE disabled " + region.getEncodedName());
- regionsThatShouldBeOffline.add(region);
- master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
- new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
- */
// Failover should be completed, now wait for no RIT
log("Waiting for no more RIT");
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Mon Sep 24 20:33:19 2012
@@ -131,8 +131,8 @@ public class TestReplication {
LOG.info("Setup second Zk");
CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
- utility1.startMiniCluster(2);
- utility2.startMiniCluster(2);
+ utility1.startMiniCluster(3);
+ utility2.startMiniCluster(3);
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java?rev=1389561&r1=1389560&r2=1389561&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java Mon Sep 24 20:33:19 2012
@@ -52,7 +52,6 @@ import org.apache.hadoop.hbase.HRegionLo
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.RegionTransition;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.client.AdminProtocol;
@@ -65,7 +64,6 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.io.hfile.TestHFile;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.HMaster;
@@ -78,6 +76,7 @@ import org.apache.hadoop.hbase.util.HBas
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
@@ -138,6 +137,7 @@ public class TestHBaseFsck {
// point to a different region server
HTable meta = new HTable(conf, HTableDescriptor.META_TABLEDESC.getName());
ResultScanner scanner = meta.getScanner(new Scan());
+ HRegionInfo hri = null;
resforloop:
for (Result res : scanner) {
@@ -158,6 +158,7 @@ public class TestHBaseFsck {
put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
Bytes.toBytes(sn.getStartcode()));
meta.put(put);
+ hri = HRegionInfo.getHRegionInfo(res);
break resforloop;
}
}
@@ -167,10 +168,8 @@ public class TestHBaseFsck {
assertErrors(doFsck(conf, true), new ERROR_CODE[]{
ERROR_CODE.SERVER_DOES_NOT_MATCH_META});
- // fixing assignments require opening regions is not synchronous. To make
- // the test pass consistently so for now we bake in some sleep to let it
- // finish. 1s seems sufficient.
- Thread.sleep(1000);
+ TEST_UTIL.getHBaseCluster().getMaster()
+ .getAssignmentManager().waitForAssignment(hri);
// Should be fixed now
assertNoErrors(doFsck(conf, false));
@@ -318,18 +317,6 @@ public class TestHBaseFsck {
}
tbl.put(puts);
tbl.flushCommits();
- long endTime = System.currentTimeMillis() + 60000;
- while (!TEST_UTIL.getHBaseAdmin().isTableEnabled(tablename)) {
- try {
- if (System.currentTimeMillis() > endTime) {
- fail("Failed to enable table " + tablename + " after waiting for 60 sec");
- }
- Thread.sleep(100);
- } catch (InterruptedException e) {
- e.printStackTrace();
- fail("Interrupted when waiting table " + tablename + " to be enabled");
- }
- }
return tbl;
}
@@ -1117,15 +1104,19 @@ public class TestHBaseFsck {
// Region of disable table was opened on RS
TEST_UTIL.getHBaseAdmin().disableTable(table);
+ // Mess up ZKTable state, otherwise, can't open the region
+ ZKTable zkTable = cluster.getMaster().getAssignmentManager().getZKTable();
+ zkTable.setEnabledTable(table);
HRegionInfo region = disabledRegions.remove(0);
ZKAssign.createNodeOffline(zkw, region, serverName);
ProtobufUtil.openRegion(hrs, region);
int iTimes = 0;
+ byte[] regionName = region.getRegionName();
while (true) {
- byte[] data = ZKAssign.getData(zkw, region.getEncodedName());
- RegionTransition rt = data == null ? null : RegionTransition.parseFrom(data);
- if (rt == null || rt.getEventType() == EventType.RS_ZK_REGION_OPENED) {
+ if (cluster.getServerWith(regionName) != -1) {
+ // Now, region is deployed, reset the table state back
+ zkTable.setDisabledTable(table);
break;
}
Thread.sleep(100);