You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/08/28 22:58:20 UTC

svn commit: r1378337 - in /hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase: client/HConnectionManager.java client/HTableMultiplexer.java client/PreemptiveFastFailException.java client/ServerCallable.java ipc/HBaseClient.java

Author: mbautin
Date: Tue Aug 28 20:58:20 2012
New Revision: 1378337

URL: http://svn.apache.org/viewvc?rev=1378337&view=rev
Log:
[HBASE-6629] [0.89-fb] Fast fail, without retries, if the client has not been able to contact a server in a while

Author: aaiyer

Summary:
New way to reduce client-side waiting on an unresponsive regionserver.
 1) Keep count of repeated connect failures to a server.
 2) Throw PreemptiveFastFailException (a subclass of ConnectException)
 instead of ConnectException. It carries more details, such as
 - # of failures
 - time of first failure
 - rpcRetryTimeout and the current time

Test Plan:
Run unit tests on MR to test for existing stuff. -- All pass.

 Deploy to dev cluster.
   - run a load tester job;
   - kill one of the RSs during the load testing job;
   - check that PreemptiveFastFailExceptions get thrown;
   - check that the # of ops improves with the diff.
 Graph for #ops and latency (ms) uploaded to
https://www.facebook.com/pxlcld/8hnw

https://www.facebook.com/pxlcld/8hnh

Reviewers: kranganathan, kannan, avf

Reviewed By: kranganathan

CC: hbase-eng@, riyer, anshumansingh26, liujiakai

Differential Revision: https://phabricator.fb.com/D535439

Task ID: 1285514

Added:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue Aug 28 20:58:20 2012
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
 import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -39,12 +40,15 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -376,6 +380,44 @@ public class HConnectionManager {
       cachedRegionLocations =
         new HashMap<Integer, SortedMap<byte [], HRegionLocation>>();
 
+    // How long (ms) a server must keep failing before we put it in fast fail mode.
+    private long fastFailThresholdMilliSec;
+    // Failure history of every server we could not talk to; lets clients fail
+    // fast when a server has been down for a long time.
+    private final ConcurrentMap<HServerAddress, FailureInfo> repeatedFailuresMap =
+        new ConcurrentHashMap<HServerAddress, FailureInfo>();
+    // repeatedFailuresMap is updated on every failure; to keep it from growing
+    // without bound, stale failure information is garbage collected every
+    // failureMapCleanupIntervalMilliSec.
+    private final long failureMapCleanupIntervalMilliSec;
+    private volatile long lastFailureMapCleanupTimeMilliSec;
+    // Minimum time (ms) between two clears of the region -> regionserver cache
+    // for the same server while it is in fast fail mode.
+    private long cacheClearingTimeoutMilliSec;
+
+    /**
+     * Failure history of a single region server that the client could not
+     * communicate with; drives the fast fail decisions.
+     * @author amitanand.s
+     */
+    private class FailureInfo {
+      /** Consecutive failures observed for the server. */
+      private final AtomicLong numConsecutiveFailures = new AtomicLong();
+      /** When (ms) the server started to look unresponsive; never updated once set. */
+      private long timeOfFirstFailureMilliSec;
+      /** When (ms) a client last tried to contact the server. */
+      private volatile long timeOfLatestAttemptMilliSec;
+      /**
+       * When (ms) cached region locations for this server were last cleared;
+       * used so that the cache is not cleared too often.
+       */
+      private volatile long timeOfLatestCacheClearMilliSec;
+      /**
+       * Set while one client thread is exclusively retrying the server in
+       * fast fail mode; all other threads fail fast in the meantime.
+       */
+      private final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(false);
+    }
     // The presence of a server in the map implies it's likely that there is an
     // entry in cachedRegionLocations that map to this server; but the absence
     // of a server in this map guarentees that there is no entry in cache that
@@ -426,6 +468,12 @@ public class HConnectionManager {
           HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
       this.rpcRetryTimeout = conf.getLong("hbase.client.rpc.retry.timeout",
           Long.MAX_VALUE);
+      this.cacheClearingTimeoutMilliSec = conf.getLong("hbase.client.fastfail.cache.clear.interval",
+          10000); // 10 sec
+      this.fastFailThresholdMilliSec = conf.getLong("hbase.client.fastfail.threshold",
+          60000); // 1 min
+      this.failureMapCleanupIntervalMilliSec = conf.getLong(
+          "hbase.client.fastfail.cleanup.map.interval.millisec", 600000); // 10 min
 
       this.prefetchRegionLimit = conf.getInt("hbase.client.prefetch.limit",
           10);
@@ -863,10 +911,31 @@ public class HConnectionManager {
             + Bytes.toStringBinary(row) + " after " + numRetries + " tries.");
         }
 
+        FailureInfo fInfo = null;
+        HServerAddress server = null;
+        boolean didTry = false;
+        boolean couldNotCommunicateWithServer = false;
+        boolean retryDespiteFastFailMode = false;
         try {
           // locate the root or meta region
           HRegionLocation metaLocation = locateRegion(parentTable, metaKey);
-          HRegionInterface server =
+
+          server = metaLocation.getServerAddress();
+          // Handle the case where .META. is on an unresponsive server.
+          if (inFastFailMode(server)) {
+            // In Fast-fail mode, all but one thread will fast fail. Check
+            // if we are that one chosen thread.
+            fInfo = repeatedFailuresMap.get(server);
+            retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+
+            if (retryDespiteFastFailMode == false) { // we don't have to retry
+              throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+                  fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+            }
+          }
+          didTry = true;
+
+          HRegionInterface serverInterface =
             getHRegionConnection(metaLocation.getServerAddress());
 
           Result regionInfoRow = null;
@@ -895,7 +964,7 @@ public class HConnectionManager {
             }
 
           // Query the root or meta region for the location of the meta region
-            regionInfoRow = server.getClosestRowBefore(
+            regionInfoRow = serverInterface.getClosestRowBefore(
             metaLocation.getRegionInfo().getRegionName(), metaKey,
             HConstants.CATALOG_FAMILY);
           }
@@ -948,6 +1017,9 @@ public class HConnectionManager {
           if (e instanceof RemoteException) {
             e = RemoteExceptionHandler.decodeRemoteException(
                 (RemoteException) e);
+          } else if (isNetworkException(e)) {
+            couldNotCommunicateWithServer = true;
+            handleFailureToServer(server);
           }
           if (tries < numRetries - 1) {
             if (LOG.isDebugEnabled()) {
@@ -963,6 +1035,9 @@ public class HConnectionManager {
               e instanceof NoServerForRegionException)) {
             relocateRegion(parentTable, metaKey);
           }
+        } finally {
+          updateFailureInfoForServer(server, fInfo, didTry,
+              couldNotCommunicateWithServer, retryDespiteFastFailMode);
         }
         try{
           Thread.sleep(getPauseTime(tries));
@@ -972,6 +1047,24 @@ public class HConnectionManager {
       }
     }
 
+    /**
+     * Tells whether an exception means the server could not be reached or
+     * talked to. Note that PreemptiveFastFailException, being a subclass of
+     * ConnectException, is also classified as a network exception.
+     *
+     * @param e the exception to classify
+     * @return true if {@code e} is one of the recognized network exceptions
+     */
+    private boolean isNetworkException(Throwable e) {
+      // Not exhaustive: SocketOutputStream, for one, sometimes reports a closed
+      // channel with a plain IOException, which is not recognized here.
+      if (e instanceof SocketTimeoutException || e instanceof ConnectException) {
+        return true;
+      }
+      boolean channelGone = e instanceof ClosedChannelException || e instanceof EOFException;
+      return channelGone || e instanceof SyncFailedException;
+    }
+
     /*
      * Search the cache for a location that fits our table and row key.
      * Return null if no suitable region is located. TODO: synchronization note
@@ -1082,6 +1175,7 @@ public class HConnectionManager {
     private void clearCachedLocationForServer(
         final String server) {
       boolean deletedSomething = false;
+
       synchronized (this.cachedRegionLocations) {
         if (!cachedServers.contains(server)) {
           return;
@@ -1345,14 +1439,16 @@ public class HConnectionManager {
 
           // If we are not supposed to retry; Let it pass through.
           throw ioe;
+        } catch (RegionOverloadedException roe) {
+          exceptions.add(roe);
+          serverRequestedWaitTime = roe.getBackoffTimeMillis();
+          continue;
+        } catch (PreemptiveFastFailException pfe) {
+          // Bail out of the retry loop, if the host has been consistently unreachable.
+          throw pfe;
         } catch (Throwable t) {
           exceptions.add(t);
 
-          if (t instanceof RegionOverloadedException) {
-            serverRequestedWaitTime = ((RegionOverloadedException)t).getBackoffTimeMillis();
-            continue;
-          }
-
           if (tries == numRetries - 1) {
             throw new RetriesExhaustedException(callable.getServerName(),
                 callable.getRegionName(), callable.getRow(), tries, exceptions);
@@ -1369,6 +1465,7 @@ public class HConnectionManager {
 
           if (prevLoc.getServerAddress().
               equals(callable.location.getServerAddress())) {
+            // Bail out of the retry loop if we have to wait too long
             long pauseTime = getPauseTime(tries);
             if ((System.currentTimeMillis() - callStartTime + pauseTime) >
                  rpcRetryTimeout) {
@@ -1402,6 +1499,7 @@ public class HConnectionManager {
         return getRegionServerWithoutRetries(callable, true);
     }
 
+
     /**
      * Pass in a ServerCallable with your particular bit of logic defined and
      * this method will pass it to the defined region server.
@@ -1410,32 +1508,46 @@ public class HConnectionManager {
      * @return an object of type T
      * @throws IOException if a remote or network exception occurs
      * @throws RuntimeException other unspecified error
+     * @throws PreemptiveFastFailException if the remote host has been known to be
+     *         unreachable for more than this.fastFailThresholdMilliSec.
      */
     private <T> T getRegionServerWithoutRetries(ServerCallable<T> callable,
         boolean instantiateRegionLocation)
-        throws IOException, RuntimeException {
+        throws IOException, RuntimeException, PreemptiveFastFailException {
+      FailureInfo fInfo = null;
+      HServerAddress server = null;
+      boolean didTry = false;
+      boolean couldNotCommunicateWithServer = false;
+      boolean retryDespiteFastFailMode = false;
       try {
         if (instantiateRegionLocation) callable.instantiateRegionLocation(false);
+
+        // Logic to fast fail requests to unreachable servers.
+        server = callable.getServerAddress();
+        if (inFastFailMode(server)) {
+          // In Fast-fail mode, all but one thread will fast fail. Check
+          // if we are that one chosen thread.
+          fInfo = repeatedFailuresMap.get(server);
+          retryDespiteFastFailMode = shouldRetryInspiteOfFastFail(fInfo);
+          if (retryDespiteFastFailMode == false) { // we don't have to retry
+            throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+                fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+          }
+        }
+        didTry = true;
+
         callable.instantiateServer();
         return callable.call();
+      } catch (PreemptiveFastFailException pfe) {
+        throw pfe;
       } catch (Throwable t1) {
         Throwable t2 = translateException(t1);
         boolean isLocalException = !(t2 instanceof RemoteException);
         // translateException throws DoNotRetryException or any
         // non-IOException.
-        if (isLocalException && (t2 instanceof SocketTimeoutException ||
-            t2 instanceof ConnectException ||
-            t2 instanceof ClosedChannelException ||
-            t2 instanceof SyncFailedException ||
-            t2 instanceof EOFException)) {
-          // XXX this list covers most connectivity exceptions but not all.
-          // For example, in SocketOutputStream a plain IOException is thrown
-          // at times when the channel is closed.
-
-          // if thrown these exceptions, we clear all the cache entries that
-          // map to that slow/dead server; otherwise, let cache miss and ask
-          // .META. again to find the new location
-          clearCachedLocationForServer(callable.location.getServerAddress().toString());
+        if (isLocalException && isNetworkException(t2)) {
+          couldNotCommunicateWithServer = true;
+          handleFailureToServer(server);
         }
 
         if (t2 instanceof IOException) {
@@ -1443,9 +1555,142 @@ public class HConnectionManager {
         } else {
           throw new RuntimeException(t2);
         }
+      } finally {
+        updateFailureInfoForServer(server, fInfo, didTry,
+            couldNotCommunicateWithServer, retryDespiteFastFailMode);
       }
     }
 
+    /**
+     * Handles failures encountered when communicating with a server.
+     *
+     * Records the failure in repeatedFailuresMap and clears the server's cached
+     * region locations; in fast fail mode the cache is cleared at most once per
+     * cacheClearingTimeoutMilliSec and a PreemptiveFastFailException is thrown.
+     *
+     * @param server the server that could not be reached; ignored if null
+     * @throws PreemptiveFastFailException if the server is in fast fail mode
+     */
+    private void handleFailureToServer(HServerAddress server)
+        throws PreemptiveFastFailException {
+      if (server == null) return;
+      long currentTime = System.currentTimeMillis();
+      FailureInfo fInfo = repeatedFailuresMap.get(server);
+      if (fInfo == null) {
+        FailureInfo newInfo = new FailureInfo();
+        newInfo.timeOfFirstFailureMilliSec = currentTime;
+        // putIfAbsent returns the previous value (null when newInfo was stored).
+        FailureInfo existing = repeatedFailuresMap.putIfAbsent(server, newInfo);
+        fInfo = (existing != null) ? existing : newInfo;
+      }
+      fInfo.timeOfLatestAttemptMilliSec = currentTime;
+      fInfo.numConsecutiveFailures.incrementAndGet();
+
+      if (inFastFailMode(server)) {
+        // In fast fail mode, do not clear out the cache if it was done recently.
+        if (currentTime > fInfo.timeOfLatestCacheClearMilliSec + cacheClearingTimeoutMilliSec) {
+          fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+          clearCachedLocationForServer(server.toString());
+        }
+        throw new PreemptiveFastFailException(fInfo.numConsecutiveFailures.get(),
+            fInfo.timeOfFirstFailureMilliSec, fInfo.timeOfLatestAttemptMilliSec);
+      }
+
+      // Not yet in fast fail mode: drop every cache entry that maps to this
+      // slow/dead server so the next lookup misses and asks .META. again.
+      fInfo.timeOfLatestCacheClearMilliSec = currentTime;
+      clearCachedLocationForServer(server.toString());
+    }
+
+    /**
+     * Removes stale entries from repeatedFailuresMap so that it cannot grow
+     * without bound.
+     *
+     * Does nothing unless more than failureMapCleanupIntervalMilliSec has passed
+     * since the previous sweep. An entry is stale when its server has not been
+     * attempted for longer than that same interval.
+     */
+    private void occasionallyCleanupFailureInformation() {
+      final long now = System.currentTimeMillis();
+      if (now <= lastFailureMapCleanupTimeMilliSec + failureMapCleanupIntervalMilliSec) {
+        return;
+      }
+      // No locking: concurrent sweeps only repeat harmless removals on the
+      // concurrent map.
+      lastFailureMapCleanupTimeMilliSec = now;
+      for (Entry<HServerAddress, FailureInfo> candidate : repeatedFailuresMap.entrySet()) {
+        long lastAttempt = candidate.getValue().timeOfLatestAttemptMilliSec;
+        boolean stale = now > lastAttempt + failureMapCleanupIntervalMilliSec;
+        if (stale) {
+          repeatedFailuresMap.remove(candidate.getKey());
+        }
+      }
+    }
+
+    /**
+     * Tells whether requests to the given server should currently fail fast.
+     *
+     * A server with no recorded failures is considered healthy; otherwise the
+     * client enters fast fail mode once more than fastFailThresholdMilliSec has
+     * passed since the server's first recorded failure.
+     *
+     * @param server the server to check
+     * @return true if the client is in fast fail mode for the server
+     */
+    private boolean inFastFailMode(HServerAddress server) {
+      FailureInfo failures = repeatedFailuresMap.get(server);
+      if (failures == null) return false;
+      long fastFailStart = failures.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec;
+      return System.currentTimeMillis() > fastFailStart;
+    }
+
+    /**
+     * Decides whether this thread should contact the server even though the
+     * client is in fast fail mode for it.
+     *
+     * Only one thread at a time wins the compare-and-set below and keeps
+     * probing the server; the other callers fail fast. The flag is cleared
+     * again by updateFailureInfoForServer after a failed attempt.
+     *
+     * @param fInfo failure record for the server; may be null
+     * @return true if the caller should try to connect to the server
+     */
+    private boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+      if (fInfo == null) {
+        return false;
+      }
+      return fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true);
+    }
+
+    /**
+     * Updates the failure information for a server after an attempt to use it.
+     *
+     * No network error: the server's failure record is discarded. Network
+     * error: the latest-attempt time is refreshed and, if held, the exclusive
+     * retry flag is released. Either way the periodic map cleanup then gets a
+     * chance to run. Skipped if server or fInfo is null, or no attempt was made.
+     *
+     * @param server the server that was (or would have been) contacted
+     * @param fInfo failure record fetched before the attempt, or null
+     * @param didTry whether the call got past the fast fail check
+     * @param couldNotCommunicate whether the attempt failed with a network error
+     * @param retryDespiteFastFailMode whether this thread held the retry flag
+     */
+    private void updateFailureInfoForServer(HServerAddress server, FailureInfo fInfo,
+        boolean didTry, boolean couldNotCommunicate, boolean retryDespiteFastFailMode) {
+      if (server == null || fInfo == null || !didTry) return;
+
+      if (!couldNotCommunicate) {
+        repeatedFailuresMap.remove(server);
+      } else {
+        fInfo.timeOfLatestAttemptMilliSec = System.currentTimeMillis();
+        if (retryDespiteFastFailMode) {
+          fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+        }
+      }
+      occasionallyCleanupFailureInformation();
+    }
+
     private <R> Callable<MultiResponse> createMultiActionCallable(final HServerAddress address,
         final MultiAction multi, final byte [] tableName,
         final HBaseRPCOptions options) {
@@ -2123,6 +2368,11 @@ public class HConnectionManager {
               toThrow = roe;
             }
           }
+
+          if (singleServer &&
+              ex.getCause() instanceof PreemptiveFastFailException) {
+            throw (PreemptiveFastFailException)ex.getCause();
+          }
         }
 
         // For each region
@@ -2222,6 +2472,8 @@ public class HConnectionManager {
             serverRequestedWaitTime = ex.getBackoffTimeMillis();
             // do not clear the list
             continue;
+        } catch (PreemptiveFastFailException pfe) {
+          throw pfe;
         }
 
         list.clear();

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java Tue Aug 28 20:58:20 2012
@@ -403,7 +403,19 @@ public class HTableMultiplexer {
             }
             
             // Process this multiput request
-            List<Put> failed = connection.processListOfMultiPut(Arrays.asList(mput), null, options);
+            List<Put> failed = null;
+            try {
+              failed = connection.processListOfMultiPut(Arrays.asList(mput), null, options);
+            } catch(PreemptiveFastFailException e) {
+              // Client is not blocking on us. So, let us treat this
+              // as a normal failure, and retry.
+              for (PutStatus putStatus: processingList) {
+                if (!resubmitFailedPut(putStatus, this.addr)) {
+                  failedCount++;
+                }
+              }
+            }
+
             if (failed != null) {
               if (failed.size() == processingList.size()) {
                 // All the puts for this region server are failed. Going to retry it later

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java?rev=1378337&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailException.java Tue Aug 28 20:58:20 2012
@@ -0,0 +1,43 @@
+/**
+ *
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.net.ConnectException;
+
+/**
+ * Thrown when the client believes that the server it is trying to talk to
+ * has been repeatedly unresponsive for a while. HConnectionManager then skips
+ * the remaining retries and fails the operation fast.
+ */
+public class PreemptiveFastFailException extends ConnectException {
+  private static final long serialVersionUID = 7129103682617007177L;
+  // Failure statistics captured when the exception was created; never change.
+  private final long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec;
+
+  /**
+   * @param count number of consecutive failures to the server
+   * @param timeOfFirstFailureMilliSec time (ms) of the first failure
+   * @param timeOfLatestAttemptMilliSec time (ms) of the latest attempt
+   */
+  public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
+      long timeOfLatestAttemptMilliSec) {
+    super("Exception happened " + count + " times.");
+    this.failureCount = count;
+    this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
+    this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
+  }
+
+  /** @return time (ms) of the first failure to the server */
+  public long getFirstFailureAt() {
+    return timeOfFirstFailureMilliSec;
+  }
+  /** @return time (ms) of the latest attempt to contact the server */
+  public long getLastAttemptAt() {
+    return timeOfLatestAttemptMilliSec;
+  }
+  /** @return number of consecutive failures to the server */
+  public long getFailureCount() {
+    return failureCount;
+  }
+}

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ServerCallable.java Tue Aug 28 20:58:20 2012
@@ -21,6 +21,7 @@
 package org.apache.hadoop.hbase.client;
 
 import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
@@ -89,6 +90,14 @@ public abstract class ServerCallable<T> 
     return location.getServerAddress().toString();
   }
 
+  /**
+   * @return the address of the server hosting this callable's region, or
+   *         null if no region location is known
+   */
+  public HServerAddress getServerAddress() {
+    return (location == null) ? null : location.getServerAddress();
+  }
+
   /** @return the region name */
   public byte[] getRegionName() {
     if (location == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java?rev=1378337&r1=1378336&r2=1378337&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HBaseClient.java Tue Aug 28 20:58:20 2012
@@ -41,6 +41,8 @@ import java.net.UnknownHostException;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -77,8 +79,9 @@ public class HBaseClient {
 
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.ipc.HBaseClient");
-  protected final Hashtable<ConnectionId, Connection> connections =
-    new Hashtable<ConnectionId, Connection>();
+  // Active connections are stored in connections.
+  protected final ConcurrentMap<ConnectionId, Connection> connections =
+    new ConcurrentHashMap<ConnectionId, Connection>();
 
   protected final Class<? extends Writable> valueClass;   // class of call values
   protected int counter;                            // counter for call ids
@@ -215,9 +218,14 @@ public class HBaseClient {
      * @param call to add
      * @return true if the call was added.
      */
-    protected synchronized boolean addCall(Call call) {
-      if (shouldCloseConnection.get())
+    protected synchronized boolean addCall(Call call) throws IOException {
+      if (shouldCloseConnection.get()) {
+        // If there was something bad. Let not the next thread spend a while
+        // figuring it out.
+        if (closeException != null)
+          throw closeException;
         return false;
+      }
       calls.put(call.id, call);
       notify();
       return true;
@@ -285,9 +293,12 @@ public class HBaseClient {
      * @throws java.io.IOException e
      */
     protected synchronized void setupIOstreams(byte version) throws IOException {
-      if (socket != null || shouldCloseConnection.get()) {
+      if (socket != null) { // somebody already set up the IOStreams
         return;
       }
+      if (shouldCloseConnection.get()) { // somebody already tried and failed.
+        throw closeException;
+      }
 
       short ioFailures = 0;
       short timeoutFailures = 0;
@@ -301,7 +312,7 @@ public class HBaseClient {
             this.socket.setTcpNoDelay(tcpNoDelay);
             this.socket.setKeepAlive(tcpKeepAlive);
             NetUtils.connect(this.socket, remoteId.getAddress(),
-			connectionTimeOutMillSec);
+                connectionTimeOutMillSec);
             if (remoteId.rpcTimeout > 0) {
               pingInterval = remoteId.rpcTimeout; // overwrite pingInterval
             }
@@ -648,11 +659,7 @@ public class HBaseClient {
 
       // release the resources
       // first thing to do;take the connection out of the connection list
-      synchronized (connections) {
-        if (connections.get(remoteId) == this) {
-          connections.remove(remoteId);
-        }
-      }
+      connections.remove(remoteId, this);
 
       // close the streams and therefore the socket
       IOUtils.closeStream(out);
@@ -759,7 +766,7 @@ public class HBaseClient {
     this.conf = conf;
     this.socketFactory = factory;
     this.connectionTimeOutMillSec =
-	conf.getInt("hbase.client.connection.timeout.millsec", 5000);
+      conf.getInt("hbase.client.connection.timeout.millsec", 5000);
   }
 
   /**
@@ -956,12 +963,13 @@ public class HBaseClient {
     ConnectionId remoteId = new ConnectionId(addr, ticket, rpcTimeout,
         call.getVersion());
     do {
-      synchronized (connections) {
+      connection = connections.get(remoteId);
+      if (connection == null) {
+        // Do not worry about creating a new Connection object if
+        // the hash map was already updated. The unused connection
+        // will be automatically closed after a 10 sec timeout (maxIdleTime).
+        connections.putIfAbsent(remoteId, new Connection(remoteId));
         connection = connections.get(remoteId);
-        if (connection == null) {
-          connection = new Connection(remoteId);
-          connections.put(remoteId, connection);
-        }
       }
     } while (!connection.addCall(call));