You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/06/20 14:50:52 UTC
svn commit: r1352071 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/master/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/zookeeper/
test/java/org/apache/hadoop/hbase/master/
Author: mbautin
Date: Wed Jun 20 12:50:51 2012
New Revision: 1352071
URL: http://svn.apache.org/viewvc?rev=1352071&view=rev
Log:
[master] Improve regionserver generation timestamp handling on check-in; handle failures to write a RS ZK node
Author: mbautin
Summary:
There are three related changes here:
- Make the master correctly handle the regionserver generation timestamp on check-in (of the server already registered for the host:port pair and the one checking in, expire whichever is older; do nothing if the two have the same generation timestamp)
- Avoid writing the regionserver ZK node on the master side when setting a watch on it
- Correctly handle failures to write the RS ZK node on the regionserver side and to set a watch on it on the master side
Additionally, I am moving the server -> load and load -> server map manipulation logic from ServerManager to the new ServerLoadMap class and adding a unit test.
Test Plan: Unit tests
Reviewers: pkhemani, kranganathan, kannan
Reviewed By: pkhemani
Differential Revision: https://reviews.facebook.net/D3699
Added:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Jun 20 12:50:51 2012
@@ -142,8 +142,7 @@ public class AssignmentManager {
// shutting down.
if (info != null &&
!master.getServerManager().isDead(info.getServerName()) &&
- master.getServerManager().getServersToLoad()
- .get(info.getServerName()) != null) {
+ master.getServerManager().getServersToServerInfo().get(info.getServerName()) != null) {
LOG.info("Add a transient assignment from the assignment plan: "
+ " region " + region.getRegionNameAsString() + " to the "
+ positions[i] + " region server" + info.getHostnamePort());
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jun 20 12:50:51 2012
@@ -38,7 +38,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.Random;
import java.util.Set;
-import java.util.SortedMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.HRegionIn
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LocalHBaseCluster;
@@ -695,11 +693,6 @@ public class HMaster extends Thread impl
return this.zooKeeperWrapper;
}
- // These methods are so don't have to pollute RegionManager with ServerManager.
- SortedMap<HServerLoad, Set<String>> getLoadToServers() {
- return this.serverManager.getLoadToServers();
- }
-
int numServers() {
return this.serverManager.numServers();
}
@@ -720,17 +713,6 @@ public class HMaster extends Thread impl
return this.oldLogDir;
}
- /**
- * Add to the passed <code>m</code> servers that are loaded less than
- * <code>l</code>.
- * @param l
- * @param m
- */
- void getLightServers(final HServerLoad l,
- SortedMap<HServerLoad, Set<String>> m) {
- this.serverManager.getLightServers(l, m);
- }
-
/** Main processing loop */
@Override
public void run() {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Wed Jun 20 12:50:51 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -500,15 +501,14 @@ public class RegionManager {
*/
private int regionsToGiveOtherServers(final int numUnassignedRegions,
final HServerLoad thisServersLoad) {
- SortedMap<HServerLoad, Set<String>> lightServers =
- new TreeMap<HServerLoad, Set<String>>();
- this.master.getLightServers(thisServersLoad, lightServers);
+ SortedMap<HServerLoad, Collection<String>> lightServers =
+ master.getServerManager().getServersToLoad().getLightServers(thisServersLoad);
// Examine the list of servers that are more lightly loaded than this one.
// Pretend that we will assign regions to these more lightly loaded servers
// until they reach load equal with ours. Then, see how many regions are
// left unassigned. That is how many regions we should assign to this server
int nRegions = 0;
- for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
+ for (Map.Entry<HServerLoad, Collection<String>> e : lightServers.entrySet()) {
HServerLoad lightLoad = new HServerLoad(e.getKey());
do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
@@ -778,15 +778,11 @@ public class RegionManager {
private int computeNextHeaviestLoad(HServerLoad referenceLoad,
HServerLoad heavierLoad) {
- SortedMap<HServerLoad, Set<String>> heavyServers =
- new TreeMap<HServerLoad, Set<String>>();
- synchronized (master.getLoadToServers()) {
- heavyServers.putAll(
- master.getLoadToServers().tailMap(referenceLoad));
- }
+ SortedMap<HServerLoad, Collection<String>> heavyServers = master.getServerManager().
+ getServersToLoad().getHeavyServers(referenceLoad);
int nservers = 0;
- for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
- Set<String> servers = e.getValue();
+ for (Map.Entry<HServerLoad, Collection<String>> e : heavyServers.entrySet()) {
+ Collection<String> servers = e.getValue();
nservers += servers.size();
if (e.getKey().compareTo(referenceLoad) == 0) {
// This is the load factor of the server we are considering
@@ -2154,21 +2150,21 @@ public class RegionManager {
private int balanceToLowloaded(String srvName, HServerLoad srvLoad,
double avgLoad) {
- SortedMap<HServerLoad, Set<String>> loadToServers =
- master.getLoadToServers();
- // check if server most loaded
- if (!loadToServers.get(loadToServers.lastKey()).contains(srvName))
+ ServerLoadMap<HServerLoad> serverLoadMap = master.getServerManager().getServersToLoad();
+
+ if (!serverLoadMap.isMostLoadedServer(srvName))
return 0;
// this server is most loaded, we will try to unload it by lowest
// loaded servers
int avgLoadMinusSlop = (int)Math.floor(avgLoad * (1 - this.slop)) - 1;
- int lowestLoad = loadToServers.firstKey().getNumberOfRegions();
+ HServerLoad lowestServerLoad = serverLoadMap.getLowestLoad();
+ int lowestLoad = lowestServerLoad.getNumberOfRegions();
if(lowestLoad >= avgLoadMinusSlop)
return 0; // there is no low loaded servers
- int lowSrvCount = loadToServers.get(loadToServers.firstKey()).size();
+ int lowSrvCount = serverLoadMap.numServersByLoad(lowestServerLoad);
int numSrvRegs = srvLoad.getNumberOfRegions();
int numMoveToLowLoaded = (avgLoadMinusSlop - lowestLoad) * lowSrvCount;
Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java?rev=1352071&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java Wed Jun 20 12:50:51 2012
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.collect.TreeMultimap;
+
+/** Maintains a map from regionservers name to server load and vice versa */
+public class ServerLoadMap<L extends Comparable<?>> {
+
+ /** SortedMap server load -> Set of server names */
+ private final TreeMultimap<L, String> loadToServers = TreeMultimap.create();
+
+ /** Map of server names -> server load */
+ private final Map<String, L> serversToLoad = new HashMap<String, L>();
+
+ private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+ public void updateServerLoad(String serverName, L load) {
+ rwLock.writeLock().lock();
+ try {
+ L currentLoad = serversToLoad.get(serverName);
+ if (load != null && load.equals(currentLoad)) {
+ // The load for this server is already the same as what we are trying to set.
+ return;
+ }
+
+ if (currentLoad != null) {
+ // Remove the server from its current load bucket.
+ loadToServers.remove(currentLoad, serverName);
+ }
+
+ if (load != null) {
+ // Set the new load for the server and add it to its new load bucket.
+ serversToLoad.put(serverName, load);
+ loadToServers.put(load, serverName);
+ } else {
+ // Remove server -> load mapping.
+ serversToLoad.remove(serverName);
+ }
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ public void removeServerLoad(String serverName) {
+ rwLock.writeLock().lock();
+ try {
+ L load = serversToLoad.remove(serverName);
+ if (load != null) {
+ loadToServers.remove(load, serverName);
+ }
+ } finally {
+ rwLock.writeLock().unlock();
+ }
+ }
+
+ public L get(String serverName) {
+ rwLock.readLock().lock();
+ try {
+ return serversToLoad.get(serverName);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ /** Takes a snapshot of all entries in the server to load map */
+ public List<Map.Entry<String, L>> entries() {
+ rwLock.readLock().lock();
+ try {
+ return new ArrayList<Map.Entry<String, L>>(serversToLoad.entrySet());
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public SortedMap<L, Collection<String>> getLightServers(L referenceLoad) {
+ rwLock.readLock().lock();
+ try {
+ SortedMap<L, Collection<String>> lightServers =
+ new TreeMap<L, Collection<String>>();
+ lightServers.putAll(loadToServers.asMap().headMap(referenceLoad));
+ return lightServers;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public SortedMap<L, Collection<String>> getHeavyServers(L referenceLoad) {
+ rwLock.readLock().lock();
+ try {
+ SortedMap<L, Collection<String>> heavyServers =
+ new TreeMap<L, Collection<String>>();
+ heavyServers.putAll(loadToServers.asMap().tailMap(referenceLoad));
+ return heavyServers;
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public int size() {
+ rwLock.readLock().lock();
+ try {
+ return serversToLoad.size();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public boolean isMostLoadedServer(String serverName) {
+ rwLock.readLock().lock();
+ try {
+ L maxLoad = loadToServers.asMap().lastKey();
+ if (maxLoad == null) {
+ return false;
+ }
+ return loadToServers.get(maxLoad).contains(serverName);
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public L getLowestLoad() {
+ rwLock.readLock().lock();
+ try {
+ return loadToServers.asMap().firstKey();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+ public int numServersByLoad(L load) {
+ rwLock.readLock().lock();
+ try {
+ Set<String> servers = loadToServers.get(load);
+ if (servers == null) {
+ return 0;
+ }
+ return servers.size();
+ } finally {
+ rwLock.readLock().unlock();
+ }
+ }
+
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Jun 20 12:50:51 2012
@@ -28,12 +28,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -48,7 +45,6 @@ import org.apache.hadoop.hbase.HRegionLo
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HServerLoad;
-import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.client.Get;
@@ -100,13 +96,9 @@ public class ServerManager {
* to be piggybacked upon the next processMsgs;
*/
private ConcurrentHashMap<HServerInfo, ArrayList<HMsg>> pendingMsgsToSvrsMap;
-
- // SortedMap server load -> Set of server names
- private final SortedMap<HServerLoad, Set<String>> loadToServers =
- Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
- // Map of server names -> server load
- private final Map<String, HServerLoad> serversToLoad =
- new ConcurrentHashMap<String, HServerLoad>();
+
+ /** A bidirectional map between server names and their load */
+ private ServerLoadMap<HServerLoad> serversToLoad = new ServerLoadMap<HServerLoad>();
private HMaster master;
@@ -216,11 +208,19 @@ public class ServerManager {
String message = "Server start rejected; we already have " + hostAndPort +
" registered; existingServer=" + existingServer + ", newServer=" + info;
LOG.info(message);
- if (existingServer.getStartCode() < info.getStartCode()) {
+ long existingStartCode = existingServer.getStartCode();
+ long newStartCode = info.getStartCode();
+ if (existingStartCode < newStartCode) {
LOG.info("Triggering server recovery; existingServer looks stale");
expireServer(existingServer);
+ } else if (existingStartCode == newStartCode) {
+ LOG.debug("Duplicate region server check-in with start code " + existingStartCode + ": " +
+ info.getServerName() + ", processing normally");
+ } else {
+ LOG.info("Newer server has already checked in, telling the old server to stop");
+ throw new YouAreDeadException("A new server with start code " + existingStartCode
+ + " is already online for " + info.getHostnamePort());
}
- throw new PleaseHoldException(message);
}
checkIsDead(info.getServerName(), "STARTUP");
LOG.info("Received start message from: " + info.getServerName());
@@ -257,33 +257,48 @@ public class ServerManager {
* Adds the HSI to the RS list and creates an empty load
* @param info The region server informations
*/
- public void recordNewServer(HServerInfo info) {
+ public void recordNewServer(HServerInfo info) throws IOException {
recordNewServer(info, false);
}
+ /** Restore the old value for the given key in a map */
+ private static <K, V> void undoMapUpdate(Map<K, V> m, K key, V oldValue) {
+ if (oldValue == null) {
+ m.remove(key);
+ } else {
+ m.put(key, oldValue);
+ }
+ }
+
/**
* Adds the HSI to the RS list
* @param info The region server informations
* @param useInfoLoad True if the load from the info should be used
* like under a master failover
*/
- void recordNewServer(HServerInfo info, boolean useInfoLoad) {
+ void recordNewServer(HServerInfo info, boolean useInfoLoad) throws IOException {
HServerLoad load = useInfoLoad ? info.getLoad() : new HServerLoad();
String serverName = info.getServerName();
info.setLoad(load);
// We must set this watcher here because it can be set on a fresh start
// or on a failover
Watcher watcher = new ServerExpirer(new HServerInfo(info));
- this.master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
+
+ // Save the old values so we can rollback if we fail setting the ZK watch
+ HServerInfo oldServerInfo = serversToServerInfo.get(serverName);
+ HServerLoad oldServerLoad = serversToLoad.get(serverName);
+
this.serversToServerInfo.put(serverName, info);
- this.serversToLoad.put(serverName, load);
- synchronized (this.loadToServers) {
- Set<String> servers = this.loadToServers.get(load);
- if (servers == null) {
- servers = new HashSet<String>();
- }
- servers.add(serverName);
- this.loadToServers.put(load, servers);
+ serversToLoad.updateServerLoad(serverName, load);
+
+ // Setting a watch after making changes to internal server to server info / load data
+ // structures because the watch can fire immediately after being set.
+ if (!this.master.getZooKeeperWrapper().setRSLocationWatch(info, watcher)) {
+ // Could not set a watch, undo the above changes.
+ serversToLoad.updateServerLoad(serverName, oldServerLoad);
+ undoMapUpdate(serversToServerInfo, serverName, oldServerInfo);
+ throw new IOException("Could not set a watch on regionserver location "
+ + info.getServerName());
}
}
@@ -474,14 +489,11 @@ public class ServerManager {
throws IOException {
// Refresh the info object and the load information
this.serversToServerInfo.put(serverInfo.getServerName(), serverInfo);
- HServerLoad oldLoad = this.serversToLoad.get(serverInfo.getServerName());
+ HServerLoad oldLoad = serversToLoad.get(serverInfo.getServerName());
HServerLoad newLoad = serverInfo.getLoad();
if (oldLoad != null) {
// XXX why are we using oldLoad to update metrics
this.master.getMetrics().incrementRequests(oldLoad.getNumberOfRequests());
- if (!oldLoad.equals(newLoad)) {
- updateLoadToServers(serverInfo.getServerName(), oldLoad);
- }
}
// Set the current load information
@@ -491,15 +503,7 @@ public class ServerManager {
serverInfo.getServerName());
}
newLoad.expireAfter = Long.MAX_VALUE;
- this.serversToLoad.put(serverInfo.getServerName(), newLoad);
- synchronized (loadToServers) {
- Set<String> servers = this.loadToServers.get(newLoad);
- if (servers == null) {
- servers = new HashSet<String>();
- }
- servers.add(serverInfo.getServerName());
- this.loadToServers.put(newLoad, servers);
- }
+ serversToLoad.updateServerLoad(serverInfo.getServerName(), newLoad);
// Next, process messages for this server
return processMsgs(serverInfo, mostLoadedRegions, msgs);
@@ -582,7 +586,7 @@ public class ServerManager {
// Production code path.
master.getRegionManager().assignRegions(serverInfo,
mostLoadedRegions, returnMsgs);
- } else {
+ } else if (mostLoadedRegions.length > 0) {
// UNIT TESTS ONLY.
// We just don't assign anything to "blacklisted" regionservers as
// required by a unit test (for determinism). This is OK because
@@ -785,8 +789,8 @@ public class ServerManager {
this.master.getRegionManager().unsetRootRegion();
if (region.isOffline()) {
// Can't proceed without root region. Shutdown.
- LOG.fatal("root region is marked offline");
- this.master.shutdown();
+ LOG.fatal("root region is marked offline, shutting down the cluster");
+ master.requestClusterShutdown();
return;
}
@@ -846,27 +850,11 @@ public class ServerManager {
}
infoUpdated = true;
- // update load information
- updateLoadToServers(serverName, this.serversToLoad.remove(serverName));
+ serversToLoad.removeServerLoad(serverName);
}
return infoUpdated;
}
- private void updateLoadToServers(final String serverName,
- final HServerLoad load) {
- if (load == null) return;
- synchronized (this.loadToServers) {
- Set<String> servers = this.loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- if (servers.size() > 0)
- this.loadToServers.put(load, servers);
- else
- this.loadToServers.remove(load);
- }
- }
- }
-
/**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
@@ -878,19 +866,11 @@ public class ServerManager {
int numServers = 0;
double averageLoad = 0.0;
synchronized (serversToLoad) {
- // numServers = serversToLoad.size();
- // the above was not accurate as a server is first removed from the
- // serversToServerInfo map, then from the serversToLoad map
- numServers = serversToServerInfo.size();
- for (Map.Entry<String, HServerLoad> entry : serversToLoad.entrySet()) {
+ for (Map.Entry<String, HServerLoad> entry : serversToLoad.entries()) {
HServerInfo hsi = serversToServerInfo.get(entry.getKey());
if (null != hsi) {
totalLoad += entry.getValue().getNumberOfRegions();
- } else {
- // this server has already been removed from the serversToServerInfo
- // map, but not from the serversToLoad one yet, thus ignore it for
- // loadbalancing purposes
- numServers--;
+ numServers++;
}
}
if (numServers > 0) {
@@ -934,22 +914,6 @@ public class ServerManager {
}
/**
- * @return Read-only map of servers to load.
- */
- public Map<String, HServerLoad> getServersToLoad() {
- return Collections.unmodifiableMap(serversToLoad);
- }
-
- /**
- * @return Read-only map of load to servers.
- */
- public SortedMap<HServerLoad, Set<String>> getLoadToServers() {
- synchronized (this.loadToServers) {
- return Collections.unmodifiableSortedMap(this.loadToServers);
- }
- }
-
- /**
* There is no way to guarantee that the returned servers are really online
*
* @return the list of the HServerAddress for all the online region servers.
@@ -1034,16 +998,7 @@ public class ServerManager {
}
// Remove the server from the known servers lists and update load info
this.serversToServerInfo.remove(serverName);
- HServerLoad load = this.serversToLoad.remove(serverName);
- if (load != null) {
- synchronized (this.loadToServers) {
- Set<String> servers = this.loadToServers.get(load);
- if (servers != null) {
- servers.remove(serverName);
- if (servers.isEmpty()) this.loadToServers.remove(load);
- }
- }
- }
+ serversToLoad.removeServerLoad(serverName);
// Add to dead servers and queue a shutdown processing.
LOG.debug("Added=" + serverName +
" to dead servers, added shutdown processing operation");
@@ -1092,17 +1047,8 @@ public class ServerManager {
return this.deadServers;
}
- /**
- * Add to the passed <code>m</code> servers that are loaded less than
- * <code>l</code>.
- * @param l
- * @param m
- */
- void getLightServers(final HServerLoad l,
- SortedMap<HServerLoad, Set<String>> m) {
- synchronized (this.loadToServers) {
- m.putAll(this.loadToServers.headMap(l));
- }
+ public ServerLoadMap<HServerLoad> getServersToLoad() {
+ return serversToLoad;
}
public boolean hasEnoughRegionServers() {
@@ -1205,11 +1151,12 @@ public class ServerManager {
long curTime = EnvironmentEdgeManager.currentTimeMillis();
boolean waitingForMoreServersInRackToTimeOut = false;
boolean reportDetails = false;
+ int serverCount = serversToLoad.size();
if ((curTime > lastDetailedLogAt + (3600 * 1000)) ||
- lastLoggedServerCount != serversToLoad.size()) {
+ lastLoggedServerCount != serverCount) {
reportDetails = true;
lastDetailedLogAt = curTime;
- lastLoggedServerCount = serversToLoad.size();
+ lastLoggedServerCount = serverCount;
}
// rack -> time of last report from rack
Map<String, Long> rackLastReportAtMap = new HashMap<String, Long>();
@@ -1219,7 +1166,7 @@ public class ServerManager {
// rack -> #servers in rack
Map<String, Integer> rackNumServersMap =
new HashMap<String, Integer>();
- for (Map.Entry<String, HServerLoad> e : this.serversToLoad.entrySet()) {
+ for (Map.Entry<String, HServerLoad> e : this.serversToLoad.entries()) {
HServerInfo si = this.serversToServerInfo.get(e.getKey());
if (si == null) {
LOG.debug("ServerTimeoutMonitor : no si for " + e.getKey());
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jun 20 12:50:51 2012
@@ -27,11 +27,13 @@ import java.lang.management.RuntimeMXBea
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -42,12 +44,7 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
-import java.util.SortedSet;
import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.Date;
-import java.util.Calendar;
-import java.text.SimpleDateFormat;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -91,17 +88,14 @@ import org.apache.hadoop.hbase.Stoppable
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.HMsg.Type;
-import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
@@ -122,7 +116,16 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ParamFormat;
+import org.apache.hadoop.hbase.util.ParamFormatter;
+import org.apache.hadoop.hbase.util.RuntimeHaltAbortStrategy;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable;
@@ -134,7 +137,6 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.hadoop.hbase.util.Bytes;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
@@ -295,7 +297,7 @@ public class HRegionServer implements HR
// Cache configuration and block cache reference
private final CacheConfig cacheConfig;
-
+
// prevents excessive checking of filesystem
private int minCheckFSIntervalMillis;
@@ -368,7 +370,7 @@ public class HRegionServer implements HR
minCheckFSIntervalMillis =
conf.getInt("hbase.regionserver.min.check.fs.interval", 30000);
checkFSAbortTimeOutMillis =
- conf.getInt("hbase.regionserver.check.fs.abort.timeout",
+ conf.getInt("hbase.regionserver.check.fs.abort.timeout",
conf.getInt(HConstants.ZOOKEEPER_SESSION_TIMEOUT,
HConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT));
if (minCheckFSIntervalMillis > checkFSAbortTimeOutMillis) {
@@ -399,7 +401,8 @@ public class HRegionServer implements HR
this.serverInfo = new HServerInfo(new HServerAddress(
new InetSocketAddress(address.getBindAddress(),
this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
- this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030), machineName);
+ this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
+ HConstants.DEFAULT_REGIONSERVER_INFOPORT), machineName);
if (this.serverInfo.getServerAddress() == null) {
throw new NullPointerException("Server address cannot be null; " +
"hbase-958 debugging");
@@ -1342,7 +1345,8 @@ public class HRegionServer implements HR
this.leases.setName(n + ".leaseChecker");
this.leases.start();
// Put up info server.
- int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
+ int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
+ HConstants.DEFAULT_REGIONSERVER_INFOPORT);
// -1 is for disabling info server
if (port >= 0) {
String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
@@ -1579,7 +1583,7 @@ public class HRegionServer implements HR
* Let the master know we're here
* Run initialization using parameters passed us by the master.
*/
- private MapWritable reportForDuty() {
+ private MapWritable reportForDuty() throws YouAreDeadException {
while (!stopRequested.get() && !getMaster()) {
sleeper.sleep();
LOG.warn("Unable to get master for initialization");
@@ -1598,11 +1602,19 @@ public class HRegionServer implements HR
if (LOG.isDebugEnabled())
LOG.debug("sending initial server load: " + hsl);
lastMsg = System.currentTimeMillis();
- zooKeeperWrapper.writeRSLocation(this.serverInfo);
- result = this.hbaseMaster.regionServerStartup(this.serverInfo);
- break;
+ if (zooKeeperWrapper.writeRSLocation(this.serverInfo)) {
+ // We either created the znode, or it existed already. Check in with the master.
+ result = this.hbaseMaster.regionServerStartup(this.serverInfo);
+ break;
+ } else {
+ LOG.error("Could not write RS znode " + serverInfo.getServerName()
+ + " to ZK, will try again");
+ }
} catch (IOException e) {
LOG.warn("error telling master we are up", e);
+ if (e instanceof YouAreDeadException) {
+ throw (YouAreDeadException) e;
+ }
}
sleeper.sleep(lastMsg);
}
@@ -2175,8 +2187,8 @@ public class HRegionServer implements HR
throws IOException {
return applyMutations(regionName, puts, "multiput_");
}
-
- private int applyMutations(final byte[] regionName,
+
+ private int applyMutations(final byte[] regionName,
final List<? extends Mutation> mutations,
String methodName)
throws IOException {
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Wed Jun 20 12:50:51 2012
@@ -915,27 +915,26 @@ public class ZooKeeperWrapper implements
return false;
}
+ private String getRSZNode(HServerInfo info) {
+ return rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName();
+ }
+
/**
- * Update the RS address and set a watcher on the znode
- * @param info The RS info
- * @param watcher The watcher to put on the znode
- * @return true if the update is done, false if it failed
- */
- public boolean updateRSLocationGetWatch(HServerInfo info, Watcher watcher) {
- byte[] data = Bytes.toBytes(info.getServerAddress().toString());
- String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName();
- try {
- recoverableZK.setData(znode, data, -1);
- LOG.debug("<" + instanceName + ">" + "Updated ZNode " + znode
- + " with data " + info.getServerAddress().toString());
+ * Set a watch on a region server location node
+ */
+ public boolean setRSLocationWatch(HServerInfo info, Watcher watcher) {
+ String znode = getRSZNode(info);
+ try {
recoverableZK.getData(znode, watcher, null);
return true;
} catch (KeeperException e) {
- LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
+ LOG.warn("<" + instanceName + ">" + "Failed to set watch on the " + znode
+ + " znode in ZooKeeper", e);
} catch (InterruptedException e) {
- LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
+ Thread.currentThread().interrupt();
+ LOG.warn("<" + instanceName + ">" + "Failed to set watch on the " + znode
+ + " znode in ZooKeeper", e);
}
-
return false;
}
Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java?rev=1352071&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java Wed Jun 20 12:50:51 2012
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+public class TestServerLoadMap {
+
+ @Test
+ public void testOperations() {
+ ServerLoadMap<Integer> m = new ServerLoadMap<Integer>();
+ m.updateServerLoad("a", 2);
+ assertEquals(2, m.get("a").intValue());
+ m.removeServerLoad("a");
+ assertNull(m.get("a"));
+ m.updateServerLoad("a", 3);
+ assertEquals(3, m.get("a").intValue());
+ m.updateServerLoad("b", 1);
+ m.updateServerLoad("c", 2);
+ m.updateServerLoad("d", 2);
+ m.updateServerLoad("e", 3);
+ assertEquals("{1=[b], 2=[c, d]}", m.getLightServers(3).toString());
+ assertEquals("{2=[c, d], 3=[a, e]}", m.getHeavyServers(2).toString());
+ List<Map.Entry<String, Integer>> entries = m.entries();
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+ @Override
+ public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
+ return o1.getKey().compareTo(o2.getKey());
+ }
+ });
+ assertEquals("[a=3, b=1, c=2, d=2, e=3]", entries.toString());
+ assertEquals(5, m.size());
+ assertTrue(m.isMostLoadedServer("a"));
+ assertTrue(m.isMostLoadedServer("e"));
+ assertFalse(m.isMostLoadedServer("b"));
+ assertFalse(m.isMostLoadedServer("c"));
+ assertFalse(m.isMostLoadedServer("d"));
+ assertEquals(2, m.numServersByLoad(3));
+ assertEquals(2, m.numServersByLoad(2));
+ assertEquals(1, m.numServersByLoad(1));
+ assertEquals(0, m.numServersByLoad(-135));
+ }
+
+
+ @Test
+ public void testRandomizedPutRemove() {
+ final boolean debug = false;
+ ServerLoadMap<Integer> m = new ServerLoadMap<Integer>();
+ Map<String, Integer> serverToLoad = new HashMap<String, Integer>();
+ Map<Integer, Set<String>> loadToServers = new HashMap<Integer, Set<String>>();
+ Random rand = new Random(23984727L);
+ for (int i = 0; i < 100000; ++i) {
+ String serverName = "server" + rand.nextInt(100);
+ if (rand.nextBoolean()) {
+ int load = rand.nextInt(50);
+ if (debug) {
+ System.err.println("Put: " + serverName + " -> " + load);
+ }
+ m.updateServerLoad(serverName, load);
+ Integer oldLoad = serverToLoad.remove(serverName);
+ if (oldLoad != null) {
+ loadToServers.get(oldLoad).remove(serverName);
+ if (loadToServers.get(oldLoad).isEmpty()) {
+ loadToServers.remove(oldLoad);
+ }
+ }
+ serverToLoad.put(serverName, load);
+ if (!loadToServers.containsKey(load)) {
+ loadToServers.put(load, new TreeSet<String>());
+ }
+ loadToServers.get(load).add(serverName);
+ } else {
+ if (debug) {
+ System.err.println("Remove: " + serverName);
+ }
+ m.removeServerLoad(serverName);
+ Integer load = serverToLoad.remove(serverName);
+ if (load != null && loadToServers.containsKey(load)) {
+ loadToServers.get(load).remove(serverName);
+ if (loadToServers.get(load).isEmpty()) {
+ loadToServers.remove(load);
+ }
+ }
+ }
+ Map<Integer, Collection<String>> mLoadToSrv = m.getLightServers(Integer.MAX_VALUE);
+ if (debug) {
+ System.err.println(" serverToLoad: " + serverToLoad);
+ System.err.println(" loadToServers: " + loadToServers);
+ System.err.println(" m.entries: " + m.entries());
+ System.err.println(" m.size: " + m.size());
+ System.err.println(" mLoadToSrv: " + mLoadToSrv);
+ }
+ assertEquals(serverToLoad.size(), m.size());
+ assertEquals(loadToServers.size(), mLoadToSrv.size());
+ for (Map.Entry<Integer, Collection<String>> entry : mLoadToSrv.entrySet()) {
+ List<String> mServers = new ArrayList<String>(entry.getValue());
+ Collections.sort(mServers);
+ assertEquals("Error at iteration #" + i + " for load " + entry.getKey(),
+ loadToServers.get(entry.getKey()).toString(), mServers.toString());
+ }
+ }
+ }
+
+}