You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:41:52 UTC
svn commit: r1181915 - in /hbase/branches/0.89: bin/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/ipc/
src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/...
Author: nspiegelberg
Date: Tue Oct 11 17:41:51 2011
New Revision: 1181915
URL: http://svn.apache.org/viewvc?rev=1181915&view=rev
Log:
Preserving locality of HRegions across rolling restarts
Summary:
I created a new JRuby script that uses HBaseAdmin to hook directly into the
running cluster and perform operations, rather than hacking at it with bash
scripts and kill signals.
Then I added message processing between the RSs and the Master, plus an
in-memory holding mechanism on the Master's side that keeps a restarting RS's
regions unassigned instead of handing them out to other servers.
I also fixed a problem where the load-balancing code path interfered with
restarts: because of a pre-existing issue in the source, an RS was added to the
list of servers used for load calculation before its actual load was refreshed.
There are still a few potential issues at the moment, but I'm putting this up
for an initial review.
Test Plan:
I ran a load test on my cluster with 3 region servers, then started a rolling
restart and compared, in two browser tabs, whether the test table's region
distribution was identical before and after.
This should, however, be turned into some form of unit test.
Reviewed By: kranganathan
Reviewers: kannan, nspiegelberg, liyintang, kranganathan
Commenters: kannan
CC: hbase@lists, bogdan, kannan, kranganathan
Differential Revision: 279896
Task ID: 619449
Added:
hbase/branches/0.89/bin/stop_regionserver_for_restart.rb (with props)
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/Restartable.java
Modified:
hbase/branches/0.89/bin/hbase-daemon.sh
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerAddress.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
Modified: hbase/branches/0.89/bin/hbase-daemon.sh
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/bin/hbase-daemon.sh?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/bin/hbase-daemon.sh (original)
+++ hbase/branches/0.89/bin/hbase-daemon.sh Tue Oct 11 17:41:51 2011
@@ -180,7 +180,19 @@ case $startStop in
thiscmd=$0
args=$@
# stop the command
- $thiscmd --config "${HBASE_CONF_DIR}" stop $command $args &
+ case $command in
+ (regionserver)
+ echo -n "Stopping regionserver for a restart"
+ $bin/hbase org.jruby.Main $bin/restart_regionserver.rb &
+ while kill -0 `cat $pid` > /dev/null 2>&1; do
+ echo -n "."
+ sleep 1;
+ done
+ ;;
+ (*)
+ $thiscmd --config "${HBASE_CONF_DIR}" stop $command $args &
+ ;;
+ esac
wait_until_done $!
# start the command
$thiscmd --config "${HBASE_CONF_DIR}" start $command $args &
Added: hbase/branches/0.89/bin/stop_regionserver_for_restart.rb
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/bin/stop_regionserver_for_restart.rb?rev=1181915&view=auto
==============================================================================
--- hbase/branches/0.89/bin/stop_regionserver_for_restart.rb (added)
+++ hbase/branches/0.89/bin/stop_regionserver_for_restart.rb Tue Oct 11 17:41:51 2011
@@ -0,0 +1,90 @@
+#
+# Copyright 2009 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Script will be used to stop a regionserver and flag to the master that it is going down for a restart
+#
+# To see usage for this script, run:
+#
+# ${HBASE_HOME}/bin/hbase org.jruby.Main stop_regionserver_for_restart.rb
+#
+include Java
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.ClusterStatus
+import org.apache.hadoop.hbase.HBaseConfiguration
+import org.apache.hadoop.hbase.HServerInfo
+import org.apache.hadoop.hbase.HServerAddress
+import org.apache.hadoop.hbase.client.HBaseAdmin
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.HConstants
+import org.apache.commons.logging.LogFactory
+import java.net.InetAddress
+
+# Name of this script
+NAME = "stop_regionserver_for_restart"
+
+# Print usage for this script
+def usage
+ puts 'Usage: %s.rb' % NAME
+ exit!
+end
+
+# Check arguments
+if ARGV.size > 0
+ usage
+end
+
+# Get configuration to use.
+c = HBaseConfiguration.create()
+
+# Taken from add_table.rb script
+# Set hadoop filesystem configuration using the hbase.rootdir.
+# Otherwise, we'll always use localhost though the hbase.rootdir
+# might be pointing at hdfs location.
+c.set("fs.default.name", c.get(HConstants::HBASE_DIR))
+
+# Get a logger instance.
+LOG = LogFactory.getLog(NAME)
+
+# get the admin interface
+admin = HBaseAdmin.new(c)
+
+hostname = InetAddress.getLocalHost().getHostName()
+port = c.getInt("hbase.regionserver.port", 0)
+
+if port > 0
+ address = HServerAddress.new(hostname, port)
+else
+ address = nil
+
+ # get the cluster servers
+ servers = admin.getClusterStatus().getServerInfo()
+
+ servers.each do |server|
+ if server.getServerAddress().getHostname() == InetAddress.getLocalHost().getHostName()
+ address = server.getServerAddress()
+ break
+ end
+ end
+end
+
+if address == nil
+ puts "invalid server"
+ exit
+end
+admin.stopRegionServerForRestart(address)
Propchange: hbase/branches/0.89/bin/stop_regionserver_for_restart.rb
------------------------------------------------------------------------------
svn:executable = *
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HMsg.java Tue Oct 11 17:41:51 2011
@@ -137,6 +137,14 @@ public class HMsg implements Writable {
* Run major compaction on a specific column family within a region.
*/
MSG_REGION_CF_MAJOR_COMPACT,
+
+ /**
+ * Region server is going down for restart
+ *
+ * Note that this message is followed by MSG_REPORT_CLOSE messages for each
+ * region the region server was serving, unless it was told to quiesce.
+ */
+ MSG_REPORT_EXITING_FOR_RESTART,
}
private Type type = null;
@@ -302,6 +310,7 @@ public class HMsg implements Writable {
/**
* @see org.apache.hadoop.io.Writable#write(java.io.DataOutput)
*/
+ @Override
public void write(DataOutput out) throws IOException {
out.writeInt(this.type.ordinal());
this.info.write(out);
@@ -320,6 +329,7 @@ public class HMsg implements Writable {
/**
* @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
*/
+ @Override
public void readFields(DataInput in) throws IOException {
int ordinal = in.readInt();
this.type = HMsg.Type.values()[ordinal];
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerAddress.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerAddress.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerAddress.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/HServerAddress.java Tue Oct 11 17:41:51 2011
@@ -19,14 +19,14 @@
*/
package org.apache.hadoop.hbase;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.WritableComparable;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.WritableComparable;
/**
* HServerAddress is a "label" for a HBase server made of host and port number.
@@ -154,6 +154,7 @@ public class HServerAddress implements W
// Writable
//
+ @Override
public void readFields(DataInput in) throws IOException {
String bindAddress = in.readUTF();
int port = in.readInt();
@@ -168,6 +169,7 @@ public class HServerAddress implements W
}
}
+ @Override
public void write(DataOutput out) throws IOException {
if (address == null) {
out.writeUTF("");
@@ -182,6 +184,7 @@ public class HServerAddress implements W
// Comparable
//
+ @Override
public int compareTo(HServerAddress o) {
// Addresses as Strings may not compare though address is for the one
// server with only difference being that one address has hostname
Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/Restartable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/Restartable.java?rev=1181915&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/Restartable.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/Restartable.java Tue Oct 11 17:41:51 2011
@@ -0,0 +1,32 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+/**
+ * Implementers are restart-able
+ */
+public interface Restartable {
+
+ /**
+ * Restart this service
+ */
+ public void stopForRestart();
+}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java Tue Oct 11 17:41:51 2011
@@ -19,6 +19,15 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.InterruptedIOException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -28,6 +37,7 @@ import org.apache.hadoop.hbase.HColumnDe
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.RegionException;
@@ -44,14 +54,6 @@ import org.apache.hadoop.io.BooleanWrita
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
/**
* Provides an interface to manage HBase database table metadata + general
@@ -1099,8 +1101,24 @@ public class HBaseAdmin {
}
/**
+ * Stop the designated RegionServer for a restart.
+ *
+ * @param hsa
+ * the address of the RegionServer to stop
+ * @throws IOException
+ * if a remote or network exception occurs
+ */
+ public synchronized void stopRegionServerForRestart(final HServerAddress hsa)
+ throws IOException {
+ HRegionInterface rs = this.connection.getHRegionConnection(hsa);
+ LOG.info("Restarting RegionServer" + hsa.toString());
+ rs.stopForRestart();
+ }
+
+ /**
* @return cluster status
- * @throws IOException if a remote or network exception occurs
+ * @throws IOException
+ * if a remote or network exception occurs
*/
public ClusterStatus getClusterStatus() throws IOException {
if (this.master == null) {
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Tue Oct 11 17:41:51 2011
@@ -19,9 +19,13 @@
*/
package org.apache.hadoop.hbase.ipc;
+import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.Restartable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -32,16 +36,13 @@ import org.apache.hadoop.hbase.client.Sc
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import java.io.IOException;
-import java.util.List;
-
/**
* Clients interact with HRegionServers using a handle to the HRegionInterface.
*
* <p>NOTE: if you change the interface, you must change the RPC version
* number in HBaseRPCProtocolVersion
*/
-public interface HRegionInterface extends HBaseRPCProtocolVersion {
+public interface HRegionInterface extends HBaseRPCProtocolVersion, Restartable {
/**
* Get metainfo about an HRegion
*
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Tue Oct 11 17:41:51 2011
@@ -31,14 +31,19 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.HConstants;
@@ -48,8 +53,8 @@ import org.apache.hadoop.hbase.HServerAd
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.executor.HBaseEventHandler.HBaseEventType;
+import org.apache.hadoop.hbase.executor.RegionTransitionEventData;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
@@ -97,6 +102,34 @@ public class RegionManager {
final SortedMap<String, RegionState> regionsInTransition =
Collections.synchronizedSortedMap(new TreeMap<String, RegionState>());
+ // RESTARTING LOGIC
+
+ /**
+ * Thread to handle timeout of restarting Region Servers
+ */
+ private final RestartingServerHandler restartingServerHandlerThread;
+
+ /**
+ * Map from all the restarting servers to the set of non-META / non-ROOT
+ * regions they were serving
+ */
+ private final Map<HServerAddress, Set<HRegionInfo>>
+ restartingServersToRegions =
+ new ConcurrentHashMap<HServerAddress, Set<HRegionInfo>>();
+
+ /**
+ * Set of all regions being held for restart. It is needed to maintain sync if
+ * multiple servers restart at the same time
+ */
+ private final Set<HRegionInfo> restartingRegions =
+ new TreeSet<HRegionInfo>();
+
+ /**
+ * Map from all restarting servers to their restart time.
+ */
+ private final DelayQueue<RestartingServer> restartingServers =
+ new DelayQueue<RestartingServer>();
+
// regions in transition are also recorded in ZK using the zk wrapper
final ZooKeeperWrapper zkWrapper;
@@ -171,6 +204,9 @@ public class RegionManager {
// Scans the meta table
metaScannerThread = new MetaScanner(master);
+ // scans for restarting regions timeout
+ this.restartingServerHandlerThread = new RestartingServerHandler();
+
zooKeeperNumRetries = conf.getInt(HConstants.ZOOKEEPER_RETRIES,
HConstants.DEFAULT_ZOOKEEPER_RETRIES);
zooKeeperPause = conf.getInt(HConstants.ZOOKEEPER_PAUSE,
@@ -180,6 +216,8 @@ public class RegionManager {
}
void start() {
+ Threads.setDaemonThreadRunning(restartingServerHandlerThread,
+ "RegionManager.restartingServerHandler");
Threads.setDaemonThreadRunning(rootScannerThread,
"RegionManager.rootScanner");
Threads.setDaemonThreadRunning(metaScannerThread,
@@ -259,9 +297,17 @@ public class RegionManager {
if (assignmentByLocality) {
quickStartRegionServerSet.add(hostName);
}
+
+ // this variable keeps track of the code path to go through; if true, than
+ // the server we are examining was registered as restarting and thus we
+ // should assign all the regions to it directly; else, we should go through
+ // the normal code path
+ MutableBoolean restaringServerAndOnTime = new MutableBoolean(false);
+
// get the region set to be assigned to this region server
regionsToAssign = regionsAwaitingAssignment(info.getServerAddress(),
- isSingleServer, assignmentByLocality, holdRegionForBestRegionServer,
+ isSingleServer, restaringServerAndOnTime, assignmentByLocality,
+ holdRegionForBestRegionServer,
quickStartRegionServerSet);
if (regionsToAssign.size() == 0) {
@@ -273,7 +319,8 @@ public class RegionManager {
} else {
// if there's only one server or assign the region by locality,
// just give the regions to this server
- if (isSingleServer || assignmentByLocality) {
+ if (isSingleServer || assignmentByLocality
+ || restaringServerAndOnTime.booleanValue()) {
assignRegionsToOneServer(regionsToAssign, info, returnMsgs);
} else {
// otherwise, give this server a few regions taking into account the
@@ -455,13 +502,43 @@ public class RegionManager {
* the monitor for RegionManager
*/
private Set<RegionState> regionsAwaitingAssignment(HServerAddress addr,
- boolean isSingleServer, boolean assignmentByLocality,
- boolean holdRegionForBestRegionserver,
+ boolean isSingleServer, MutableBoolean restaringServerAndOnTime,
+ boolean assignmentByLocality, boolean holdRegionForBestRegionserver,
Set<String> quickStartRegionServerSet) {
// set of regions we want to assign to this server
Set<RegionState> regionsToAssign = new HashSet<RegionState>();
+ // check if server is restarting
+ // take its information away as you this method is synchronized on
+ // regionsInTransition
+ Set<HRegionInfo> regions = unholdRestartingServer(addr);
+ if (null != regions) {
+ restaringServerAndOnTime.setValue(true);
+ LOG.debug("RegionServer " + addr.getHostname()
+ + " should receive regions " + regions
+ + " coming back from restart");
+ // One could use regionsInTransition.keySet().containsAll(regions) but
+ // this provides more control and probably the same complexity. Also, this
+ // gives direct logging of precise errors
+ for (HRegionInfo ri : regions) {
+ // no need for sync as caller owns monitor
+ RegionState state = regionsInTransition.get(ri.getRegionNameAsString());
+ if (null == state || !state.isUnassigned()) {
+ LOG.error("Region "
+ + ri
+ + (null == state ? " is not in transition" : " is now in state "
+ + state)
+ + " and is no longer available for assigning to previously owning RS "
+ + addr.getHostname());
+ } else {
+ regionsToAssign.add(state);
+ }
+ }
+ // return its initial regions ASAP
+ return regionsToAssign;
+ }
+
boolean isMetaServer = isMetaServer(addr);
boolean isRootServer = isRootServer(addr);
boolean isMetaOrRoot = isMetaServer || isRootServer;
@@ -513,6 +590,11 @@ public class RegionManager {
continue;
}
+ // if we are holding it, don't give it away to any other server
+ if (restartingRegions.contains(s.getRegionInfo())) {
+ continue;
+ }
+
if (assignmentByLocality && !i.isRootRegion() && !i.isMetaRegion()) {
String preferredHost =
this.master.getPreferredRegionToRegionServerMapping().get(name);
@@ -525,6 +607,7 @@ public class RegionManager {
continue;
}
}
+
if (s.isUnassigned()) {
regionsToAssign.add(s);
}
@@ -614,6 +697,7 @@ public class RegionManager {
* PathFilter that accepts hbase tables only.
*/
static class TableDirFilter implements PathFilter {
+ @Override
public boolean accept(final Path path) {
// skip the region servers' log dirs && version file
// HBASE-1112 want to separate the log dirs from table's data dirs by a
@@ -629,6 +713,7 @@ public class RegionManager {
* PathFilter that accepts all but compaction.dir names.
*/
static class RegionDirFilter implements PathFilter {
+ @Override
public boolean accept(Path path) {
return !path.getName().equals(HConstants.HREGION_COMPACTIONDIR_NAME);
}
@@ -1814,6 +1899,7 @@ public class RegionManager {
return Bytes.toString(getRegionName()).hashCode();
}
+ @Override
public int compareTo(RegionState o) {
if (o == null) {
return 1;
@@ -1821,4 +1907,154 @@ public class RegionManager {
return Bytes.compareTo(getRegionName(), o.getRegionName());
}
}
+
+ private class RestartingServerHandler extends Thread {
+ public RestartingServerHandler() {
+ }
+
+ @Override
+ public void run() {
+ LOG.debug("Started RestartingServerHandler");
+ while (!master.getClosed().get()) {
+ try {
+ // check if any servers' waiting time expired
+ RestartingServer server = restartingServers.poll(
+ master.getConfiguration().getInt(
+ HConstants.THREAD_WAKE_FREQUENCY, 30 * 1000),
+ TimeUnit.MILLISECONDS);
+ Set<HRegionInfo> regions;
+
+ if (null == server) {
+ continue;
+ }
+
+ regions = unholdRestartingServer(server.getServer()
+ .getServerAddress());
+ if (null != regions) {
+ LOG.info("RegionServer "
+ + server.getServer()
+ + " failed to report back after restart! Redistributing all of its regions "
+ + regions);
+ } else {
+ // the server came back and restarted properly
+ }
+ } catch (InterruptedException e) {
+ // no problem, just continue
+ continue;
+ }
+ }
+ }
+ }
+
+ private class RestartingServer implements Delayed {
+ private long creationTime;
+ private HServerInfo server;
+ private long millisecondDelay;
+
+ RestartingServer(HServerInfo server, long creationTime,
+ long millisecondDelay) {
+ this.server = server;
+ this.creationTime = creationTime;
+ this.millisecondDelay = millisecondDelay;
+ }
+
+ /**
+ * Method to get the server info back
+ *
+ * @return the server info for this respective regionserver
+ */
+ public HServerInfo getServer() {
+ return this.server;
+ }
+
+ @Override
+ public int compareTo(Delayed arg0) {
+ long delta = this.getDelay(TimeUnit.MILLISECONDS)
+ - arg0.getDelay(TimeUnit.MILLISECONDS);
+ return (this.equals(arg0) ? 0 : (delta > 0 ? 1 : (delta < 0 ? -1 : 0)));
+ }
+
+ @Override
+ public long getDelay(TimeUnit unit) {
+ return unit.convert(
+ (this.creationTime + millisecondDelay) - System.currentTimeMillis(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o instanceof RestartingServer) {
+ return ((RestartingServer) o).getServer().equals(this.server);
+ }
+
+ return false;
+ }
+ }
+
+ /**
+ * Method used to do housekeeping for holding regions for a RegionServer going
+ * down for a restart
+ *
+ * @param regionServer
+ * the RegionServer going down for a restart
+ * @param regions
+ * the HRegions it was previously serving
+ */
+ public void addRegionServerForRestart(final HServerInfo regionServer,
+ Set<HRegionInfo> regions) {
+ addRegionServerForRestart(regionServer, regions, master.getConfiguration()
+ .getLong("hbase.regionserver.restart.regionHoldPeriod", 60 * 1000));
+ }
+
+ /**
+ * Method used to do housekeeping for holding regions for a RegionServer going
+ * down for a restart
+ *
+ * @param regionServer
+ * the RegionServer going down for a restart
+ * @param regions
+ * the HRegions it was previously serving
+ * @param millisecondDelay
+ * the delay to wait until redistributing the regions from holding
+ */
+ public void addRegionServerForRestart(final HServerInfo regionServer,
+ Set<HRegionInfo> regions, long millisecondDelay) {
+
+ LOG.debug("Holding for server " + regionServer + " regions " + regions
+ + " for this much time " + millisecondDelay + " ms");
+
+ restartingServersToRegions.put(regionServer.getServerAddress(), regions);
+ synchronized (restartingRegions) {
+ restartingRegions.addAll(regions);
+ }
+
+ RestartingServer serv = new RestartingServer(regionServer,
+ System.currentTimeMillis(), millisecondDelay);
+ restartingServers.put(serv);
+ }
+
+ /**
+ * Removes all the information being held for restart purposes for this
+ * particular regionserver
+ *
+ * @param addr
+ * the address of the regionserver that went down for a restart
+ * @return the regions this regionserver was holding or null if this method
+ * already got called before
+ */
+ private Set<HRegionInfo> unholdRestartingServer(final HServerAddress addr) {
+ Set<HRegionInfo> regions = restartingServersToRegions.remove(addr);
+ if (null != regions) {
+ // no longer hold the regions
+ synchronized (restartingRegions) {
+ restartingRegions.removeAll(regions);
+ }
+ }
+
+ return regions;
+ }
+
+ public boolean isServerRestarting(final HServerInfo hsi) {
+ return restartingServersToRegions.containsKey(hsi.getServerAddress());
+ }
}
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Oct 11 17:41:51 2011
@@ -19,6 +19,19 @@
*/
package org.apache.hadoop.hbase.master;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -42,18 +55,6 @@ import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* The ServerManager class manages info about region servers - HServerInfo,
* load numbers, dying servers, etc.
@@ -273,6 +274,9 @@ public class ServerManager {
if (msgs[0].isType(HMsg.Type.MSG_REPORT_EXITING)) {
processRegionServerExit(info, msgs);
return HMsg.EMPTY_HMSG_ARRAY;
+ } else if (msgs[0].isType(HMsg.Type.MSG_REPORT_EXITING_FOR_RESTART)) {
+ processRegionServerRestart(serverInfo, msgs);
+ return HMsg.EMPTY_HMSG_ARRAY;
} else if (msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) {
LOG.info("Region server " + info.getServerName() + " quiesced");
this.quiescedServers.incrementAndGet();
@@ -336,6 +340,42 @@ public class ServerManager {
}
}
+ /**
+ * Region server is going down for a restart, trying to preserve locality of
+ * its regions for a fixed amount of time.
+ *
+ * @param serverInfo
+ * the server that is restarting
+ * @param msgs
+ * the initial restart message, followed by the regions it was
+ * serving
+ */
+ private void processRegionServerRestart(final HServerInfo serverInfo,
+ HMsg msgs[]) {
+
+ Set<HRegionInfo> regions = new TreeSet<HRegionInfo>();
+
+ // skipping first message
+ for (int i = 1; i < msgs.length; ++i) {
+ if (msgs[i].getRegionInfo().isMetaRegion()
+ || msgs[i].getRegionInfo().isRootRegion()) {
+ continue;
+ }
+ regions.add(msgs[i].getRegionInfo());
+ }
+
+ LOG.info("Region server " + serverInfo.getServerName()
+ + ": MSG_REPORT_EXITING_FOR_RESTART");
+
+ // set info for restarting -- maybe not do this if regions is empty...
+ // CRUCIAL that this is done before calling processRegionServerExit
+ master.getRegionManager().addRegionServerForRestart(serverInfo, regions);
+
+ // process normal HRegion closing
+ msgs[0] = new HMsg(HMsg.Type.MSG_REPORT_EXITING);
+ processRegionServerExit(serverInfo, msgs);
+ }
+
/*
* Region server is exiting with a clean shutdown.
*
@@ -721,9 +761,24 @@ public class ServerManager {
int numServers = 0;
double averageLoad = 0.0;
synchronized (serversToLoad) {
- numServers = serversToLoad.size();
- for (HServerLoad load : serversToLoad.values()) {
- totalLoad += load.getNumberOfRegions();
+ // numServers = serversToLoad.size();
+ // the above was not accurate as a server is first removed from the
+ // serversToServerInfo map, then from the serversToLoad map
+ numServers = serversToServerInfo.size();
+ for (Map.Entry<String, HServerLoad> entry : serversToLoad.entrySet()) {
+ HServerInfo hsi = serversToServerInfo.get(entry.getKey());
+ if (null != hsi) {
+ if (!this.master.getRegionManager().isServerRestarting(hsi)) {
+ totalLoad += entry.getValue().getNumberOfRegions();
+ } else {
+ // server is being processed for a restart, ignore for loadbalancing
+ // purposes
+ }
+ } else {
+ // this server has already been removed from the serversToServerInfo
+ // map, but not from the serversToLoad one yet, thus ignore it for
+ // loadbalancing purposes
+ }
}
averageLoad = (double)totalLoad / (double)numServers;
}
@@ -827,6 +882,7 @@ public class ServerManager {
this.server = hsi;
}
+ @Override
public void process(WatchedEvent event) {
if (!event.getType().equals(EventType.NodeDeleted)) {
LOG.warn("Unexpected event=" + event);
Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181915&r1=1181914&r2=1181915&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 17:41:51 2011
@@ -66,7 +66,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HMsg;
+import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
@@ -76,15 +78,13 @@ import org.apache.hadoop.hbase.HTableDes
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LeaseListener;
import org.apache.hadoop.hbase.Leases;
+import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.HMsg.Type;
-import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
@@ -132,6 +132,8 @@ public class HRegionServer implements HR
HBaseRPCErrorHandler, Runnable, Watcher, Stoppable {
public static final Log LOG = LogFactory.getLog(HRegionServer.class);
private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
+ private static final HMsg REPORT_RESTARTING = new HMsg(
+ Type.MSG_REPORT_EXITING_FOR_RESTART);
private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
@@ -148,6 +150,8 @@ public class HRegionServer implements HR
// debugging and unit tests.
protected volatile boolean abortRequested;
+ protected volatile boolean restartRequested;
+
private volatile boolean killed = false;
// If false, the file system has become unavailable
@@ -317,6 +321,7 @@ public class HRegionServer implements HR
* @throws IOException
*/
private void reinitialize() throws IOException {
+ this.restartRequested = false;
this.abortRequested = false;
this.stopRequested.set(false);
@@ -386,6 +391,7 @@ public class HRegionServer implements HR
* Either way we need to update our knowledge of the master.
* @param event WatchedEvent from ZooKeeper.
*/
+ @Override
public void process(WatchedEvent event) {
EventType type = event.getType();
KeeperState state = event.getState();
@@ -436,6 +442,7 @@ public class HRegionServer implements HR
* in with the HMaster, sending heartbeats & reports, and receiving HRegion
* load/unload instructions.
*/
+ @Override
public void run() {
regionServerThread = Thread.currentThread();
boolean quiesceRequested = false;
@@ -633,7 +640,11 @@ public class HRegionServer implements HR
}
try {
HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
- exitMsg[0] = REPORT_EXITING;
+ if (restartRequested) {
+ exitMsg[0] = REPORT_RESTARTING;
+ } else {
+ exitMsg[0] = REPORT_EXITING;
+ }
// Tell the master what regions we are/were serving
int i = 1;
for (HRegion region: closedRegions) {
@@ -876,6 +887,7 @@ public class HRegionServer implements HR
* @param e
* @return True if we OOME'd and are aborting.
*/
+ @Override
public boolean checkOOME(final Throwable e) {
boolean stop = false;
try {
@@ -1142,6 +1154,7 @@ public class HRegionServer implements HR
private void startServiceThreads() throws IOException {
String n = Thread.currentThread().getName();
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread t, Throwable e) {
abort("Uncaught exception in service thread " + t.getName(), e);
}
@@ -1254,6 +1267,7 @@ public class HRegionServer implements HR
* included in the list returned
* @return list of HLog files
*/
+ @Override
public List<String> getHLogsList(boolean rollCurrentHLog) throws IOException {
if (rollCurrentHLog) this.hlog.rollWriter();
return this.hlog.getHLogsList();
@@ -1263,6 +1277,7 @@ public class HRegionServer implements HR
* Sets a flag that will cause all the HRegionServer threads to shut down
* in an orderly fashion. Used by unit tests.
*/
+ @Override
public void stop() {
this.stopRequested.set(true);
synchronized(this) {
@@ -1272,6 +1287,16 @@ public class HRegionServer implements HR
}
/**
+ * Shuts this region server down in preparation for a restart.
+ *
+ * Sets the {@code restartRequested} flag and then calls {@link #stop()} to
+ * begin the orderly shutdown. While the flag is set, the exit report sent to
+ * the master carries MSG_REPORT_EXITING_FOR_RESTART instead of
+ * MSG_REPORT_EXITING, which asks the master to hold this server's regions
+ * for it. The flag is cleared again by reinitialize().
+ */
+ @Override
+ public void stopForRestart() {
+ // Set before stop() so the flag is already visible (volatile) when the
+ // main loop builds its exit report after observing the stop request.
+ restartRequested = true;
+ LOG.info("Going down for a restart");
+ stop();
+ }
+
+ /**
* Cause the server to exit without closing the regions it is serving, the
* log it is using and without notifying the master.
* Used unit testing and on catastrophic events such as HDFS is yanked out
@@ -1446,6 +1471,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public void run() {
try {
while(!stopRequested.get()) {
@@ -1642,6 +1668,7 @@ public class HRegionServer implements HR
HRegion r = HRegion.newHRegion(dir, this.hlog, this.fs, conf, regionInfo,
this.cacheFlusher);
long seqid = r.initialize(new Progressable() {
+ @Override
public void progress() {
addProcessingMessage(regionInfo);
}
@@ -1664,6 +1691,7 @@ public class HRegionServer implements HR
getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri));
}
+ @Override
public void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
throws IOException {
RSZookeeperUpdater zkUpdater = null;
@@ -1800,6 +1828,7 @@ public class HRegionServer implements HR
// HRegionInterface
//
+ @Override
public HRegionInfo getRegionInfo(final byte [] regionName)
throws NotServingRegionException {
requestCount.incrementAndGet();
@@ -1807,6 +1836,7 @@ public class HRegionServer implements HR
}
+ @Override
public Result getClosestRowBefore(final byte [] regionName,
final byte [] row, final byte [] family)
throws IOException {
@@ -1825,6 +1855,7 @@ public class HRegionServer implements HR
}
/** {@inheritDoc} */
+ @Override
public Result get(byte [] regionName, Get get) throws IOException {
checkOpen();
requestCount.incrementAndGet();
@@ -1836,6 +1867,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public boolean exists(byte [] regionName, Get get) throws IOException {
checkOpen();
requestCount.incrementAndGet();
@@ -1848,6 +1880,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public void put(final byte [] regionName, final Put put)
throws IOException {
if (put.getRow() == null)
@@ -1867,6 +1900,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public int put(final byte[] regionName, final List<Put> puts)
throws IOException {
checkOpen();
@@ -1927,6 +1961,7 @@ public class HRegionServer implements HR
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
+ @Override
public boolean checkAndPut(final byte[] regionName, final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Put put) throws IOException{
@@ -1945,6 +1980,7 @@ public class HRegionServer implements HR
* @throws IOException
* @return true if the new put was execute, false otherwise
*/
+ @Override
public boolean checkAndDelete(final byte[] regionName, final byte [] row,
final byte [] family, final byte [] qualifier, final byte [] value,
final Delete delete) throws IOException{
@@ -1956,6 +1992,7 @@ public class HRegionServer implements HR
// remote scanner interface
//
+ @Override
public long openScanner(byte [] regionName, Scan scan)
throws IOException {
checkOpen();
@@ -1987,6 +2024,7 @@ public class HRegionServer implements HR
return scannerId;
}
+ @Override
public Result next(final long scannerId) throws IOException {
Result [] res = next(scannerId, 1);
if(res == null || res.length == 0) {
@@ -1995,6 +2033,7 @@ public class HRegionServer implements HR
return res[0];
}
+ @Override
public Result [] next(final long scannerId, int nbRows) throws IOException {
try {
String scannerName = String.valueOf(scannerId);
@@ -2046,6 +2085,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public void close(final long scannerId) throws IOException {
try {
checkOpen();
@@ -2072,6 +2112,7 @@ public class HRegionServer implements HR
this.scannerName = n;
}
+ @Override
public void leaseExpired() {
LOG.info("Scanner " + this.scannerName + " lease expired");
InternalScanner s = scanners.remove(this.scannerName);
@@ -2088,6 +2129,7 @@ public class HRegionServer implements HR
//
// Methods that do the actual work for the remote API
//
+ @Override
public void delete(final byte [] regionName, final Delete delete)
throws IOException {
checkOpen();
@@ -2105,6 +2147,7 @@ public class HRegionServer implements HR
}
}
+ @Override
public int delete(final byte[] regionName, final List<Delete> deletes)
throws IOException {
// Count of Deletes processed.
@@ -2136,6 +2179,7 @@ public class HRegionServer implements HR
return -1;
}
+ @Override
public long lockRow(byte [] regionName, byte [] row)
throws IOException {
checkOpen();
@@ -2194,6 +2238,7 @@ public class HRegionServer implements HR
return rl;
}
+ @Override
public void unlockRow(byte [] regionName, long lockId)
throws IOException {
checkOpen();
@@ -2248,6 +2293,7 @@ public class HRegionServer implements HR
this.region = region;
}
+ @Override
public void leaseExpired() {
LOG.info("Row Lock " + this.lockName + " lease expired");
Integer r = rowlocks.remove(this.lockName);
@@ -2289,10 +2335,12 @@ public class HRegionServer implements HR
return Collections.unmodifiableCollection(onlineRegions.values());
}
+ @Override
public HRegion [] getOnlineRegionsAsArray() {
return getOnlineRegions().toArray(new HRegion[0]);
}
+ @Override
public List<String> getStoreFileList(byte[] regionName, byte[] columnFamily)
throws IllegalArgumentException {
HRegion region = getOnlineRegion(regionName);
@@ -2316,6 +2364,7 @@ public class HRegionServer implements HR
/**
* Flushes the given region
*/
+ @Override
public void flushRegion(byte[] regionName)
throws IllegalArgumentException, IOException {
HRegion region = getOnlineRegion(regionName);
@@ -2415,6 +2464,7 @@ public class HRegionServer implements HR
// we'll sort the regions in reverse
SortedMap<Long, HRegion> sortedRegions = new TreeMap<Long, HRegion>(
new Comparator<Long>() {
+ @Override
public int compare(Long a, Long b) {
return -1 * a.compareTo(b);
}
@@ -2529,6 +2579,7 @@ public class HRegionServer implements HR
return regionsToCheck;
}
+ @Override
public long getProtocolVersion(final String protocol,
final long clientVersion)
throws IOException {
@@ -2572,6 +2623,7 @@ public class HRegionServer implements HR
public HServerInfo getServerInfo() { return this.serverInfo; }
/** {@inheritDoc} */
+ @Override
public long incrementColumnValue(byte [] regionName, byte [] row,
byte [] family, byte [] qualifier, long amount, boolean writeToWAL)
throws IOException {
@@ -2595,6 +2647,7 @@ public class HRegionServer implements HR
}
/** {@inheritDoc} */
+ @Override
public HRegionInfo[] getRegionsAssignment() throws IOException {
HRegionInfo[] regions = new HRegionInfo[onlineRegions.size()];
Iterator<HRegion> ite = onlineRegions.values().iterator();
@@ -2605,6 +2658,7 @@ public class HRegionServer implements HR
}
/** {@inheritDoc} */
+ @Override
public HServerInfo getHServerInfo() throws IOException {
return serverInfo;
}
@@ -2636,6 +2690,7 @@ public class HRegionServer implements HR
return resp;
}
+ @Override
public String toString() {
return this.serverInfo.toString();
}