Posted to commits@hbase.apache.org by mb...@apache.org on 2012/06/20 14:50:52 UTC

svn commit: r1352071 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/master/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/zookeeper/ test/java/org/apache/hadoop/hbase/master/

Author: mbautin
Date: Wed Jun 20 12:50:51 2012
New Revision: 1352071

URL: http://svn.apache.org/viewvc?rev=1352071&view=rev
Log:
[master] Improve regionserver generation timestamp handling on check-in; handle failures to write an RS ZK node

Author: mbautin

Summary:
There are three related changes here:
- Make the master correctly handle the regionserver generation timestamp on check-in: of the server already registered for the host:port pair and the one checking in, expire the older one; do nothing if the two have the same generation timestamp (a sketch of this decision logic follows the list)
- Avoid writing the regionserver ZK node on the master side when setting a watch on it
- Correctly handle failures to write the RS ZK node on the regionserver side and to set a watch on it on the master side
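
A minimal sketch of the check-in decision (not part of the patch itself; the real logic in ServerManager.java below works with HServerInfo objects and throws YouAreDeadException to reject the caller):

    // Compare the generation timestamps (start codes) of the already-registered
    // server for this host:port and the server now checking in.
    static String decideOnCheckIn(long existingStartCode, long newStartCode) {
      if (existingStartCode < newStartCode) {
        return "expire the existing (stale) server and accept the new one";
      } else if (existingStartCode == newStartCode) {
        return "duplicate check-in for the same generation; process normally";
      } else {
        return "a newer server is already online; reject with YouAreDeadException";
      }
    }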

Additionally, I am moving the server -> load and load -> server map manipulation logic from ServerManager to the new ServerLoadMap class and adding a unit test.
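
Roughly how callers interact with the new class, mirroring TestServerLoadMap below (the server names here are made up, and loads are plain Integers for brevity; ServerManager parameterizes it with HServerLoad):

    ServerLoadMap<Integer> loadMap = new ServerLoadMap<Integer>();
    loadMap.updateServerLoad("rs1", 3);   // add or update a server's load; both internal maps stay in sync
    loadMap.updateServerLoad("rs2", 1);
    Integer load = loadMap.get("rs1");    // server -> load lookup
    // load -> servers views used by the balancer:
    SortedMap<Integer, Collection<String>> lighter = loadMap.getLightServers(3);  // loads strictly below 3
    SortedMap<Integer, Collection<String>> heavier = loadMap.getHeavyServers(1);  // loads of 1 and above
    boolean mostLoaded = loadMap.isMostLoadedServer("rs1");                       // true: rs1 has the maximum load
    loadMap.removeServerLoad("rs2");      // drop a server when it expires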

Test Plan: Unit tests

Reviewers: pkhemani, kranganathan, kannan

Reviewed By: pkhemani

Differential Revision: https://reviews.facebook.net/D3699

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Jun 20 12:50:51 2012
@@ -142,8 +142,7 @@ public class AssignmentManager {
         // shutting down.
         if (info != null &&
             !master.getServerManager().isDead(info.getServerName()) &&
-            master.getServerManager().getServersToLoad()
-            .get(info.getServerName()) != null) {
+            master.getServerManager().getServersToServerInfo().get(info.getServerName()) != null) {
           LOG.info("Add a transient assignment from the assignment plan: "
               + " region " + region.getRegionNameAsString() + " to the "
               + positions[i] + " region server" + info.getHostnamePort());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Jun 20 12:50:51 2012
@@ -38,7 +38,6 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Random;
 import java.util.Set;
-import java.util.SortedMap;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -70,7 +69,6 @@ import org.apache.hadoop.hbase.HRegionIn
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
-import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
@@ -695,11 +693,6 @@ public class HMaster extends Thread impl
     return this.zooKeeperWrapper;
   }
 
-  // These methods are so don't have to pollute RegionManager with ServerManager.
-  SortedMap<HServerLoad, Set<String>> getLoadToServers() {
-    return this.serverManager.getLoadToServers();
-  }
-
   int numServers() {
     return this.serverManager.numServers();
   }
@@ -720,17 +713,6 @@ public class HMaster extends Thread impl
     return this.oldLogDir;
   }
 
-  /**
-   * Add to the passed <code>m</code> servers that are loaded less than
-   * <code>l</code>.
-   * @param l
-   * @param m
-   */
-  void getLightServers(final HServerLoad l,
-      SortedMap<HServerLoad, Set<String>> m) {
-    this.serverManager.getLightServers(l, m);
-  }
-
   /** Main processing loop */
   @Override
   public void run() {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/RegionManager.java Wed Jun 20 12:50:51 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -500,15 +501,14 @@ public class RegionManager {
    */
   private int regionsToGiveOtherServers(final int numUnassignedRegions,
       final HServerLoad thisServersLoad) {
-    SortedMap<HServerLoad, Set<String>> lightServers =
-      new TreeMap<HServerLoad, Set<String>>();
-    this.master.getLightServers(thisServersLoad, lightServers);
+    SortedMap<HServerLoad, Collection<String>> lightServers = 
+        master.getServerManager().getServersToLoad().getLightServers(thisServersLoad);
     // Examine the list of servers that are more lightly loaded than this one.
     // Pretend that we will assign regions to these more lightly loaded servers
     // until they reach load equal with ours. Then, see how many regions are
     // left unassigned. That is how many regions we should assign to this server
     int nRegions = 0;
-    for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
+    for (Map.Entry<HServerLoad, Collection<String>> e : lightServers.entrySet()) {
       HServerLoad lightLoad = new HServerLoad(e.getKey());
       do {
         lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
@@ -778,15 +778,11 @@ public class RegionManager {
   private int computeNextHeaviestLoad(HServerLoad referenceLoad,
     HServerLoad heavierLoad) {
 
-    SortedMap<HServerLoad, Set<String>> heavyServers =
-      new TreeMap<HServerLoad, Set<String>>();
-    synchronized (master.getLoadToServers()) {
-      heavyServers.putAll(
-        master.getLoadToServers().tailMap(referenceLoad));
-    }
+    SortedMap<HServerLoad, Collection<String>> heavyServers = master.getServerManager().
+        getServersToLoad().getHeavyServers(referenceLoad);
     int nservers = 0;
-    for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
-      Set<String> servers = e.getValue();
+    for (Map.Entry<HServerLoad, Collection<String>> e : heavyServers.entrySet()) {
+      Collection<String> servers = e.getValue();
       nservers += servers.size();
       if (e.getKey().compareTo(referenceLoad) == 0) {
         // This is the load factor of the server we are considering
@@ -2154,21 +2150,21 @@ public class RegionManager {
     private int balanceToLowloaded(String srvName, HServerLoad srvLoad,
         double avgLoad) {
 
-      SortedMap<HServerLoad, Set<String>> loadToServers =
-        master.getLoadToServers();
-      // check if server most loaded
-      if (!loadToServers.get(loadToServers.lastKey()).contains(srvName))
+      ServerLoadMap<HServerLoad> serverLoadMap = master.getServerManager().getServersToLoad(); 
+
+      if (!serverLoadMap.isMostLoadedServer(srvName))
         return 0;
 
       // this server is most loaded, we will try to unload it by lowest
       // loaded servers
       int avgLoadMinusSlop = (int)Math.floor(avgLoad * (1 - this.slop)) - 1;
-      int lowestLoad = loadToServers.firstKey().getNumberOfRegions();
+      HServerLoad lowestServerLoad = serverLoadMap.getLowestLoad();
+      int lowestLoad = lowestServerLoad.getNumberOfRegions();
 
       if(lowestLoad >= avgLoadMinusSlop)
         return 0; // there is no low loaded servers
 
-      int lowSrvCount = loadToServers.get(loadToServers.firstKey()).size();
+      int lowSrvCount = serverLoadMap.numServersByLoad(lowestServerLoad);
       int numSrvRegs = srvLoad.getNumberOfRegions();
       int numMoveToLowLoaded = (avgLoadMinusSlop - lowestLoad) * lowSrvCount;
 

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java?rev=1352071&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerLoadMap.java Wed Jun 20 12:50:51 2012
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import com.google.common.collect.TreeMultimap;
+
+/** Maintains a map from regionservers name to server load and vice versa */
+public class ServerLoadMap<L extends Comparable<?>> {
+
+  /** SortedMap server load -> Set of server names */
+  private final TreeMultimap<L, String> loadToServers = TreeMultimap.create();
+
+  /** Map of server names -> server load */
+  private final Map<String, L> serversToLoad = new HashMap<String, L>();
+
+  private ReadWriteLock rwLock = new ReentrantReadWriteLock();
+
+  public void updateServerLoad(String serverName, L load) {
+    rwLock.writeLock().lock();
+    try {
+      L currentLoad = serversToLoad.get(serverName);
+      if (load != null && load.equals(currentLoad)) {
+        // The load for this server is already the same as what we are trying to set.
+        return;
+      }
+
+      if (currentLoad != null) {
+        // Remove the server from its current load bucket.
+        loadToServers.remove(currentLoad, serverName);
+      }
+
+      if (load != null) {
+        // Set the new load for the server and add it to its new load bucket.
+        serversToLoad.put(serverName, load);
+        loadToServers.put(load, serverName);
+      } else {
+        // Remove server -> load mapping.
+        serversToLoad.remove(serverName);
+      }
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+  }
+
+  public void removeServerLoad(String serverName) {
+    rwLock.writeLock().lock();
+    try {
+      L load = serversToLoad.remove(serverName);
+      if (load != null) {
+        loadToServers.remove(load, serverName);
+      }
+    } finally {
+      rwLock.writeLock().unlock();
+    }
+  }
+
+  public L get(String serverName) {
+    rwLock.readLock().lock();
+    try {
+      return serversToLoad.get(serverName);
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  /** Takes a snapshot of all entries in the server to load map */
+  public List<Map.Entry<String, L>> entries() {
+    rwLock.readLock().lock();
+    try {
+      return new ArrayList<Map.Entry<String, L>>(serversToLoad.entrySet());
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  public SortedMap<L, Collection<String>> getLightServers(L referenceLoad) {
+    rwLock.readLock().lock();
+    try {
+      SortedMap<L, Collection<String>> lightServers =
+          new TreeMap<L, Collection<String>>();
+      lightServers.putAll(loadToServers.asMap().headMap(referenceLoad));
+      return lightServers;
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  public SortedMap<L, Collection<String>> getHeavyServers(L referenceLoad) {
+    rwLock.readLock().lock();
+    try {
+      SortedMap<L, Collection<String>> heavyServers =
+          new TreeMap<L, Collection<String>>();
+      heavyServers.putAll(loadToServers.asMap().tailMap(referenceLoad));
+      return heavyServers;
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  public int size() {
+    rwLock.readLock().lock();
+    try {
+      return serversToLoad.size();
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  public boolean isMostLoadedServer(String serverName) {
+    rwLock.readLock().lock();
+    try {
+      L maxLoad = loadToServers.asMap().lastKey();
+      if (maxLoad == null) {
+        return false;
+      }
+      return loadToServers.get(maxLoad).contains(serverName);
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  public L getLowestLoad() {
+    rwLock.readLock().lock();
+    try {
+      return loadToServers.asMap().firstKey();
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+  public int numServersByLoad(L load) {
+    rwLock.readLock().lock();
+    try {
+      Set<String> servers = loadToServers.get(load);
+      if (servers == null) {
+        return 0;
+      }
+      return servers.size();
+    } finally {
+      rwLock.readLock().unlock();
+    }
+  }
+
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Wed Jun 20 12:50:51 2012
@@ -28,12 +28,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
@@ -48,7 +45,6 @@ import org.apache.hadoop.hbase.HRegionLo
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HServerLoad;
-import org.apache.hadoop.hbase.PleaseHoldException;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.client.Get;
@@ -100,13 +96,9 @@ public class ServerManager {
    *  to be piggybacked upon the next processMsgs;
    */
   private ConcurrentHashMap<HServerInfo, ArrayList<HMsg>> pendingMsgsToSvrsMap;
-  
-  // SortedMap server load -> Set of server names
-  private final SortedMap<HServerLoad, Set<String>> loadToServers =
-    Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
-  // Map of server names -> server load
-  private final Map<String, HServerLoad> serversToLoad =
-    new ConcurrentHashMap<String, HServerLoad>();
+
+  /** A bidirectional map between server names and their load */
+  private ServerLoadMap<HServerLoad> serversToLoad = new ServerLoadMap<HServerLoad>();
 
   private HMaster master;
 
@@ -216,11 +208,19 @@ public class ServerManager {
       String message = "Server start rejected; we already have " + hostAndPort +
         " registered; existingServer=" + existingServer + ", newServer=" + info;
       LOG.info(message);
-      if (existingServer.getStartCode() < info.getStartCode()) {
+      long existingStartCode = existingServer.getStartCode();
+      long newStartCode = info.getStartCode();
+      if (existingStartCode < newStartCode) {
         LOG.info("Triggering server recovery; existingServer looks stale");
         expireServer(existingServer);
+      } else if (existingStartCode == newStartCode) {
+        LOG.debug("Duplicate region server check-in with start code " + existingStartCode + ": " +
+            info.getServerName() + ", processing normally");
+      } else {
+        LOG.info("Newer server has already checked in, telling the old server to stop");
+        throw new YouAreDeadException("A new server with start code " + existingStartCode
+            + " is already online for " + info.getHostnamePort());
       }
-      throw new PleaseHoldException(message);
     }
     checkIsDead(info.getServerName(), "STARTUP");
     LOG.info("Received start message from: " + info.getServerName());
@@ -257,33 +257,48 @@ public class ServerManager {
    * Adds the HSI to the RS list and creates an empty load
    * @param info The region server informations
    */
-  public void recordNewServer(HServerInfo info) {
+  public void recordNewServer(HServerInfo info) throws IOException {
     recordNewServer(info, false);
   }
 
+  /** Restore the old value for the given key in a map */
+  private static <K, V> void undoMapUpdate(Map<K, V> m, K key, V oldValue) {
+    if (oldValue == null) {
+      m.remove(key);
+    } else {
+      m.put(key, oldValue);
+    }
+  }
+
   /**
    * Adds the HSI to the RS list
    * @param info The region server informations
    * @param useInfoLoad True if the load from the info should be used
    *                    like under a master failover
    */
-  void recordNewServer(HServerInfo info, boolean useInfoLoad) {
+  void recordNewServer(HServerInfo info, boolean useInfoLoad) throws IOException {
     HServerLoad load = useInfoLoad ? info.getLoad() : new HServerLoad();
     String serverName = info.getServerName();
     info.setLoad(load);
     // We must set this watcher here because it can be set on a fresh start
     // or on a failover
     Watcher watcher = new ServerExpirer(new HServerInfo(info));
-    this.master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
+
+    // Save the old values so we can rollback if we fail setting the ZK watch
+    HServerInfo oldServerInfo = serversToServerInfo.get(serverName);
+    HServerLoad oldServerLoad = serversToLoad.get(serverName);
+
     this.serversToServerInfo.put(serverName, info);
-    this.serversToLoad.put(serverName, load);
-    synchronized (this.loadToServers) {
-      Set<String> servers = this.loadToServers.get(load);
-      if (servers == null) {
-        servers = new HashSet<String>();
-      }
-      servers.add(serverName);
-      this.loadToServers.put(load, servers);
+    serversToLoad.updateServerLoad(serverName, load);
+
+    // Setting a watch after making changes to internal server to server info / load data
+    // structures because the watch can fire immediately after being set.
+    if (!this.master.getZooKeeperWrapper().setRSLocationWatch(info, watcher)) {
+      // Could not set a watch, undo the above changes.
+      serversToLoad.updateServerLoad(serverName, oldServerLoad);
+      undoMapUpdate(serversToServerInfo, serverName, oldServerInfo);
+      throw new IOException("Could not set a watch on regionserver location "
+          + info.getServerName());
     }
   }
 
@@ -474,14 +489,11 @@ public class ServerManager {
   throws IOException {
     // Refresh the info object and the load information
     this.serversToServerInfo.put(serverInfo.getServerName(), serverInfo);
-    HServerLoad oldLoad = this.serversToLoad.get(serverInfo.getServerName());
+    HServerLoad oldLoad = serversToLoad.get(serverInfo.getServerName());
     HServerLoad newLoad = serverInfo.getLoad();
     if (oldLoad != null) {
       // XXX why are we using oldLoad to update metrics
       this.master.getMetrics().incrementRequests(oldLoad.getNumberOfRequests());
-      if (!oldLoad.equals(newLoad)) {
-        updateLoadToServers(serverInfo.getServerName(), oldLoad);
-      }
     }
 
     // Set the current load information
@@ -491,15 +503,7 @@ public class ServerManager {
           serverInfo.getServerName());
     }
     newLoad.expireAfter = Long.MAX_VALUE;
-    this.serversToLoad.put(serverInfo.getServerName(), newLoad);
-    synchronized (loadToServers) {
-      Set<String> servers = this.loadToServers.get(newLoad);
-      if (servers == null) {
-        servers = new HashSet<String>();
-      }
-      servers.add(serverInfo.getServerName());
-      this.loadToServers.put(newLoad, servers);
-    }
+    serversToLoad.updateServerLoad(serverInfo.getServerName(), newLoad);
 
     // Next, process messages for this server
     return processMsgs(serverInfo, mostLoadedRegions, msgs);
@@ -582,7 +586,7 @@ public class ServerManager {
           // Production code path.
           master.getRegionManager().assignRegions(serverInfo,
               mostLoadedRegions, returnMsgs);
-        } else {
+        } else if (mostLoadedRegions.length > 0) {
           // UNIT TESTS ONLY.
           // We just don't assign anything to "blacklisted" regionservers as
           // required by a unit test (for determinism). This is OK because
@@ -785,8 +789,8 @@ public class ServerManager {
         this.master.getRegionManager().unsetRootRegion();
         if (region.isOffline()) {
           // Can't proceed without root region. Shutdown.
-          LOG.fatal("root region is marked offline");
-          this.master.shutdown();
+          LOG.fatal("root region is marked offline, shutting down the cluster");
+          master.requestClusterShutdown();
           return;
         }
 
@@ -846,27 +850,11 @@ public class ServerManager {
       }
 
       infoUpdated = true;
-      // update load information
-      updateLoadToServers(serverName, this.serversToLoad.remove(serverName));
+      serversToLoad.removeServerLoad(serverName);
     }
     return infoUpdated;
   }
 
-  private void updateLoadToServers(final String serverName,
-      final HServerLoad load) {
-    if (load == null) return;
-    synchronized (this.loadToServers) {
-      Set<String> servers = this.loadToServers.get(load);
-      if (servers != null) {
-        servers.remove(serverName);
-        if (servers.size() > 0)
-          this.loadToServers.put(load, servers);
-        else
-          this.loadToServers.remove(load);
-      }
-    }
-  }
-
   /**
    * Compute the average load across all region servers.
    * Currently, this uses a very naive computation - just uses the number of
@@ -878,19 +866,11 @@ public class ServerManager {
     int numServers = 0;
     double averageLoad = 0.0;
     synchronized (serversToLoad) {
-      // numServers = serversToLoad.size();
-      // the above was not accurate as a server is first removed from the
-      // serversToServerInfo map, then from the serversToLoad map
-      numServers = serversToServerInfo.size();
-      for (Map.Entry<String, HServerLoad> entry : serversToLoad.entrySet()) {
+      for (Map.Entry<String, HServerLoad> entry : serversToLoad.entries()) {
         HServerInfo hsi = serversToServerInfo.get(entry.getKey());
         if (null != hsi) {
           totalLoad += entry.getValue().getNumberOfRegions();
-        } else {
-          // this server has already been removed from the serversToServerInfo
-          // map, but not from the serversToLoad one yet, thus ignore it for
-          // loadbalancing purposes
-          numServers--;
+          numServers++;
         }
       }
       if (numServers > 0) {
@@ -934,22 +914,6 @@ public class ServerManager {
   }
 
   /**
-   * @return Read-only map of servers to load.
-   */
-  public Map<String, HServerLoad> getServersToLoad() {
-    return Collections.unmodifiableMap(serversToLoad);
-  }
-
-  /**
-   * @return Read-only map of load to servers.
-   */
-  public SortedMap<HServerLoad, Set<String>> getLoadToServers() {
-    synchronized (this.loadToServers) {
-      return Collections.unmodifiableSortedMap(this.loadToServers);
-    }
-  }
-
-  /**
    * There is no way to guarantee that the returned servers are really online
    *
    * @return the list of the HServerAddress for all the online region servers.
@@ -1034,16 +998,7 @@ public class ServerManager {
     }
     // Remove the server from the known servers lists and update load info
     this.serversToServerInfo.remove(serverName);
-    HServerLoad load = this.serversToLoad.remove(serverName);
-    if (load != null) {
-      synchronized (this.loadToServers) {
-        Set<String> servers = this.loadToServers.get(load);
-        if (servers != null) {
-          servers.remove(serverName);
-          if (servers.isEmpty()) this.loadToServers.remove(load);
-        }
-      }
-    }
+    serversToLoad.removeServerLoad(serverName);
     // Add to dead servers and queue a shutdown processing.
     LOG.debug("Added=" + serverName +
       " to dead servers, added shutdown processing operation");
@@ -1092,17 +1047,8 @@ public class ServerManager {
     return this.deadServers;
   }
 
-  /**
-   * Add to the passed <code>m</code> servers that are loaded less than
-   * <code>l</code>.
-   * @param l
-   * @param m
-   */
-  void getLightServers(final HServerLoad l,
-      SortedMap<HServerLoad, Set<String>> m) {
-    synchronized (this.loadToServers) {
-      m.putAll(this.loadToServers.headMap(l));
-    }
+  public ServerLoadMap<HServerLoad> getServersToLoad() {
+    return serversToLoad;
   }
 
   public boolean hasEnoughRegionServers() {
@@ -1205,11 +1151,12 @@ public class ServerManager {
     long curTime = EnvironmentEdgeManager.currentTimeMillis();
     boolean waitingForMoreServersInRackToTimeOut = false;
     boolean reportDetails = false;
+    int serverCount = serversToLoad.size();
     if ((curTime > lastDetailedLogAt + (3600 * 1000)) ||
-        lastLoggedServerCount != serversToLoad.size()) {
+        lastLoggedServerCount != serverCount) {
       reportDetails = true;
       lastDetailedLogAt = curTime;
-      lastLoggedServerCount = serversToLoad.size();
+      lastLoggedServerCount = serverCount;
     }
     // rack -> time of last report from rack
     Map<String, Long> rackLastReportAtMap = new HashMap<String, Long>();
@@ -1219,7 +1166,7 @@ public class ServerManager {
     // rack -> #servers in rack
     Map<String, Integer> rackNumServersMap =
         new HashMap<String, Integer>();
-    for (Map.Entry<String, HServerLoad> e : this.serversToLoad.entrySet()) {
+    for (Map.Entry<String, HServerLoad> e : this.serversToLoad.entries()) {
       HServerInfo si = this.serversToServerInfo.get(e.getKey());
       if (si == null) {
         LOG.debug("ServerTimeoutMonitor : no si for " + e.getKey());

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Jun 20 12:50:51 2012
@@ -27,11 +27,13 @@ import java.lang.management.RuntimeMXBea
 import java.lang.reflect.Constructor;
 import java.net.BindException;
 import java.net.InetSocketAddress;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -42,12 +44,7 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.SortedMap;
-import java.util.SortedSet;
 import java.util.TreeMap;
-import java.util.TreeSet;
-import java.util.Date;
-import java.util.Calendar;
-import java.text.SimpleDateFormat;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -91,17 +88,14 @@ import org.apache.hadoop.hbase.Stoppable
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
-import org.apache.hadoop.hbase.HMsg.Type;
-import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MultiPut;
 import org.apache.hadoop.hbase.client.MultiPutResponse;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
@@ -122,7 +116,16 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics.StoreMetricType;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
-import org.apache.hadoop.hbase.util.*;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.InfoServer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ParamFormat;
+import org.apache.hadoop.hbase.util.ParamFormatter;
+import org.apache.hadoop.hbase.util.RuntimeHaltAbortStrategy;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
@@ -134,7 +137,6 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event.EventType;
-import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
@@ -295,7 +297,7 @@ public class HRegionServer implements HR
 
   // Cache configuration and block cache reference
   private final CacheConfig cacheConfig;
-  
+
   // prevents excessive checking of filesystem
   private int minCheckFSIntervalMillis;
 
@@ -368,7 +370,7 @@ public class HRegionServer implements HR
     minCheckFSIntervalMillis =
         conf.getInt("hbase.regionserver.min.check.fs.interval", 30000);
     checkFSAbortTimeOutMillis =
-        conf.getInt("hbase.regionserver.check.fs.abort.timeout", 
+        conf.getInt("hbase.regionserver.check.fs.abort.timeout",
                     conf.getInt(HConstants.ZOOKEEPER_SESSION_TIMEOUT,
                                 HConstants.DEFAULT_ZOOKEEPER_SESSION_TIMEOUT));
     if (minCheckFSIntervalMillis > checkFSAbortTimeOutMillis) {
@@ -399,7 +401,8 @@ public class HRegionServer implements HR
     this.serverInfo = new HServerInfo(new HServerAddress(
       new InetSocketAddress(address.getBindAddress(),
       this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
-      this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030), machineName);
+      this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
+          HConstants.DEFAULT_REGIONSERVER_INFOPORT), machineName);
     if (this.serverInfo.getServerAddress() == null) {
       throw new NullPointerException("Server address cannot be null; " +
         "hbase-958 debugging");
@@ -1342,7 +1345,8 @@ public class HRegionServer implements HR
     this.leases.setName(n + ".leaseChecker");
     this.leases.start();
     // Put up info server.
-    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, 60030);
+    int port = this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT,
+        HConstants.DEFAULT_REGIONSERVER_INFOPORT);
     // -1 is for disabling info server
     if (port >= 0) {
       String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
@@ -1579,7 +1583,7 @@ public class HRegionServer implements HR
    * Let the master know we're here
    * Run initialization using parameters passed us by the master.
    */
-  private MapWritable reportForDuty() {
+  private MapWritable reportForDuty() throws YouAreDeadException {
     while (!stopRequested.get() && !getMaster()) {
       sleeper.sleep();
       LOG.warn("Unable to get master for initialization");
@@ -1598,11 +1602,19 @@ public class HRegionServer implements HR
         if (LOG.isDebugEnabled())
           LOG.debug("sending initial server load: " + hsl);
         lastMsg = System.currentTimeMillis();
-        zooKeeperWrapper.writeRSLocation(this.serverInfo);
-        result = this.hbaseMaster.regionServerStartup(this.serverInfo);
-        break;
+        if (zooKeeperWrapper.writeRSLocation(this.serverInfo)) {
+          // We either created the znode, or it existed already. Check in with the master.
+          result = this.hbaseMaster.regionServerStartup(this.serverInfo);
+          break;
+        } else {
+          LOG.error("Could not write RS znode " + serverInfo.getServerName()
+              + " to ZK, will try again");
+        }
       } catch (IOException e) {
         LOG.warn("error telling master we are up", e);
+        if (e instanceof YouAreDeadException) {
+          throw (YouAreDeadException) e;
+        }
       }
       sleeper.sleep(lastMsg);
     }
@@ -2175,8 +2187,8 @@ public class HRegionServer implements HR
   throws IOException {
     return applyMutations(regionName, puts, "multiput_");
   }
-  
-  private int applyMutations(final byte[] regionName, 
+
+  private int applyMutations(final byte[] regionName,
       final List<? extends Mutation> mutations,
       String methodName)
   throws IOException {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java?rev=1352071&r1=1352070&r2=1352071&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWrapper.java Wed Jun 20 12:50:51 2012
@@ -915,27 +915,26 @@ public class ZooKeeperWrapper implements
     return false;
   }
 
+  private String getRSZNode(HServerInfo info) {
+    return rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName();
+  }
+
   /**
-   * Update the RS address and set a watcher on the znode
-   * @param info The RS info
-   * @param watcher The watcher to put on the znode
-   * @return true if the update is done, false if it failed
-   */
-  public boolean updateRSLocationGetWatch(HServerInfo info, Watcher watcher) {
-    byte[] data = Bytes.toBytes(info.getServerAddress().toString());
-    String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getServerName();
-    try {
-      recoverableZK.setData(znode, data, -1);
-      LOG.debug("<" + instanceName + ">" + "Updated ZNode " + znode
-          + " with data " + info.getServerAddress().toString());
+   * Set a watch on a region server location node
+   */
+  public boolean setRSLocationWatch(HServerInfo info, Watcher watcher) {
+    String znode = getRSZNode(info);
+    try {
       recoverableZK.getData(znode, watcher, null);
       return true;
     } catch (KeeperException e) {
-      LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
+      LOG.warn("<" + instanceName + ">" + "Failed to set watch on the " + znode
+          + " znode in ZooKeeper", e);
     } catch (InterruptedException e) {
-      LOG.warn("<" + instanceName + ">" + "Failed to update " + znode + " znode in ZooKeeper: " + e);
+      Thread.currentThread().interrupt();
+      LOG.warn("<" + instanceName + ">" + "Failed to set watch on the " + znode
+          + " znode in ZooKeeper", e);
     }
-
     return false;
   }
 

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java?rev=1352071&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/master/TestServerLoadMap.java Wed Jun 20 12:50:51 2012
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.junit.Test;
+
+public class TestServerLoadMap {
+
+  @Test
+  public void testOperations() {
+    ServerLoadMap<Integer> m = new ServerLoadMap<Integer>();
+    m.updateServerLoad("a", 2);
+    assertEquals(2, m.get("a").intValue());
+    m.removeServerLoad("a");
+    assertNull(m.get("a"));
+    m.updateServerLoad("a", 3);
+    assertEquals(3, m.get("a").intValue());
+    m.updateServerLoad("b", 1);
+    m.updateServerLoad("c", 2);
+    m.updateServerLoad("d", 2);
+    m.updateServerLoad("e", 3);
+    assertEquals("{1=[b], 2=[c, d]}", m.getLightServers(3).toString());
+    assertEquals("{2=[c, d], 3=[a, e]}", m.getHeavyServers(2).toString());
+    List<Map.Entry<String, Integer>> entries = m.entries();
+    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+      @Override
+      public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
+        return o1.getKey().compareTo(o2.getKey());
+      }
+    });
+    assertEquals("[a=3, b=1, c=2, d=2, e=3]", entries.toString());
+    assertEquals(5, m.size());
+    assertTrue(m.isMostLoadedServer("a"));
+    assertTrue(m.isMostLoadedServer("e"));
+    assertFalse(m.isMostLoadedServer("b"));
+    assertFalse(m.isMostLoadedServer("c"));
+    assertFalse(m.isMostLoadedServer("d"));
+    assertEquals(2, m.numServersByLoad(3));
+    assertEquals(2, m.numServersByLoad(2));
+    assertEquals(1, m.numServersByLoad(1));
+    assertEquals(0, m.numServersByLoad(-135));
+  }
+
+  
+  @Test
+  public void testRandomizedPutRemove() {
+    final boolean debug = false;
+    ServerLoadMap<Integer> m = new ServerLoadMap<Integer>();
+    Map<String, Integer> serverToLoad = new HashMap<String, Integer>();
+    Map<Integer, Set<String>> loadToServers =  new HashMap<Integer, Set<String>>();
+    Random rand = new Random(23984727L);
+    for (int i = 0; i < 100000; ++i) {
+      String serverName = "server" + rand.nextInt(100);
+      if (rand.nextBoolean()) {
+        int load = rand.nextInt(50);
+        if (debug) {
+          System.err.println("Put: " + serverName + " -> " + load);
+        }
+        m.updateServerLoad(serverName, load);
+        Integer oldLoad = serverToLoad.remove(serverName);
+        if (oldLoad != null) {
+          loadToServers.get(oldLoad).remove(serverName);
+          if (loadToServers.get(oldLoad).isEmpty()) {
+            loadToServers.remove(oldLoad);
+          }
+        }
+        serverToLoad.put(serverName, load);
+        if (!loadToServers.containsKey(load)) {
+          loadToServers.put(load, new TreeSet<String>());
+        }
+        loadToServers.get(load).add(serverName);
+      } else {
+        if (debug) {
+          System.err.println("Remove: " + serverName);
+        }
+        m.removeServerLoad(serverName);
+        Integer load = serverToLoad.remove(serverName);
+        if (load != null && loadToServers.containsKey(load)) {
+          loadToServers.get(load).remove(serverName);
+          if (loadToServers.get(load).isEmpty()) {
+            loadToServers.remove(load);
+          }
+        }
+      }
+      Map<Integer, Collection<String>> mLoadToSrv = m.getLightServers(Integer.MAX_VALUE);
+      if (debug) {
+        System.err.println("  serverToLoad: " + serverToLoad);
+        System.err.println("  loadToServers: " + loadToServers);
+        System.err.println("  m.entries: " + m.entries());
+        System.err.println("  m.size: " + m.size());
+        System.err.println("  mLoadToSrv: " + mLoadToSrv);
+      }
+      assertEquals(serverToLoad.size(), m.size());
+      assertEquals(loadToServers.size(), mLoadToSrv.size());
+      for (Map.Entry<Integer, Collection<String>> entry : mLoadToSrv.entrySet()) {
+        List<String> mServers = new ArrayList<String>(entry.getValue());
+        Collections.sort(mServers);
+        assertEquals("Error at iteration #" + i + " for load " + entry.getKey(),
+            loadToServers.get(entry.getKey()).toString(), mServers.toString());
+      }
+    } 
+  }
+
+}