Posted to commits@hbase.apache.org by en...@apache.org on 2014/06/28 02:31:20 UTC

[34/49] git commit: HBASE-10701 Cache invalidation improvements from client side

HBASE-10701 Cache invalidation improvements from client side

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-10070@1585766 13f79535-47bb-0310-9956-ffa450edef68


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad05de17
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ad05de17
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad05de17

Branch: refs/heads/master
Commit: ad05de172f4df735c56f83b0d590724603b3c2e9
Parents: e04e61e
Author: Enis Soztutar <en...@apache.org>
Authored: Tue Apr 8 15:48:17 2014 +0000
Committer: Enis Soztutar <en...@apache.org>
Committed: Fri Jun 27 16:39:39 2014 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HRegionInfo.java    |   8 +-
 .../apache/hadoop/hbase/RegionLocations.java    | 100 ++++++++++-----
 .../apache/hadoop/hbase/catalog/MetaReader.java |   2 +-
 .../hadoop/hbase/client/ClusterConnection.java  |  28 +++++
 .../hadoop/hbase/client/ConnectionAdapter.java  |  12 ++
 .../hadoop/hbase/client/ConnectionManager.java  | 124 ++++++++++++++++---
 .../apache/hadoop/hbase/client/MetaCache.java   | 120 +++++++++++++-----
 .../RpcRetryingCallerWithReadReplicas.java      | 104 ++++++++++------
 .../hadoop/hbase/TestRegionLocations.java       |  52 ++++++--
 .../hbase/util/BoundedCompletionService.java    |  12 +-
 .../org/apache/hadoop/hbase/util/Threads.java   |  32 ++++-
 .../hbase/client/CoprocessorHConnection.java    |  12 ++
 .../hbase/client/TestReplicaWithCluster.java    |   4 +-
 .../hadoop/hbase/util/MultiThreadedReader.java  |   4 +-
 .../hadoop/hbase/util/MultiThreadedWriter.java  |   2 +
 .../hbase/util/MultiThreadedWriterBase.java     |   4 +-
 16 files changed, 475 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
index 59a3248..3e5224b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HRegionInfo.java
@@ -561,7 +561,9 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
         break;
       }
     }
-    if(offset == -1) throw new IOException("Invalid regionName format");
+    if (offset == -1) {
+      throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
+    }
     byte[] tableName = new byte[offset];
     System.arraycopy(regionName, 0, tableName, 0, offset);
     offset = -1;
@@ -590,7 +592,9 @@ public class HRegionInfo implements Comparable<HRegionInfo> {
         break;
       }
     }
-    if(offset == -1) throw new IOException("Invalid regionName format");
+    if (offset == -1) {
+      throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
+    }
     byte [] startKey = HConstants.EMPTY_BYTE_ARRAY;
     if(offset != tableName.length + 1) {
       startKey = new byte[offset - tableName.length - 1];
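
The change above appends the offending bytes to the "Invalid regionName format" message, using the binary-safe Bytes.toStringBinary rendering. A minimal sketch of the same diagnostic pattern, with a hypothetical single-delimiter parser (only Bytes.toStringBinary is a real HBase API; the region-name layout here is simplified):

import java.io.IOException;
import org.apache.hadoop.hbase.util.Bytes;

public class RegionNameParseExample {
  // Hypothetical layout: <tableName>,<rest>; real region names are more complex.
  static byte[] parseTableName(byte[] regionName) throws IOException {
    int offset = -1;
    for (int i = 0; i < regionName.length; i++) {
      if (regionName[i] == ',') {
        offset = i;
        break;
      }
    }
    if (offset == -1) {
      // Including a binary-safe rendering of the bad input makes the failure diagnosable.
      throw new IOException("Invalid regionName format: " + Bytes.toStringBinary(regionName));
    }
    byte[] tableName = new byte[offset];
    System.arraycopy(regionName, 0, tableName, 0, offset);
    return tableName;
  }
}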

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
index b5db549..8b77af1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/RegionLocations.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase;
 import java.util.Collection;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -34,27 +33,42 @@ import org.apache.hadoop.hbase.util.Bytes;
 public class RegionLocations {
 
   private final int numNonNullElements;
+
+  // The locations array contains the HRL objects for known region replicas, indexed
+  // by replicaId. An element can be null: a null value indicates that a region
+  // replica with that replicaId exists, but its location is not currently known
+  // in the cache.
   private final HRegionLocation[] locations; // replicaId -> HRegionLocation.
 
   /**
    * Constructs the region location list. The locations array should
    * contain all the locations for known replicas for the region, and should be
-   * sorted in replicaId ascending order.
+   * sorted in ascending replicaId order, although it can contain nulls for
+   * replicaIds whose locations are not known.
    * @param locations an array of HRegionLocations for the same region range
    */
   public RegionLocations(HRegionLocation... locations) {
     int numNonNullElements = 0;
     int maxReplicaId = -1;
+    int maxReplicaIdIndex = -1;
+    int index = 0;
     for (HRegionLocation loc : locations) {
       if (loc != null) {
-        numNonNullElements++;
-        if (loc.getRegionInfo().getReplicaId() > maxReplicaId) {
+        if (loc.getServerName() != null) {
+          numNonNullElements++;
+        }
+        if (loc.getRegionInfo().getReplicaId() >= maxReplicaId) {
           maxReplicaId = loc.getRegionInfo().getReplicaId();
+          maxReplicaIdIndex = index;
         }
       }
+      index++;
     }
     this.numNonNullElements = numNonNullElements;
 
+    // account for the null elements in the array after maxReplicaIdIndex
+    maxReplicaId = maxReplicaId + (locations.length - (maxReplicaIdIndex + 1));
+
     if (maxReplicaId + 1 == locations.length) {
       this.locations = locations;
     } else {
@@ -97,10 +111,10 @@ public class RegionLocations {
   }
 
   /**
-   * Returns a new HRegionLocationList with the locations removed (set to null)
+   * Returns a new RegionLocations with the locations removed (set to null)
    * which have the destination server as given.
    * @param serverName the serverName to remove locations of
-   * @return an HRegionLocationList object with removed locations or the same object
+   * @return a RegionLocations object with removed locations or the same object
    * if nothing is removed
    */
   public RegionLocations removeByServer(ServerName serverName) {
@@ -123,36 +137,58 @@ public class RegionLocations {
   /**
    * Removes the given location from the list
    * @param location the location to remove
-   * @return an HRegionLocationList object with removed locations or the same object
+   * @return a RegionLocations object with removed locations or the same object
    * if nothing is removed
    */
   public RegionLocations remove(HRegionLocation location) {
-    HRegionLocation[] newLocations = null;
-    for (int i = 0; i < locations.length; i++) {
-      // check whether something to remove. HRL.compareTo() compares ONLY the
-      // serverName. We want to compare the HRI's as well.
-      if (locations[i] != null
-          && location.getRegionInfo().equals(locations[i].getRegionInfo())
-          && location.equals(locations[i])) {
-        if (newLocations == null) { //first time
-          newLocations = new HRegionLocation[locations.length];
-          System.arraycopy(locations, 0, newLocations, 0, i);
-        }
-        newLocations[i] = null;
-      } else if (newLocations != null) {
-        newLocations[i] = locations[i];
-      }
+    if (location == null) return this;
+    if (location.getRegionInfo() == null) return this;
+    int replicaId = location.getRegionInfo().getReplicaId();
+    if (replicaId >= locations.length) return this;
+
+    // check whether something to remove. HRL.compareTo() compares ONLY the
+    // serverName. We want to compare the HRI's as well.
+    if (locations[replicaId] == null
+        || !location.getRegionInfo().equals(locations[replicaId].getRegionInfo())
+        || !location.equals(locations[replicaId])) {
+      return this;
     }
-    return newLocations == null ? this : new RegionLocations(newLocations);
+
+    HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+    System.arraycopy(locations, 0, newLocations, 0, locations.length);
+    newLocations[replicaId] = null;
+
+    return new RegionLocations(newLocations);
+  }
+
+  /**
+   * Removes the location with the given replicaId from the list
+   * @param replicaId the replicaId of the location to remove
+   * @return a RegionLocations object with removed locations or the same object
+   * if nothing is removed
+   */
+  public RegionLocations remove(int replicaId) {
+    if (getRegionLocation(replicaId) == null) {
+      return this;
+    }
+
+    HRegionLocation[] newLocations = new HRegionLocation[locations.length];
+
+    System.arraycopy(locations, 0, newLocations, 0, locations.length);
+    if (replicaId < newLocations.length) {
+      newLocations[replicaId] = null;
+    }
+
+    return new RegionLocations(newLocations);
   }
 
   /**
-   * Merges this HRegionLocation list with the given list assuming
+   * Merges this RegionLocations list with the given list assuming
    * same range, and keeping the most up to date version of the
    * HRegionLocation entries from either list according to seqNum. If seqNums
    * are equal, the location from the argument (other) is taken.
    * @param other the locations to merge with
-   * @return an HRegionLocationList object with merged locations or the same object
+   * @return a RegionLocations object with merged locations or the same object
    * if nothing is merged
    */
   public RegionLocations mergeLocations(RegionLocations other) {
@@ -160,7 +196,9 @@ public class RegionLocations {
 
     HRegionLocation[] newLocations = null;
 
-    int max = Math.max(this.locations.length, other.locations.length);
+    // Use the length from other, since it is coming from meta. Otherwise,
+    // in case the region replication count is reduced, we might leak stale entries here.
+    int max = other.locations.length;
 
     for (int i = 0; i < max; i++) {
       HRegionLocation thisLoc = this.getRegionLocation(i);
@@ -207,7 +245,7 @@ public class RegionLocations {
    * @param checkForEquals whether to update the location if seqNums for the
    * HRegionLocations for the old and new location are the same
    * @param force whether to force update
-   * @return an HRegionLocationList object with updated locations or the same object
+   * @return a RegionLocations object with updated locations or the same object
    * if nothing is updated
    */
   public RegionLocations updateLocation(HRegionLocation location,
@@ -282,12 +320,10 @@ public class RegionLocations {
   public String toString() {
     StringBuilder builder = new StringBuilder("[");
     for (HRegionLocation loc : locations) {
-      if (loc != null) {
-        if (builder.length() > 1) {
-          builder.append(", ");
-        }
-        builder.append(loc);
+      if (builder.length() > 1) {
+        builder.append(", ");
       }
+      builder.append(loc == null ? "null" : loc);
     }
     builder.append("]");
     return builder.toString();
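
Both remove(HRegionLocation) and the new remove(int replicaId) follow a copy-on-write discipline: the caller gets back either the very same immutable instance (nothing removed, detectable by reference comparison) or a fresh array with one slot nulled. A generic sketch of that discipline (SlotList is a hypothetical stand-in for RegionLocations, not an HBase type):

import java.util.Arrays;

public final class SlotList<T> {
  private final T[] slots; // index == replicaId; null == location unknown

  public SlotList(T[] slots) {
    this.slots = slots;
  }

  public T get(int id) {
    return id < slots.length ? slots[id] : null;
  }

  // Returns 'this' unchanged when there is nothing to remove, so callers can
  // detect a no-op with ==, exactly as the RegionLocations callers above do.
  public SlotList<T> remove(int id) {
    if (get(id) == null) {
      return this;
    }
    T[] copy = Arrays.copyOf(slots, slots.length);
    copy[id] = null;
    return new SlotList<T>(copy);
  }
}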

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
index e510857..ca5ae39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/catalog/MetaReader.java
@@ -238,7 +238,7 @@ public class MetaReader {
       parsedInfo = parseRegionInfoFromRegionName(regionName);
       row = getMetaKeyForRegion(parsedInfo);
     } catch (Exception parseEx) {
-      LOG.warn("Received parse exception:" + parseEx);
+      // Ignore. This method is also called with a tableName passed as the regionName, which cannot be parsed.
     }
     Get get = new Get(row);
     get.addFamily(HConstants.CATALOG_FAMILY);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index fb63473..ef9a120 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -107,6 +107,19 @@ interface ClusterConnection extends HConnection {
       final byte [] row) throws IOException;
 
   /**
+   * Find the location of the region of <i>tableName</i> that <i>row</i>
+   * lives in, ignoring any value that might be in the cache.
+   * @param tableName name of the table <i>row</i> is in
+   * @param row row key you're trying to find the region of
+   * @param replicaId the replicaId of the region
+   * @return HRegionLocation that describes where to find the region in
+   * question
+   * @throws IOException if a remote or network exception occurs
+   */
+  HRegionLocation relocateRegion(final TableName tableName,
+      final byte [] row, int replicaId) throws IOException;
+
+  /**
    * Update the location cache. This is used internally by HBase, in most cases it should not be
    *  used by the client application.
    * @param tableName the table name
@@ -165,6 +178,20 @@ interface ClusterConnection extends HConnection {
    */
   RegionLocations locateRegion(TableName tableName,
                                byte[] row, boolean useCache, boolean retry) throws IOException;
+
+  /**
+   * Locate the region of <i>tableName</i> that <i>row</i> lives in.
+   * @param tableName table to get regions of
+   * @param row the row
+   * @param useCache Should we use the cache to retrieve the region information.
+   * @param retry do we retry
+   * @param replicaId the replicaId for the region
+   * @return region locations for this row.
+   * @throws IOException if a remote or network exception occurs
+   */
+  RegionLocations locateRegion(TableName tableName,
+      byte[] row, boolean useCache, boolean retry, int replicaId) throws IOException;
+
   /**
    * Returns a {@link MasterKeepAliveConnection} to the active master
    */
@@ -247,4 +274,5 @@ interface ClusterConnection extends HConnection {
    * @return All locations for a particular region.
    */
   RegionLocations locateRegionAll(TableName tableName, byte[] row) throws IOException;
+
 }
\ No newline at end of file
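
A hedged sketch of how a caller might use the replica-aware lookup added above. ClusterConnection is package-private and HBase-internal, so this hypothetical helper sits in the same package; the call shape simply follows the new locateRegion signature:

package org.apache.hadoop.hbase.client;

import java.io.IOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;

public class ReplicaLookupExample {
  // 'conn' must come from HBase internals; obtaining one is out of scope here.
  static HRegionLocation freshLocationOfReplica(ClusterConnection conn) throws IOException {
    int replicaId = 2; // example secondary replica
    RegionLocations locs = conn.locateRegion(TableName.valueOf("t1"),
        Bytes.toBytes("row1"), false /* useCache */, true /* retry */, replicaId);
    return locs == null ? null : locs.getRegionLocation(replicaId);
  }
}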

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index bea5fa8..30555ff 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -294,6 +294,18 @@ class ConnectionAdapter implements ClusterConnection {
   }
 
   @Override
+  public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
+      boolean retry, int replicaId) throws IOException {
+    return wrappedConnection.locateRegion(tableName, row, useCache, retry, replicaId);
+  }
+
+  @Override
+  public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
+      throws IOException {
+    return wrappedConnection.relocateRegion(tableName, row, replicaId);
+  }
+
+  @Override
   public MasterService.BlockingInterface getMaster() throws IOException {
     return wrappedConnection.getMaster();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 98f9d65..2d97ebf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -75,7 +75,89 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.*;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.BalanceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableCatalogJanitorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableNamesResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsCatalogJanitorEnabledResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsProcedureDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsRestoreSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsSnapshotDoneResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListNamespaceDescriptorsResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableDescriptorsByNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ListTableNamesByNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MasterService;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyColumnResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ModifyTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.MoveRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.OfflineRegionResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RestoreSnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.RunCatalogScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SetBalancerRunningResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ShutdownResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.SnapshotResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.StopMasterResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.TruncateTableResponse;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.UnassignRegionResponse;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
@@ -968,6 +1050,12 @@ class ConnectionManager {
     @Override
     public HRegionLocation relocateRegion(final TableName tableName,
         final byte [] row) throws IOException{
+      return relocateRegion(tableName, row, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    }
+
+    @Override
+    public HRegionLocation relocateRegion(final TableName tableName,
+        final byte [] row, int replicaId) throws IOException{
       // Since this is an explicit request not to use any caching, finding
       // disabled tables should not be desirable.  This will ensure that an exception is thrown when
       // the first time a disabled table is interacted with.
@@ -975,8 +1063,8 @@ class ConnectionManager {
         throw new TableNotEnabledException(tableName.getNameAsString() + " is disabled.");
       }
 
-      RegionLocations locations = locateRegion(tableName, row, false, true);
-      return locations == null ? null : locations.getRegionLocation();
+      RegionLocations locations = locateRegion(tableName, row, false, true, replicaId);
+      return locations == null ? null : locations.getRegionLocation(replicaId);
     }
 
     @Override
@@ -985,11 +1073,17 @@ class ConnectionManager {
       return relocateRegion(TableName.valueOf(tableName), row);
     }
 
-
     @Override
     public RegionLocations locateRegion(final TableName tableName,
       final byte [] row, boolean useCache, boolean retry)
     throws IOException {
+      return locateRegion(tableName, row, useCache, retry, RegionReplicaUtil.DEFAULT_REPLICA_ID);
+    }
+
+    @Override
+    public RegionLocations locateRegion(final TableName tableName,
+      final byte [] row, boolean useCache, boolean retry, int replicaId)
+    throws IOException {
       if (this.closed) throw new IOException(toString() + " closed");
       if (tableName== null || tableName.getName().length == 0) {
         throw new IllegalArgumentException(
@@ -1000,7 +1094,7 @@ class ConnectionManager {
         return this.registry.getMetaRegionLocation();
       } else {
         // Region not in the cache - have to go to the meta RS
-        return locateRegionInMeta(tableName, row, useCache, retry);
+        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);
       }
     }
 
@@ -1009,13 +1103,13 @@ class ConnectionManager {
       * info that contains the table and row we're seeking.
       */
     private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
-                   boolean useCache, boolean retry) throws IOException {
+                   boolean useCache, boolean retry, int replicaId) throws IOException {
 
       // If we are supposed to be using the cache, look in the cache to see if
       // we already have the region.
       if (useCache) {
         RegionLocations locations = getCachedLocation(tableName, row);
-        if (locations != null) {
+        if (locations != null && locations.getRegionLocation(replicaId) != null) {
           return locations;
         }
       }
@@ -1042,9 +1136,13 @@ class ConnectionManager {
         }
         if (useCache) {
           RegionLocations locations = getCachedLocation(tableName, row);
-          if (locations != null) {
+          if (locations != null && locations.getRegionLocation(replicaId) != null) {
             return locations;
           }
+        } else {
+          // If we are not supposed to be using the cache, delete any existing cached location
+          // so it won't interfere.
+          metaCache.clearCache(tableName, row);
         }
 
         // Query the meta region
@@ -1066,11 +1164,11 @@ class ConnectionManager {
 
           // convert the row result into the HRegionLocation we need!
           RegionLocations locations = MetaReader.getRegionLocations(regionInfoRow);
-          if (locations == null || locations.getRegionLocation() == null) {
+          if (locations == null || locations.getRegionLocation(replicaId) == null) {
             throw new IOException("HRegionInfo was null in " +
               tableName + ", row=" + regionInfoRow);
           }
-          HRegionInfo regionInfo = locations.getRegionLocation().getRegionInfo();
+          HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
           if (regionInfo == null) {
             throw new IOException("HRegionInfo was null or empty in " +
               TableName.META_TABLE_NAME + ", row=" + regionInfoRow);
@@ -1094,7 +1192,7 @@ class ConnectionManager {
               regionInfo.getRegionNameAsString());
           }
 
-          ServerName serverName = locations.getRegionLocation().getServerName();
+          ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
           if (serverName == null) {
             throw new NoServerForRegionException("No server address listed " +
               "in " + TableName.META_TABLE_NAME + " for region " +
@@ -1107,11 +1205,9 @@ class ConnectionManager {
                 regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                 ", but it is dead.");
           }
-
           // Instantiate the location
           cacheLocation(tableName, locations);
           return locations;
-
         } catch (TableNotFoundException e) {
           // if we got this error, probably means the table just plain doesn't
           // exist. rethrow the error immediately. this should always be coming
@@ -1137,7 +1233,7 @@ class ConnectionManager {
           // Only relocate the parent region if necessary
           if(!(e instanceof RegionOfflineException ||
               e instanceof NoServerForRegionException)) {
-            relocateRegion(TableName.META_TABLE_NAME, metaKey);
+            relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
           }
         }
         try{
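
The useCache handling in locateRegionInMeta now has two parts: a cache hit only counts if it actually contains the requested replicaId, and an explicit no-cache lookup first purges any stale entry so it cannot shadow the fresh answer. A condensed, self-contained sketch of that control flow (the Locations interface and queryMeta are stand-ins for RegionLocations and the meta-table scan, not HBase APIs):

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ReplicaAwareLocator {
  interface Locations {
    Object get(int replicaId);
  }

  private final Map<String, Locations> cache = new ConcurrentHashMap<String, Locations>();

  Locations locate(String rowKey, boolean useCache, int replicaId) throws IOException {
    if (useCache) {
      Locations cached = cache.get(rowKey);
      // A hit is only useful if it covers the replica we were asked about.
      if (cached != null && cached.get(replicaId) != null) {
        return cached;
      }
    } else {
      // Explicit no-cache request: drop any stale entry so it cannot
      // interfere with the fresh lookup below.
      cache.remove(rowKey);
    }
    Locations fresh = queryMeta(rowKey); // stand-in for the meta-table query
    if (fresh == null || fresh.get(replicaId) == null) {
      throw new IOException("No location for replica id #" + replicaId);
    }
    cache.put(rowKey, fresh);
    return fresh;
  }

  private Locations queryMeta(final String rowKey) {
    return new Locations() {
      public Object get(int replicaId) {
        return "server-for-" + rowKey;
      }
    };
  }
}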

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 10a48ae..a7314a0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -107,6 +107,9 @@ public class MetaCache {
     RegionLocations oldLocations = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocations == null);
     if (isNewCacheEntry) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Cached location: " + location);
+      }
       addToCachedServers(locations);
       return;
     }
@@ -124,7 +127,10 @@ public class MetaCache {
     // an additional counter on top of seqNum would be necessary to handle them all.
     RegionLocations updatedLocations = oldLocations.updateLocation(location, false, force);
     if (oldLocations != updatedLocations) {
-      tableLocations.replace(startKey, oldLocations, updatedLocations);
+      boolean replaced = tableLocations.replace(startKey, oldLocations, updatedLocations);
+      if (replaced && LOG.isTraceEnabled()) {
+        LOG.trace("Changed cached location to: " + location);
+      }
       addToCachedServers(updatedLocations);
     }
   }
@@ -132,24 +138,30 @@ public class MetaCache {
   /**
    * Put a newly discovered HRegionLocation into the cache.
    * @param tableName The table name.
-   * @param location the new location
+   * @param locations the new locations
    */
-  public void cacheLocation(final TableName tableName, final RegionLocations location) {
-    byte [] startKey = location.getRegionLocation().getRegionInfo().getStartKey();
+  public void cacheLocation(final TableName tableName, final RegionLocations locations) {
+    byte [] startKey = locations.getRegionLocation().getRegionInfo().getStartKey();
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, location);
+    RegionLocations oldLocation = tableLocations.putIfAbsent(startKey, locations);
     boolean isNewCacheEntry = (oldLocation == null);
     if (isNewCacheEntry) {
-      addToCachedServers(location);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Cached location: " + locations);
+      }
+      addToCachedServers(locations);
       return;
     }
 
     // merge old and new locations and add it to the cache
     // Meta record might be stale - some (probably the same) server has closed the region
     // with later seqNum and told us about the new location.
-    RegionLocations mergedLocation = oldLocation.mergeLocations(location);
-    tableLocations.replace(startKey, oldLocation, mergedLocation);
-    addToCachedServers(location);
+    RegionLocations mergedLocation = oldLocation.mergeLocations(locations);
+    boolean replaced = tableLocations.replace(startKey, oldLocation, mergedLocation);
+    if (replaced && LOG.isTraceEnabled()) {
+      LOG.trace("Merged cached locations: " + mergedLocation);
+    }
+    addToCachedServers(locations);
   }
 
   private void addToCachedServers(RegionLocations locations) {
@@ -238,12 +250,11 @@ public class MetaCache {
           RegionLocations regionLocations = e.getValue();
           if (regionLocations != null) {
             RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
-            deletedSomething |= regionLocations == updatedLocations;
             if (updatedLocations != regionLocations) {
               if (updatedLocations.isEmpty()) {
-                tableLocations.remove(e.getKey(), regionLocations);
+                deletedSomething |= tableLocations.remove(e.getKey(), regionLocations);
               } else {
-                tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
+                deletedSomething |= tableLocations.replace(e.getKey(), regionLocations, updatedLocations);
               }
             }
           }
@@ -251,8 +262,8 @@ public class MetaCache {
       }
       this.cachedServers.remove(serverName);
     }
-    if (deletedSomething && LOG.isDebugEnabled()) {
-      LOG.debug("Removed all cached region locations that map to " + serverName);
+    if (deletedSomething && LOG.isTraceEnabled()) {
+      LOG.trace("Removed all cached region locations that map to " + serverName);
     }
   }
 
@@ -260,6 +271,9 @@ public class MetaCache {
    * Delete all cached entries of a table.
    */
   public void clearCache(final TableName tableName) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("Removed all cached region locations for table " + tableName);
+    }
     this.cachedRegionLocations.remove(tableName);
   }
 
@@ -268,6 +282,34 @@ public class MetaCache {
    * @param tableName tableName
    * @param row
    */
+  public void clearCache(final TableName tableName, final byte [] row, int replicaId) {
+    ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
+
+    boolean removed = false;
+    RegionLocations regionLocations = getCachedLocation(tableName, row);
+    if (regionLocations != null) {
+      HRegionLocation toBeRemoved = regionLocations.getRegionLocation(replicaId);
+      RegionLocations updatedLocations = regionLocations.remove(replicaId);
+      if (updatedLocations != regionLocations) {
+        byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(startKey, regionLocations);
+        } else {
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+      }
+
+      if (removed && LOG.isTraceEnabled() && toBeRemoved != null) {
+        LOG.trace("Removed " + toBeRemoved + " from cache");
+      }
+    }
+  }
+
+  /**
+   * Delete a cached location, no matter what it is. Called when we were told not to use the cache.
+   * @param tableName the table name
+   * @param row the row key
+   */
   public void clearCache(final TableName tableName, final byte [] row) {
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
 
@@ -275,8 +317,8 @@ public class MetaCache {
     if (regionLocations != null) {
       byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
       boolean removed = tableLocations.remove(startKey, regionLocations);
-      if (removed && LOG.isDebugEnabled()) {
-        LOG.debug("Removed " + regionLocations + " from cache");
+      if (removed && LOG.isTraceEnabled()) {
+        LOG.trace("Removed " + regionLocations + " from cache");
       }
     }
   }
@@ -292,10 +334,15 @@ public class MetaCache {
       RegionLocations updatedLocations = regionLocations.removeByServer(serverName);
       if (updatedLocations != regionLocations) {
         byte[] startKey = regionLocations.getRegionLocation().getRegionInfo().getStartKey();
+        boolean removed = false;
         if (updatedLocations.isEmpty()) {
-          tableLocations.remove(startKey, regionLocations);
+          removed = tableLocations.remove(startKey, regionLocations);
         } else {
-          tableLocations.replace(startKey, regionLocations, updatedLocations);
+          removed = tableLocations.replace(startKey, regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isTraceEnabled()) {
+          LOG.trace("Removed locations of table: " + tableName + ", row: " + Bytes.toString(row)
+            + " mapping to server: " + serverName + " from cache");
         }
       }
     }
@@ -310,12 +357,17 @@ public class MetaCache {
     RegionLocations regionLocations = tableLocations.get(hri.getStartKey());
     if (regionLocations != null) {
       HRegionLocation oldLocation = regionLocations.getRegionLocation(hri.getReplicaId());
+      if (oldLocation == null) return;
       RegionLocations updatedLocations = regionLocations.remove(oldLocation);
+      boolean removed = false;
       if (updatedLocations != regionLocations) {
         if (updatedLocations.isEmpty()) {
-          tableLocations.remove(hri.getStartKey(), regionLocations);
+          removed = tableLocations.remove(hri.getStartKey(), regionLocations);
         } else {
-          tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+          removed = tableLocations.replace(hri.getStartKey(), regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isTraceEnabled()) {
+          LOG.trace("Removed " + oldLocation + " from cache");
         }
       }
     }
@@ -325,22 +377,22 @@ public class MetaCache {
     if (location == null) {
       return;
     }
-
     TableName tableName = location.getRegionInfo().getTable();
     ConcurrentMap<byte[], RegionLocations> tableLocations = getTableLocations(tableName);
-    RegionLocations rll = tableLocations.get(location.getRegionInfo().getStartKey());
-    if (rll == null) {
-      return;
-    }
-    RegionLocations updatedLocations = rll.remove(location);
-    if (updatedLocations.isEmpty()) {
-      tableLocations.remove(location.getRegionInfo().getStartKey(), rll);
-    }
-    if (LOG.isDebugEnabled() && (rll == updatedLocations)) {
-      LOG.debug("Removed " +
-          location.getRegionInfo().getRegionNameAsString() +
-          " for tableName=" + tableName +
-          " from cache");
+    RegionLocations regionLocations = tableLocations.get(location.getRegionInfo().getStartKey());
+    if (regionLocations != null) {
+      RegionLocations updatedLocations = regionLocations.remove(location);
+      boolean removed = false;
+      if (updatedLocations != regionLocations) {
+        if (updatedLocations.isEmpty()) {
+          removed = tableLocations.remove(location.getRegionInfo().getStartKey(), regionLocations);
+        } else {
+          removed = tableLocations.replace(location.getRegionInfo().getStartKey(), regionLocations, updatedLocations);
+        }
+        if (removed && LOG.isTraceEnabled()) {
+          LOG.trace("Removed " + location + " from cache");
+        }
+      }
     }
   }
 }
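
Every cache mutation above now goes through the two-argument ConcurrentMap.remove/replace so a concurrent writer cannot be silently clobbered, and the boolean result is threaded into the trace logging so only mutations that actually won the race get reported. A minimal sketch of that compare-and-swap pattern (remove(key, expected) and replace(key, expected, updated) are real java.util.concurrent APIs; the cache shape is simplified):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CasCache<K, V> {
  private final ConcurrentMap<K, V> map = new ConcurrentHashMap<K, V>();

  // Swap 'expected' for 'updated' only if nobody changed the entry under us;
  // log only when our update actually took effect.
  boolean update(K key, V expected, V updated) {
    boolean changed = (updated == null)
        ? map.remove(key, expected)            // drop emptied entries atomically
        : map.replace(key, expected, updated); // CAS-style replace
    if (changed) {
      System.out.println("Changed cached value for " + key + " to " + updated);
    }
    return changed;
  }
}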

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 6afbe01..ba2417d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -21,6 +21,18 @@
 package org.apache.hadoop.hbase.client;
 
 
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,17 +52,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 import com.google.protobuf.ServiceException;
 
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
 /**
 * Caller that goes to replica if the primary region does not answer within a configurable
  * timeout. If the timeout is reached, it calls all the secondary replicas, and returns
@@ -113,11 +114,11 @@ public class RpcRetryingCallerWithReadReplicas {
       }
 
       if (reload || location == null) {
-        RegionLocations rl = getRegionLocations(false);
+        RegionLocations rl = getRegionLocations(false, id);
         location = id < rl.size() ? rl.getRegionLocation(id) : null;
       }
 
-      if (location == null) {
+      if (location == null || location.getServerName() == null) {
         // With this exception, there will be a retry. The location can be null for a replica
         //  when the table is created or after a split.
         throw new HBaseIOException("There is no location for replica id #" + id);
@@ -188,30 +189,61 @@ public class RpcRetryingCallerWithReadReplicas {
    */
   public synchronized Result call()
       throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
-    RegionLocations rl = getRegionLocations(true);
+    RegionLocations rl = getRegionLocations(true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
     BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
 
-    addCallsForReplica(cs, rl, 0, 0); // primary.
-
+    List<ExecutionException> exceptions = null;
+    int submitted = 0, completed = 0;
+    // submit call for the primary replica.
+    submitted += addCallsForReplica(cs, rl, 0, 0);
     try {
+      // wait up to the timeout to see whether the primary responds
       Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
-      if (f == null) {
-        addCallsForReplica(cs, rl, 1, rl.size() - 1);  // secondaries
-        f = cs.take();
+      if (f != null) {
+        return f.get(); // great, we got a response
       }
-      return f.get();
     } catch (ExecutionException e) {
-      throwEnrichedException(e);
-      return null; // unreachable
+      // the primary call failed with RetriesExhaustedException or DoNotRetryIOException
+      // but the secondaries might still succeed. Continue on the replica RPCs.
+      exceptions = new ArrayList<ExecutionException>(rl.size());
+      exceptions.add(e);
+      completed++;
+    } catch (CancellationException e) {
+      throw new InterruptedIOException();
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException();
+    }
+
+    // submit calls for all of the secondaries at once
+    // TODO: this may be overkill for large region replication
+    submitted += addCallsForReplica(cs, rl, 1, rl.size() - 1);
+    try {
+      while (completed < submitted) {
+        try {
+          Future<Result> f = cs.take();
+          return f.get(); // great, we got an answer
+        } catch (ExecutionException e) {
+          // one of the tasks failed. Save the exception for later; unless we are
+          // cancelled or interrupted, keep waiting until all RPCs are done.
+          if (exceptions == null) exceptions = new ArrayList<ExecutionException>(rl.size());
+          exceptions.add(e);
+          completed++;
+        }
+      }
     } catch (CancellationException e) {
       throw new InterruptedIOException();
     } catch (InterruptedException e) {
       throw new InterruptedIOException();
     } finally {
       // We get there because we were interrupted or because one or more of the
-      //  calls succeeded or failed. In all case, we stop all our tasks.
+      // calls succeeded or failed. In all cases, we stop all our tasks.
       cs.cancelAll(true);
     }
+
+    if (exceptions != null && !exceptions.isEmpty()) {
+      throwEnrichedException(exceptions.get(0)); // just rethrow the first exception for now.
+    }
+    return null; // unreachable
   }
 
   /**
@@ -248,8 +280,9 @@ public class RpcRetryingCallerWithReadReplicas {
    * @param rl         - the region locations
    * @param min        - the id of the first replica, inclusive
    * @param max        - the id of the last replica, inclusive.
+   * @return the number of submitted calls
    */
-  private void addCallsForReplica(BoundedCompletionService<Result> cs,
+  private int addCallsForReplica(BoundedCompletionService<Result> cs,
                                   RegionLocations rl, int min, int max) {
     for (int id = min; id <= max; id++) {
       HRegionLocation hrl = rl.getRegionLocation(id);
@@ -257,21 +290,22 @@ public class RpcRetryingCallerWithReadReplicas {
       RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
       cs.submit(retryingOnReplica);
     }
+    return max - min + 1;
   }
 
-  private RegionLocations getRegionLocations(boolean useCache)
-      throws RetriesExhaustedException, DoNotRetryIOException {
+  private RegionLocations getRegionLocations(boolean useCache, int replicaId)
+      throws RetriesExhaustedException, DoNotRetryIOException, InterruptedIOException {
     RegionLocations rl;
     try {
-      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true, replicaId);
+    } catch (DoNotRetryIOException e) {
+      throw e;
+    } catch (RetriesExhaustedException e) {
+      throw e;
+    } catch (InterruptedIOException e) {
+      throw e;
     } catch (IOException e) {
-      if (e instanceof DoNotRetryIOException) {
-        throw (DoNotRetryIOException) e;
-      } else if (e instanceof RetriesExhaustedException) {
-        throw (RetriesExhaustedException) e;
-      } else {
-        throw new RetriesExhaustedException("Can't get the location", e);
-      }
+      throw new RetriesExhaustedException("Can't get the location", e);
     }
     if (rl == null) {
       throw new RetriesExhaustedException("Can't get the locations");
@@ -279,4 +313,4 @@ public class RpcRetryingCallerWithReadReplicas {
 
     return rl;
   }
-}
\ No newline at end of file
+}
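
The reworked call() is a hedged read: submit the primary, poll briefly, and fan out to the secondaries only when the primary is slow or has already failed; failures are collected rather than rethrown immediately, so a slow-but-successful replica can still win. A self-contained sketch of that flow using the JDK's ExecutorCompletionService (the patch itself uses HBase's BoundedCompletionService, but the shape is the same):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedRead {
  static <V> V call(ExecutorService pool, List<Callable<V>> primaryThenReplicas,
      long primaryTimeoutMicros) throws Exception {
    CompletionService<V> cs = new ExecutorCompletionService<V>(pool);
    List<Future<V>> submittedFutures = new ArrayList<Future<V>>();
    List<ExecutionException> failures = new ArrayList<ExecutionException>();
    int submitted = 0, completed = 0;
    submittedFutures.add(cs.submit(primaryThenReplicas.get(0))); // primary first
    submitted++;
    try {
      Future<V> f = cs.poll(primaryTimeoutMicros, TimeUnit.MICROSECONDS);
      if (f != null) {
        try {
          return f.get();      // primary answered within the timeout
        } catch (ExecutionException e) {
          failures.add(e);     // primary failed; the replicas may still succeed
          completed++;
        }
      }
      // primary was slow or failed: hedge by submitting all secondaries at once
      for (Callable<V> replica : primaryThenReplicas.subList(1, primaryThenReplicas.size())) {
        submittedFutures.add(cs.submit(replica));
        submitted++;
      }
      while (completed < submitted) {
        try {
          return cs.take().get(); // first success wins
        } catch (ExecutionException e) {
          failures.add(e);
          completed++;
        }
      }
      throw failures.get(0);     // everything failed; surface the first cause
    } finally {
      for (Future<V> f : submittedFutures) {
        f.cancel(true);          // stop any calls still in flight
      }
    }
  }
}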

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
index 603f8d5..c9257d7 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/TestRegionLocations.java
@@ -46,25 +46,25 @@ public class TestRegionLocations {
 
     list = hrll((HRegionLocation)null);
     assertTrue(list.isEmpty());
-    assertEquals(0, list.size());
+    assertEquals(1, list.size());
     assertEquals(0, list.numNonNullElements());
 
     HRegionInfo info0 = hri(0);
     list = hrll(hrl(info0, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(1, list.size());
-    assertEquals(1, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
 
     HRegionInfo info9 = hri(9);
     list = hrll(hrl(info9, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(10, list.size());
-    assertEquals(1, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
 
     list = hrll(hrl(info0, null), hrl(info9, null));
-    assertFalse(list.isEmpty());
+    assertTrue(list.isEmpty());
     assertEquals(10, list.size());
-    assertEquals(2, list.numNonNullElements());
+    assertEquals(0, list.numNonNullElements());
   }
 
   private HRegionInfo hri(int replicaId) {
@@ -100,7 +100,7 @@ public class TestRegionLocations {
     list = hrll(hrl(info0, sn0));
     assertTrue(list == list.removeByServer(sn1));
     list = list.removeByServer(sn0);
-    assertTrue(list.isEmpty());
+    assertEquals(0, list.numNonNullElements());
 
     // test remove from multi element list
     list = hrll(hrl(info0, sn0), hrl(info1, sn1), hrl(info2, sn2), hrl(info9, sn2));
@@ -226,7 +226,7 @@ public class TestRegionLocations {
     list1 = list2.mergeLocations(list1);
     assertEquals(sn0, list1.getRegionLocation(0).getServerName());
     assertEquals(sn1, list1.getRegionLocation(1).getServerName());
-    assertEquals(sn2, list1.getRegionLocation(2).getServerName());
+    assertEquals(2, list1.size()); // the size is taken from the argument list to merge
 
     // do the other way merge as well
     list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
@@ -240,10 +240,9 @@ public class TestRegionLocations {
     list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
     list2 = hrll(hrl(info0, sn2), hrl(info1, sn2), hrl(info9, sn3));
     list1 = list2.mergeLocations(list1); // list1 should override
-    assertEquals(10, list1.size());
+    assertEquals(2, list1.size());
     assertEquals(sn0, list1.getRegionLocation(0).getServerName());
     assertEquals(sn1, list1.getRegionLocation(1).getServerName());
-    assertEquals(sn3, list1.getRegionLocation(9).getServerName());
 
     // do the other way
     list1 = hrll(hrl(info0, sn0), hrl(info1, sn1));
@@ -272,4 +271,35 @@ public class TestRegionLocations {
     assertEquals(sn2, list1.getRegionLocation(1).getServerName());
     assertEquals(sn3, list1.getRegionLocation(9).getServerName());
   }
+
+  @Test
+  public void testConstructWithNullElements() {
+    // RegionLocations can contain null elements for replicaIds whose locations are not known.
+
+    RegionLocations list = new RegionLocations((HRegionLocation)null);
+    assertTrue(list.isEmpty());
+    assertEquals(1, list.size());
+    assertEquals(0, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info1, sn0));
+    assertFalse(list.isEmpty());
+    assertEquals(2, list.size());
+    assertEquals(1, list.numNonNullElements());
+
+    list = new RegionLocations(hrl(info0, sn0), null);
+    assertEquals(2, list.size());
+    assertEquals(1, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0));
+    assertEquals(10, list.size());
+    assertEquals(2, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null);
+    assertEquals(11, list.size());
+    assertEquals(2, list.numNonNullElements());
+
+    list = new RegionLocations(null, hrl(info2, sn0), null, hrl(info9, sn0), null, null);
+    assertEquals(12, list.size());
+    assertEquals(2, list.numNonNullElements());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
index 514505b..d89d337 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
@@ -34,11 +34,12 @@ import java.util.concurrent.TimeUnit;
  * A completion service, close to the one available in the JDK 1.7
 * However, this one keeps the list of the futures, and allows cancelling them all.
 * This also means that it should be used only for a small set of tasks.
+ * <br>The implementation is not thread-safe.
  */
 public class BoundedCompletionService<V> {
   private final Executor executor;
-  private final List<Future<V>> sent; // alls the call we sent
-  private final BlockingQueue<Future<V>> completed; // all the results we got so far.
+  private final List<Future<V>> tasks; // all the tasks
+  private final BlockingQueue<Future<V>> completed; // all the tasks that are completed
 
   class QueueingFuture extends FutureTask<V> {
 
@@ -46,6 +47,7 @@ public class BoundedCompletionService<V> {
       super(callable);
     }
 
+    @Override
     protected void done() {
       completed.add(QueueingFuture.this);
     }
@@ -53,7 +55,7 @@ public class BoundedCompletionService<V> {
 
   public BoundedCompletionService(Executor executor, int maxTasks) {
     this.executor = executor;
-    this.sent = new ArrayList<Future<V>>(maxTasks);
+    this.tasks = new ArrayList<Future<V>>(maxTasks);
     this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
   }
 
@@ -61,7 +63,7 @@ public class BoundedCompletionService<V> {
   public Future<V> submit(Callable<V> task) {
     QueueingFuture newFuture = new QueueingFuture(task);
     executor.execute(newFuture);
-    sent.add(newFuture);
+    tasks.add(newFuture);
     return newFuture;
   }
 
@@ -74,7 +76,7 @@ public class BoundedCompletionService<V> {
   }
 
   public void cancelAll(boolean interrupt) {
-    for (Future<V> future : sent) {
+    for (Future<V> future : tasks) {
       future.cancel(interrupt);
     }
   }
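
BoundedCompletionService works by wrapping each task in a FutureTask subclass whose done() hook enqueues the finished future; poll/take then just drain that queue, and keeping the list of submitted futures is what makes cancelAll possible. A stripped-down sketch of the mechanism (JDK APIs throughout; MiniCompletionService is a hypothetical name):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;

public class MiniCompletionService<V> {
  private final Executor executor;
  private final List<Future<V>> tasks = new ArrayList<Future<V>>(); // not thread-safe, like the original
  private final BlockingQueue<Future<V>> completed;

  public MiniCompletionService(Executor executor, int maxTasks) {
    this.executor = executor;
    this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
  }

  public Future<V> submit(Callable<V> task) {
    FutureTask<V> future = new FutureTask<V>(task) {
      @Override
      protected void done() {
        completed.add(this); // fires on success, failure and cancellation alike
      }
    };
    executor.execute(future);
    tasks.add(future);
    return future;
  }

  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
    return completed.poll(timeout, unit);
  }

  public void cancelAll(boolean interrupt) {
    for (Future<V> future : tasks) {
      future.cancel(interrupt); // possible only because we kept every future
    }
  }
}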

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 0b06a4e..18747c9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * Thread Utility
@@ -38,6 +39,16 @@ import org.apache.hadoop.util.ReflectionUtils;
 public class Threads {
   protected static final Log LOG = LogFactory.getLog(Threads.class);
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
+
+  private static UncaughtExceptionHandler LOGGING_EXCEPTION_HANDLER =
+    new UncaughtExceptionHandler() {
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      LOG.warn("Thread:" + t + " exited with Exception:"
+          + StringUtils.stringifyException(e));
+    }
+  };
+
   /**
    * Utility method that sets name, daemon status and starts passed thread.
    * @param t thread to run
@@ -160,15 +171,15 @@ public class Threads {
   }
 
   /**
-   * Create a new CachedThreadPool with a bounded number as the maximum 
+   * Create a new CachedThreadPool with a bounded number as the maximum
    * thread size in the pool.
-   * 
+   *
   * @param maxCachedThread the maximum number of threads that can be created in the pool
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @param threadFactory the factory to use when creating new threads
-   * @return threadPoolExecutor the cachedThreadPool with a bounded number 
-   * as the maximum thread size in the pool. 
+   * @return the cachedThreadPool, bounded to the given maximum
+   * number of threads.
    */
   public static ThreadPoolExecutor getBoundedCachedThreadPool(
       int maxCachedThread, long timeout, TimeUnit unit,
@@ -180,8 +191,8 @@ public class Threads {
     boundedCachedThreadPool.allowCoreThreadTimeOut(true);
     return boundedCachedThreadPool;
   }
-  
-  
+
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.
@@ -230,6 +241,8 @@ public class Threads {
         Thread t = namedFactory.newThread(r);
         if (handler != null) {
           t.setUncaughtExceptionHandler(handler);
+        } else {
+          t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
         }
         if (!t.isDaemon()) {
           t.setDaemon(true);
@@ -242,4 +255,11 @@ public class Threads {
 
     };
   }
+
+  /** Sets an UncaughtExceptionHandler for the given thread, which logs the
+   * exception stack trace if the thread dies from an uncaught exception.
+   */
+  public static void setLoggingUncaughtExceptionHandler(Thread t) {
+    t.setUncaughtExceptionHandler(LOGGING_EXCEPTION_HANDLER);
+  }
 }
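
A short sketch of the new helper in use. Without any handler, an exception escaping run() kills the thread with nothing in the logs; with the handler attached, the stack trace is recorded via LOG.warn. The worker below is a throwaway example.

  import org.apache.hadoop.hbase.util.Threads;

  public static void main(String[] args) throws InterruptedException {
    Thread worker = new Thread(new Runnable() {
      @Override
      public void run() {
        throw new IllegalStateException("simulated failure");
      }
    }, "example-worker");
    Threads.setLoggingUncaughtExceptionHandler(worker);
    worker.start();
    worker.join();  // by now the handler has logged the stack trace
  }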

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
index 646c86d..aef9f4e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
@@ -244,6 +244,12 @@ class CoprocessorHConnection implements ClusterConnection {
   }
 
   @Override
+  public HRegionLocation relocateRegion(TableName tableName, byte[] row, int replicaId)
+      throws IOException {
+    return delegate.relocateRegion(tableName, row, replicaId);
+  }
+
+  @Override
   public HRegionLocation relocateRegion(byte[] tableName, byte[] row) throws IOException {
     return delegate.relocateRegion(tableName, row);
   }
@@ -294,6 +300,12 @@ class CoprocessorHConnection implements ClusterConnection {
   }
 
   @Override
+  public RegionLocations locateRegion(TableName tableName, byte[] row, boolean useCache,
+      boolean retry, int replicaId) throws IOException {
+    return delegate.locateRegion(tableName, row, useCache, retry, replicaId);
+  }
+
+  @Override
   public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
       throws IOException {
     return delegate.locateRegions(tableName, useCache, offlined);
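
A sketch of the replica-aware lookups these delegating overrides expose, assuming a ClusterConnection in hand and code running where IOException can propagate. The table name, row, and the per-replica accessor on RegionLocations are illustrative assumptions; only the two delegated signatures above come from the patch.

  TableName table = TableName.valueOf("usertable");   // placeholder table
  byte[] row = Bytes.toBytes("row-0001");             // placeholder row
  int replicaId = 1;                                  // 0 is the primary

  RegionLocations locs = connection.locateRegion(
      table, row, true /* useCache */, true /* retry */, replicaId);
  HRegionLocation loc = locs.getRegionLocation(replicaId); // accessor assumed
  if (loc == null) {
    // stale or missing cache entry for this replica: force a fresh lookup
    loc = connection.relocateRegion(table, row, replicaId);
  }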

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index b9fe633..bf7a93b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -325,7 +325,7 @@ public class TestReplicaWithCluster {
     RegionServerCallable<Void> callable = new RegionServerCallable<Void>(
       conn, hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0)) {
         @Override
-        public Void call() throws Exception {
+        public Void call(int timeout) throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
             + Bytes.toStringBinary(getRow()));
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -337,7 +337,7 @@ public class TestReplicaWithCluster {
       };
     RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
     RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
-    caller.callWithRetries(callable);
+    caller.callWithRetries(callable, 10000);
 
     // verify we can read them from the primary
     LOG.debug("Verifying data load");

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
index cc87800..5bafd09 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedReader.java
@@ -120,7 +120,9 @@ public class MultiThreadedReader extends MultiThreadedAction
   }
 
   protected HBaseReaderThread createReaderThread(int readerId) throws IOException {
-    return new HBaseReaderThread(readerId);
+    HBaseReaderThread reader = new HBaseReaderThread(readerId);
+    Threads.setLoggingUncaughtExceptionHandler(reader);
+    return reader;
   }
 
   public class HBaseReaderThread extends Thread {

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
index bfe3ebd..80e0d52 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
@@ -73,6 +73,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
   protected void createWriterThreads(int numThreads) throws IOException {
     for (int i = 0; i < numThreads; ++i) {
       HBaseWriterThread writer = new HBaseWriterThread(i);
+      Threads.setLoggingUncaughtExceptionHandler(writer);
       writers.add(writer);
     }
   }
@@ -89,6 +90,7 @@ public class MultiThreadedWriter extends MultiThreadedWriterBase {
       return new HTable(conf, tableName);
     }
 
+    @Override
     public void run() {
       try {
         long rowKeyBase;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad05de17/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
index 9373e6f..340f5f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriterBase.java
@@ -101,8 +101,8 @@ public abstract class MultiThreadedWriterBase extends MultiThreadedAction {
     if (cached != null) {
       result = "cached: " + cached.toString();
     }
-    if (real != null) {
-      if (real.equals(cached)) {
+    if (real != null && real.getServerName() != null) {
+      if (cached != null && cached.getServerName() != null && real.equals(cached)) {
         result += "; cache is up to date";
       } else {
         result = (cached != null) ? (result + "; ") : "";