You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2014/10/29 06:48:08 UTC

[1/2] Implement Preemptive Fast Fail

Repository: hbase
Updated Branches:
  refs/heads/master 95282f2ea -> ece933fa3


http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
new file mode 100644
index 0000000..a9f5b27
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestFastFailWithoutTestUtil.java
@@ -0,0 +1,613 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.ipc.RemoteException;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestFastFailWithoutTestUtil {
+  private static final Log LOG = LogFactory.getLog(TestFastFailWithoutTestUtil.class);
+
+  @Test
+  public void testInterceptorFactoryMethods() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory(
+        conf);
+
+    RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory
+        .build();
+    assertTrue("We should be getting a PreemptiveFastFailInterceptor",
+        interceptorBeforeCast instanceof PreemptiveFastFailInterceptor);
+    PreemptiveFastFailInterceptor interceptor = (PreemptiveFastFailInterceptor) interceptorBeforeCast;
+
+    RetryingCallerInterceptorContext contextBeforeCast = interceptor
+        .createEmptyContext();
+    assertTrue(
+        "We should be getting a FastFailInterceptorContext since we are interacting with the"
+            + " PreemptiveFastFailInterceptor",
+        contextBeforeCast instanceof FastFailInterceptorContext);
+
+    FastFailInterceptorContext context = (FastFailInterceptorContext) contextBeforeCast;
+    assertTrue(context != null);
+
+    conf = HBaseConfiguration.create();
+    interceptorFactory = new RetryingCallerInterceptorFactory(conf);
+
+    interceptorBeforeCast = interceptorFactory.build();
+    assertTrue(
+        "We should be getting a NoOpRetryableCallerInterceptor since we disabled PFFE",
+        interceptorBeforeCast instanceof NoOpRetryableCallerInterceptor);
+
+    contextBeforeCast = interceptorBeforeCast.createEmptyContext();
+    assertTrue(
+        "We should be getting a NoOpRetryingInterceptorContext from NoOpRetryableCallerInterceptor",
+        contextBeforeCast instanceof NoOpRetryingInterceptorContext);
+
+    assertTrue(context != null);
+  }
+
+  @Test
+  public void testInterceptorContextClear() {
+    PreemptiveFastFailInterceptor interceptor = createPreemptiveInterceptor();
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+    context.clear();
+    assertFalse(context.getCouldNotCommunicateWithServer().booleanValue());
+    assertEquals(context.didTry(), false);
+    assertEquals(context.getFailureInfo(), null);
+    assertEquals(context.getServer(), null);
+    assertEquals(context.getTries(), 0);
+  }
+
+  @Test
+  public void testInterceptorContextPrepare() throws IOException {
+    PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+        .createPreemptiveInterceptor();
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+    RetryingCallable<?> callable = new RegionServerCallable<Boolean>(null,
+        null, null) {
+      @Override
+      public Boolean call(int callTimeout) throws Exception {
+        return true;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, ServerName.valueOf("localhost", 1234,
+            987654321));
+      }
+    };
+    context.prepare(callable);
+    ServerName server = getSomeServerName();
+    assertEquals(context.getServer(), server);
+    context.clear();
+    context.prepare(callable, 2);
+    assertEquals(context.getServer(), server);
+  }
+
+  @Test
+  public void testInterceptorIntercept50Times() throws IOException,
+      InterruptedException {
+    for (int i = 0; i < 50; i++) {
+      testInterceptorIntercept();
+    }
+  }
+
+  public void testInterceptorIntercept() throws IOException,
+      InterruptedException {
+    Configuration conf = HBaseConfiguration.create();
+    long CLEANUP_TIMEOUT = 50;
+    long FAST_FAIL_THRESHOLD = 10;
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+
+    PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+        .createPreemptiveInterceptor(conf);
+    FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+        .createEmptyContext();
+
+    RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
+
+    // Lets simulate some work flow here.
+    int tries = 0;
+    context.prepare(callable, tries);
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("Interceptor should have updated didTry to true",
+        context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNull(
+        "Once a failure is identified, the first time the FailureInfo is generated for the server,"
+            + " but it is not assigned to the context yet. It would be assigned on the next"
+            + " intercept.", context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertFalse(
+        "We are still in the first attempt and so we dont set this variable to true yet.",
+        context.isRetryDespiteFastFailMode());
+
+    Thread.sleep(FAST_FAIL_THRESHOLD + 1); // We sleep so as to make sure that
+                                           // we
+    // actually consider this server as a
+    // dead server in the next attempt.
+    tries++;
+
+    context.prepare(callable, tries);
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("didTru should remain true", context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNotNull(
+        "The context this time is updated with a failureInfo, since we already gave it a try.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertTrue(
+        "Since we are alone here we would be given the permission to retryDespiteFailures.",
+        context.isRetryDespiteFastFailMode());
+    context.clear();
+
+    Thread.sleep(CLEANUP_TIMEOUT); // Lets try and cleanup the data in the fast
+                                   // fail failure maps.
+
+    tries++;
+
+    context.clear();
+    context.prepare(callable, tries);
+    interceptor.occasionallyCleanupFailureInformation();
+    assertNull("The cleanup should have cleared the server",
+        interceptor.repeatedFailuresMap.get(context.getServer()));
+    interceptor.intercept(context);
+    interceptor.handleFailure(context, new ConnectException(
+        "Failed to connect to server"));
+    interceptor.updateFailureInfo(context);
+    assertTrue("didTru should remain true", context.didTry());
+    assertTrue(
+        "The call shouldn't have been successful if there was a ConnectException",
+        context.getCouldNotCommunicateWithServer().booleanValue());
+    assertNull("The failureInfo is cleared off from the maps.",
+        context.getFailureInfo());
+    assertEquals(context.getTries(), tries);
+    assertFalse(
+        "Since we are alone here we would be given the permission to retryDespiteFailures.",
+        context.isRetryDespiteFastFailMode());
+    context.clear();
+
+  }
+
+  private <T> RetryingCallable<T> getDummyRetryingCallable(
+      ServerName someServerName) {
+    return new RegionServerCallable<T>(null, null, null) {
+      @Override
+      public T call(int callTimeout) throws Exception {
+        return null;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, serverName);
+      }
+    };
+  }
+
+  @Test
+  public void testExceptionsIdentifiedByInterceptor() throws IOException {
+    Throwable[] networkexceptions = new Throwable[] {
+        new ConnectException("Mary is unwell"),
+        new SocketTimeoutException("Mike is too late"),
+        new ClosedChannelException(),
+        new SyncFailedException("Dave is not on the same page"),
+        new TimeoutException("Mike is late again"),
+        new EOFException("This is the end... "),
+        new ConnectionClosingException("Its closing") };
+    final String INDUCED = "Induced";
+    Throwable[] nonNetworkExceptions = new Throwable[] {
+        new IOException("Bob died"),
+        new RemoteException("Bob's cousin died", null),
+        new NoSuchMethodError(INDUCED), new NullPointerException(INDUCED),
+        new DoNotRetryIOException(INDUCED), new Error(INDUCED) };
+
+    Configuration conf = HBaseConfiguration.create();
+    long CLEANUP_TIMEOUT = 0;
+    long FAST_FAIL_THRESHOLD = 1000000;
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+    for (Throwable e : networkexceptions) {
+      PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+          .createPreemptiveInterceptor(conf);
+      FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+          .createEmptyContext();
+
+      RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
+      context.prepare(callable, 0);
+      interceptor.intercept(context);
+      interceptor.handleFailure(context, e);
+      interceptor.updateFailureInfo(context);
+      assertTrue(
+          "The call shouldn't have been successful if there was a ConnectException",
+          context.getCouldNotCommunicateWithServer().booleanValue());
+    }
+    for (Throwable e : nonNetworkExceptions) {
+      try {
+        PreemptiveFastFailInterceptor interceptor = TestFastFailWithoutTestUtil
+            .createPreemptiveInterceptor(conf);
+        FastFailInterceptorContext context = (FastFailInterceptorContext) interceptor
+            .createEmptyContext();
+
+        RetryingCallable<?> callable = getDummyRetryingCallable(getSomeServerName());
+        context.prepare(callable, 0);
+        interceptor.intercept(context);
+        interceptor.handleFailure(context, e);
+        interceptor.updateFailureInfo(context);
+        assertFalse(
+            "The call shouldn't have been successful if there was a ConnectException",
+            context.getCouldNotCommunicateWithServer().booleanValue());
+      } catch (NoSuchMethodError t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      } catch (NullPointerException t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      } catch (DoNotRetryIOException t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      } catch (Error t) {
+        assertTrue("Exception not induced", t.getMessage().contains(INDUCED));
+      }
+    }
+  }
+
+  protected static PreemptiveFastFailInterceptor createPreemptiveInterceptor(
+      Configuration conf) {
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    RetryingCallerInterceptorFactory interceptorFactory = new RetryingCallerInterceptorFactory(
+        conf);
+    RetryingCallerInterceptor interceptorBeforeCast = interceptorFactory
+        .build();
+    return (PreemptiveFastFailInterceptor) interceptorBeforeCast;
+  }
+
+  static PreemptiveFastFailInterceptor createPreemptiveInterceptor() {
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    return createPreemptiveInterceptor(conf);
+  }
+
+  @Test(timeout = 120000)
+  public void testPreemptiveFastFailException50Times()
+      throws InterruptedException, ExecutionException {
+    for (int i = 0; i < 50; i++) {
+      testPreemptiveFastFailException();
+    }
+  }
+
+  /***
+   * This test tries to create a thread interleaving of the 2 threads trying to do a 
+   * Retrying operation using a {@link PreemptiveFastFailInterceptor}. The goal here is to make sure
+   * that the second thread will be attempting the operation while the first thread is in the
+   * process of making an attempt after it has marked the server in fast fail. 
+   * 
+   * The thread execution is as follows :
+   * The PreemptiveFastFailInterceptor is extended in this test to achieve a good interleaving
+   * behavior without using any thread sleeps.
+   * 
+   *              Privileged Thread 1                         NonPrivileged Thread 2
+   *                                              
+   *  Retry 0 :   intercept               
+   *                                              
+   *  Retry 0 :   handleFailure
+   *                      latches[0].countdown
+   *                      latches2[0].await
+   *                                                                          latches[0].await
+   *                                                    intercept                 : Retry 0
+   * 
+   *                                                    handleFailure             : Retry 0
+   * 
+   *                                                    updateFailureinfo         : Retry 0
+   *                                                                          latches2[0].countdown
+   *                                                                          
+   *  Retry 0 :   updateFailureInfo
+   *  
+   *  Retry 1 : intercept
+   *  
+   *  Retry 1 :   handleFailure
+   *                      latches[1].countdown
+   *                      latches2[1].await
+   *     
+   *                                                                          latches[1].await
+   *                                                    intercept                 : Retry 1
+   *                                                        (throws PFFE)
+   *                                                    handleFailure             : Retry 1
+   *                                       
+   *                                                    updateFailureinfo         : Retry 1
+   *                                                                          latches2[1].countdown
+   *  Retry 1 :   updateFailureInfo
+   *  
+   * 
+   *  See getInterceptor() for more details on the interceptor implementation to make sure this
+   *  thread interleaving is achieved.
+   *  
+   *  We need 2 sets of latches of size MAX_RETRIES. We use an AtomicInteger done to make sure that
+   *  we short circuit the Thread 1 after we hit the PFFE on Thread 2
+   *  
+   *  
+   * @throws InterruptedException
+   * @throws ExecutionException
+   */
+  private void testPreemptiveFastFailException() throws InterruptedException,
+      ExecutionException {
+    LOG.debug("Setting up the counters to start the test");
+    priviRetryCounter.set(0);
+    nonPriviRetryCounter.set(0);
+    done.set(0);
+
+    for (int i = 0; i <= RETRIES; i++) {
+      latches[i] = new CountDownLatch(1);
+      latches2[i] = new CountDownLatch(1);
+    }
+
+    PreemptiveFastFailInterceptor interceptor = getInterceptor();
+
+    final RpcRetryingCaller<Void> priviCaller = getRpcRetryingCaller(
+        PAUSE_TIME, RETRIES, interceptor);
+    final RpcRetryingCaller<Void> nonPriviCaller = getRpcRetryingCaller(
+        PAUSE_TIME, RETRIES, interceptor);
+
+    LOG.debug("Submitting the thread 1");
+    Future<Boolean> priviFuture = executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        try {
+          isPriviThreadLocal.get().set(true);
+          priviCaller
+              .callWithRetries(
+                  getRetryingCallable(serverName, exception),
+                  CLEANUP_TIMEOUT);
+        } catch (RetriesExhaustedException e) {
+          return true;
+        } catch (PreemptiveFastFailException e) {
+          return false;
+        }
+        return false;
+      }
+    });
+    LOG.debug("Submitting the thread 2");
+    Future<Boolean> nonPriviFuture = executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        try {
+          isPriviThreadLocal.get().set(false);
+          nonPriviCaller.callWithRetries(
+              getRetryingCallable(serverName, exception),
+              CLEANUP_TIMEOUT);
+        } catch (PreemptiveFastFailException e) {
+          return true;
+        }
+        return false;
+      }
+    });
+    LOG.debug("Waiting for Thread 2 to finish");
+    assertTrue(nonPriviFuture.get());
+    LOG.debug("Waiting for Thread 1 to finish");
+    assertTrue(priviFuture.get());
+
+    // Now that the server in fast fail mode. Lets try to make contact with the
+    // server with a third thread. And make sure that when there is no
+    // exception,
+    // the fast fail gets cleared up.
+    assertTrue(interceptor.isServerInFailureMap(serverName));
+    final RpcRetryingCaller<Void> priviCallerNew = getRpcRetryingCaller(
+        PAUSE_TIME, RETRIES, interceptor);
+    executor.submit(new Callable<Boolean>() {
+      @Override
+      public Boolean call() throws Exception {
+        priviCallerNew.callWithRetries(
+            getRetryingCallable(serverName, null), CLEANUP_TIMEOUT);
+        return false;
+      }
+    }).get();
+    assertFalse("The server was supposed to be removed from the map",
+        interceptor.isServerInFailureMap(serverName));
+  }
+
+  ExecutorService executor = Executors.newCachedThreadPool();
+  
+  /**
+   * Some timeouts to make the test execution resonable.
+   */
+  final int PAUSE_TIME = 10;
+  final int RETRIES = 3;
+  final int CLEANUP_TIMEOUT = 10000;
+  final long FAST_FAIL_THRESHOLD = PAUSE_TIME / 1;
+  
+  /**
+   * The latches necessary to make the thread interleaving possible.
+   */
+  final CountDownLatch[] latches = new CountDownLatch[RETRIES + 1];
+  final CountDownLatch[] latches2 = new CountDownLatch[RETRIES + 1];
+  final AtomicInteger done = new AtomicInteger(0);
+
+  /**
+   * Global retry counters that give us an idea about which iteration of the retry we are in
+   */
+  final AtomicInteger priviRetryCounter = new AtomicInteger();
+  final AtomicInteger nonPriviRetryCounter = new AtomicInteger();
+  final ServerName serverName = getSomeServerName();
+
+  /**
+   * The variable which is used as an identifier within the 2 threads.
+   */
+  public final ThreadLocal<AtomicBoolean> isPriviThreadLocal = new ThreadLocal<AtomicBoolean>() {
+    @Override
+    public AtomicBoolean initialValue() {
+      return new AtomicBoolean(true);
+    }
+  };
+  final Exception exception = new ConnectionClosingException("The current connection is closed");
+
+  public PreemptiveFastFailInterceptor getInterceptor() {
+    final Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        CLEANUP_TIMEOUT);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        FAST_FAIL_THRESHOLD);
+
+    return new PreemptiveFastFailInterceptor(
+        conf) {
+      @Override
+      public void updateFailureInfo(RetryingCallerInterceptorContext context) {
+        boolean pffe = false;
+        if (!isPriviThreadLocal.get().get()) {
+          pffe = !((FastFailInterceptorContext)context).isRetryDespiteFastFailMode();
+        }
+        if (isPriviThreadLocal.get().get()) {
+          try {
+            // Thread 2 should be done by 2 iterations. We should short circuit Thread 1 because
+            // Thread 2 would be dead and can't do a countdown.
+            if (done.get() <= 1) {
+              latches2[priviRetryCounter.get()].await();
+            }
+          } catch (InterruptedException e) {
+            fail();
+          }
+        }
+        super.updateFailureInfo(context);
+        if (!isPriviThreadLocal.get().get()) {
+          if (pffe) done.incrementAndGet();
+          latches2[nonPriviRetryCounter.get()].countDown();
+        }
+      }
+
+      @Override
+      public void intercept(RetryingCallerInterceptorContext context)
+          throws PreemptiveFastFailException {
+        if (!isPriviThreadLocal.get().get()) {
+          try {
+            latches[nonPriviRetryCounter.getAndIncrement()].await();
+          } catch (InterruptedException e) {
+            fail();
+          }
+        }
+        super.intercept(context);
+      }
+
+      @Override
+      public void handleFailure(RetryingCallerInterceptorContext context,
+          Throwable t) throws IOException {
+        super.handleFailure(context, t);
+        if (isPriviThreadLocal.get().get()) {
+          latches[priviRetryCounter.getAndIncrement()].countDown();
+        }
+      }
+    };
+  }
+
+  public RpcRetryingCaller<Void> getRpcRetryingCaller(int pauseTime,
+      int retries, RetryingCallerInterceptor interceptor) {
+    return new RpcRetryingCaller<Void>(pauseTime, retries, interceptor) {
+      @Override
+      public Void callWithRetries(RetryingCallable<Void> callable,
+          int callTimeout) throws IOException, RuntimeException {
+        Void ret = super.callWithRetries(callable, callTimeout);
+        return ret;
+      }
+    };
+  }
+
+  protected static ServerName getSomeServerName() {
+    return ServerName.valueOf("localhost", 1234, 987654321);
+  }
+
+  private RegionServerCallable<Void> getRetryingCallable(
+      final ServerName serverName, final Exception e) {
+    return new RegionServerCallable<Void>(null, null, null) {
+      @Override
+      public void prepare(boolean reload) throws IOException {
+        this.location = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
+            serverName);
+      }
+
+      @Override
+      public Void call(int callTimeout) throws Exception {
+        if (e != null)
+          throw e;
+        return null;
+      }
+
+      @Override
+      protected HRegionLocation getLocation() {
+        return new HRegionLocation(null, serverName);
+      }
+      
+      @Override
+      public void throwable(Throwable t, boolean retrying) {
+        // Do nothing
+      }
+      
+      @Override
+      public long sleep(long pause, int tries) {
+        return ConnectionUtils.getPauseTime(pause, tries + 1);
+      }
+    };
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 7befad9..d4d17c9 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1069,6 +1069,33 @@ public final class HConstants {
    */
   public static final String BUCKET_CACHE_SIZE_KEY = "hbase.bucketcache.size";
 
+  /**
+   * HConstants for fast fail on the client side follow
+   */
+  /**
+   * Config for enabling/disabling the fast fail mode.
+   */
+  public static final String HBASE_CLIENT_FAST_FAIL_MODE_ENABLED =
+      "hbase.client.fast.fail.mode.enabled";
+
+  public static final boolean HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT =
+      false;
+
+  public static final String HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS =
+      "hbase.client.fastfail.threshold";
+  
+  public static final long HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT =
+      60000;
+
+  public static final String HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS =
+      "hbase.client.fast.fail.cleanup.duration";
+
+  public static final long HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT =
+      600000;
+
+  public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL =
+      "hbase.client.fast.fail.interceptor.impl"; 
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index aeb8646..f3bf9c4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -131,6 +131,9 @@ public class HConnectionTestingUtility {
           RpcControllerFactory.instantiate(conf)));
     Mockito.doNothing().when(c).incCount();
     Mockito.doNothing().when(c).decCount();
+    Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
+        RpcRetryingCallerFactory.instantiate(conf,
+            RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR));
     return c;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
new file mode 100644
index 0000000..709e94b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFastFail.java
@@ -0,0 +1,313 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterStatus;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.test.LoadTestKVGenerator;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MediumTests.class, ClientTests.class})
+public class TestFastFail {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static final Random random = new Random();
+  private static int SLAVES = 3;
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static final int SLEEPTIME = 1000;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(SLAVES);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  @Test
+  public void testFastFail() throws IOException, InterruptedException {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+
+    final String tableName = "testClientRelearningExperiment";
+    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(Bytes
+        .toBytes(tableName)));
+    desc.addFamily(new HColumnDescriptor(FAMILY));
+    admin.createTable(desc, Bytes.toBytes("aaaa"), Bytes.toBytes("zzzz"), 32);
+    final long numRows = 1000;
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setLong(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, SLEEPTIME * 100);
+    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, SLEEPTIME / 10);
+    conf.setBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED, true);
+    conf.setLong(HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS, 0);
+    conf.setClass(HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
+        MyPreemptiveFastFailInterceptor.class,
+        PreemptiveFastFailInterceptor.class);
+
+    final Connection connection = ConnectionFactory.createConnection(conf);
+
+    /**
+     * Write numRows worth of data, so that the workers can arbitrarily read.
+     */
+    try (Table table = connection.getTable(TableName.valueOf(tableName));) {
+      writeData(table, numRows);
+    }
+
+    /**
+     * The number of threads that are going to perform actions against the test
+     * table.
+     */
+    int nThreads = 200;
+    ExecutorService service = Executors.newFixedThreadPool(nThreads);
+    final CountDownLatch continueOtherHalf = new CountDownLatch(1);
+    final CountDownLatch doneHalfway = new CountDownLatch(nThreads);
+
+    final AtomicInteger numSuccessfullThreads = new AtomicInteger(0);
+    final AtomicInteger numFailedThreads = new AtomicInteger(0);
+
+    // The total time taken for the threads to perform the second put;
+    final AtomicLong totalTimeTaken = new AtomicLong(0);
+    final AtomicInteger numBlockedWorkers = new AtomicInteger(0);
+    final AtomicInteger numPreemptiveFastFailExceptions = new AtomicInteger(0);
+
+    List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+    for (int i = 0; i < nThreads; i++) {
+      futures.add(service.submit(new Callable<Boolean>() {
+        /**
+         * The workers are going to perform a couple of reads. The second read
+         * will follow the killing of a regionserver so that we make sure that
+         * some of threads go into PreemptiveFastFailExcception
+         */
+        public Boolean call() throws Exception {
+          try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+            Thread.sleep(Math.abs(random.nextInt()) % 100); // Add some jitter
+                                                            // here
+            byte[] row = longToByteArrayKey(Math.abs(random.nextLong())
+                % numRows);
+            Get g = new Get(row);
+            g.addColumn(FAMILY, QUALIFIER);
+            try {
+              table.get(g);
+            } catch (Exception e) {
+              LOG.debug("Get failed : ", e);
+              doneHalfway.countDown();
+              return false;
+            }
+
+            // Done with one get, proceeding to do the next one.
+            doneHalfway.countDown();
+            continueOtherHalf.await();
+
+            long startTime = System.currentTimeMillis();
+            g = new Get(row);
+            g.addColumn(FAMILY, QUALIFIER);
+            try {
+              table.get(g);
+              // The get was successful
+              numSuccessfullThreads.addAndGet(1);
+            } catch (Exception e) {
+              if (e instanceof PreemptiveFastFailException) {
+                // We were issued a PreemptiveFastFailException
+                numPreemptiveFastFailExceptions.addAndGet(1);
+              }
+              // Irrespective of PFFE, the request failed.
+              numFailedThreads.addAndGet(1);
+              return false;
+            } finally {
+              long enTime = System.currentTimeMillis();
+              totalTimeTaken.addAndGet(enTime - startTime);
+              if ((enTime - startTime) >= SLEEPTIME) {
+                // Considering the slow workers as the blockedWorkers.
+                // This assumes that the threads go full throttle at performing
+                // actions. In case the thread scheduling itself is as slow as
+                // SLEEPTIME, then this test might fail and so, we might have
+                // set it to a higher number on slower machines.
+                numBlockedWorkers.addAndGet(1);
+              }
+            }
+            return true;
+          } catch (Exception e) {
+            LOG.error("Caught unknown exception", e);
+            doneHalfway.countDown();
+            return false;
+          }
+        }
+      }));
+    }
+
+    doneHalfway.await();
+
+    ClusterStatus status = TEST_UTIL.getHBaseCluster().getClusterStatus();
+
+    // Kill a regionserver
+    TEST_UTIL.getHBaseCluster().getRegionServer(0).getRpcServer().stop();
+    TEST_UTIL.getHBaseCluster().getRegionServer(0).stop("Testing");
+
+    // Let the threads continue going
+    continueOtherHalf.countDown();
+
+    Thread.sleep(2 * SLEEPTIME);
+    // Restore the cluster
+    TEST_UTIL.getHBaseCluster().restoreClusterStatus(status);
+
+    int numThreadsReturnedFalse = 0;
+    int numThreadsReturnedTrue = 0;
+    int numThreadsThrewExceptions = 0;
+    for (Future<Boolean> f : futures) {
+      try {
+        numThreadsReturnedTrue += f.get() ? 1 : 0;
+        numThreadsReturnedFalse += f.get() ? 0 : 1;
+      } catch (Exception e) {
+        numThreadsThrewExceptions++;
+      }
+    }
+    LOG.debug("numThreadsReturnedFalse:"
+        + numThreadsReturnedFalse
+        + " numThreadsReturnedTrue:"
+        + numThreadsReturnedTrue
+        + " numThreadsThrewExceptions:"
+        + numThreadsThrewExceptions
+        + " numFailedThreads:"
+        + numFailedThreads.get()
+        + " numSuccessfullThreads:"
+        + numSuccessfullThreads.get()
+        + " numBlockedWorkers:"
+        + numBlockedWorkers.get()
+        + " totalTimeWaited: "
+        + totalTimeTaken.get()
+        / (numBlockedWorkers.get() == 0 ? Long.MAX_VALUE : numBlockedWorkers
+            .get()) + " numPFFEs: " + numPreemptiveFastFailExceptions.get());
+
+    assertEquals("The expected number of all the successfull and the failed "
+        + "threads should equal the total number of threads that we spawned",
+        nThreads, numFailedThreads.get() + numSuccessfullThreads.get());
+    assertEquals(
+        "All the failures should be coming from the secondput failure",
+        numFailedThreads.get(), numThreadsReturnedFalse);
+    assertEquals("Number of threads that threw execution exceptions "
+        + "otherwise should be 0", numThreadsThrewExceptions, 0);
+    assertEquals("The regionservers that returned true should equal to the"
+        + " number of successful threads", numThreadsReturnedTrue,
+        numSuccessfullThreads.get());
+    assertTrue(
+        "There should be atleast one thread that retried instead of failing",
+        MyPreemptiveFastFailInterceptor.numBraveSouls.get() > 0);
+    assertTrue(
+        "There should be atleast one PreemptiveFastFail exception,"
+            + " otherwise, the test makes little sense."
+            + "numPreemptiveFastFailExceptions: "
+            + numPreemptiveFastFailExceptions.get(),
+        numPreemptiveFastFailExceptions.get() > 0);
+    assertTrue(
+        "Only few thread should ideally be waiting for the dead "
+            + "regionserver to be coming back. numBlockedWorkers:"
+            + numBlockedWorkers.get() + " threads that retried : "
+            + MyPreemptiveFastFailInterceptor.numBraveSouls.get(),
+        numBlockedWorkers.get() <= MyPreemptiveFastFailInterceptor.numBraveSouls
+            .get());
+  }
+
+  public static class MyPreemptiveFastFailInterceptor extends
+      PreemptiveFastFailInterceptor {
+    public static AtomicInteger numBraveSouls = new AtomicInteger();
+
+    @Override
+    protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+      boolean ret = super.shouldRetryInspiteOfFastFail(fInfo);
+      if (ret)
+        numBraveSouls.addAndGet(1);
+      return ret;
+    }
+
+    public MyPreemptiveFastFailInterceptor(Configuration conf) {
+      super(conf);
+    }
+  }
+
+  private byte[] longToByteArrayKey(long rowKey) {
+    return LoadTestKVGenerator.md5PrefixedKey(rowKey).getBytes();
+  }
+
+  public void writeData(Table table, long numRows) throws IOException,
+      InterruptedException {
+    table.flushCommits();
+    for (long i = 0; i < numRows; i++) {
+      byte[] rowKey = longToByteArrayKey(i);
+      Put put = new Put(rowKey);
+      byte[] value = rowKey; // value is the same as the row key
+      put.add(FAMILY, QUALIFIER, value);
+      table.put(put);
+    }
+    LOG.info("Written all puts.");
+  }
+}


[2/2] git commit: Implement Preemptive Fast Fail

Posted by st...@apache.org.
Implement Preemptive Fast Fail

Summary: This diff ports the Preemptive Fast Fail feature to OSS. In multi threaded clients, we use a feature developed on 0.89-fb branch called Preemptive Fast Fail. This allows the client threads which would potentially fail, fail fast. The idea behind this feature is that we allow, among the hundreds of client threads, one thread to try and establish connection with the regionserver and if that succeeds, we mark it as a live node again. Meanwhile, other threads which are trying to establish connection to the same server would ideally go into the timeouts which is effectively unfruitful. We can in those cases return appropriate exceptions to those clients instead of letting them retry.

Test Plan: Unit tests

Differential Revision: https://reviews.facebook.net/D24177

Signed-off-by: stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ece933fa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ece933fa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ece933fa

Branch: refs/heads/master
Commit: ece933fa3e272531ee443265c7aef7326e89e7cd
Parents: 95282f2
Author: manukranthk <ma...@fb.com>
Authored: Tue Sep 23 19:15:09 2014 -0700
Committer: stack <st...@apache.org>
Committed: Tue Oct 28 22:47:50 2014 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClusterConnection.java  |  11 +
 .../hadoop/hbase/client/ConnectionAdapter.java  |   5 +
 .../hadoop/hbase/client/ConnectionManager.java  |  10 +-
 .../apache/hadoop/hbase/client/FailureInfo.java |  61 ++
 .../client/FastFailInterceptorContext.java      | 123 ++++
 .../org/apache/hadoop/hbase/client/HTable.java  |   2 +-
 .../client/NoOpRetryableCallerInterceptor.java  |  68 ++
 .../client/NoOpRetryingInterceptorContext.java  |  44 ++
 .../client/PreemptiveFastFailInterceptor.java   | 405 ++++++++++++
 .../hbase/client/RetryingCallerInterceptor.java |  98 +++
 .../RetryingCallerInterceptorContext.java       |  69 +++
 .../RetryingCallerInterceptorFactory.java       |  81 +++
 .../hadoop/hbase/client/RpcRetryingCaller.java  |  21 +-
 .../hbase/client/RpcRetryingCallerFactory.java  |  15 +-
 .../exceptions/ConnectionClosingException.java  |  59 ++
 .../exceptions/PreemptiveFastFailException.java |  70 +++
 .../org/apache/hadoop/hbase/ipc/RpcClient.java  |  17 +-
 .../client/TestFastFailWithoutTestUtil.java     | 613 +++++++++++++++++++
 .../org/apache/hadoop/hbase/HConstants.java     |  27 +
 .../hbase/client/HConnectionTestingUtility.java |   3 +
 .../hadoop/hbase/client/TestFastFail.java       | 313 ++++++++++
 21 files changed, 2102 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 528e7d0..398ecad 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 import java.util.List;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.RegionLocations;
@@ -270,4 +271,14 @@ public interface ClusterConnection extends HConnection {
    * @return Default AsyncProcess associated with this connection.
    */
   AsyncProcess getAsyncProcess();
+
+  /**
+   * Returns a new RpcRetryingCallerFactory from the given {@link Configuration}.
+   * This RpcRetryingCallerFactory lets the users create {@link RpcRetryingCaller}s which can be
+   * intercepted with the configured {@link RetryingCallerInterceptor}
+   * @param conf
+   * @return RpcRetryingCallerFactory
+   */
+  RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf);
 }
+

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
index dc2dc29..927b56c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionAdapter.java
@@ -440,4 +440,9 @@ class ConnectionAdapter implements ClusterConnection {
   public AsyncProcess getAsyncProcess() {
     return wrappedConnection.getAsyncProcess();
   }
+
+  @Override
+  public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
+    return wrappedConnection.getNewRpcRetryingCallerFactory(conf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 45c32f0..b3a6295 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -583,6 +583,8 @@ class ConnectionManager {
 
     private RpcControllerFactory rpcControllerFactory;
 
+    private final RetryingCallerInterceptor interceptor;
+
     /**
      * Cluster registry of basic info such as clusterid and meta region location.
      */
@@ -613,7 +615,6 @@ class ConnectionManager {
       retrieveClusterId();
 
       this.rpcClient = new RpcClient(this.conf, this.clusterId);
-      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
       this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
 
       // Do we publish the status?
@@ -664,6 +665,8 @@ class ConnectionManager {
         this.nonceGenerator = new NoNonceGenerator();
       }
       this.asyncProcess = createAsyncProcess(this.conf);
+      this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
+      this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor);
     }
 
     @Override
@@ -2509,6 +2512,11 @@ class ConnectionManager {
         master.close();
       }
     }
+
+    @Override
+    public RpcRetryingCallerFactory getNewRpcRetryingCallerFactory(Configuration conf) {
+      return RpcRetryingCallerFactory.instantiate(conf, this.interceptor);
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java
new file mode 100644
index 0000000..9d685b8
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FailureInfo.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Keeps track of repeated failures to any region server. Multiple threads manipulate the contents
+ * of this thread.
+ *
+ * Access to the members is guarded by the concurrent nature of the members inherently.
+ * 
+ */
+@InterfaceAudience.Private
+class FailureInfo {
+  // The number of consecutive failures.
+  public final AtomicLong numConsecutiveFailures = new AtomicLong();
+  // The time when the server started to become unresponsive
+  // Once set, this would never be updated.
+  public final long timeOfFirstFailureMilliSec;
+  // The time when the client last tried to contact the server.
+  // This is only updated by one client at a time
+  public volatile long timeOfLatestAttemptMilliSec;
+  // Used to keep track of concurrent attempts to contact the server.
+  // In Fast fail mode, we want just one client thread to try to connect
+  // the rest of the client threads will fail fast.
+  public final AtomicBoolean exclusivelyRetringInspiteOfFastFail = new AtomicBoolean(
+      false);
+
+  @Override
+  public String toString() {
+    return "FailureInfo: numConsecutiveFailures = "
+        + numConsecutiveFailures + " timeOfFirstFailureMilliSec = "
+        + timeOfFirstFailureMilliSec + " timeOfLatestAttemptMilliSec = "
+        + timeOfLatestAttemptMilliSec
+        + " exclusivelyRetringInspiteOfFastFail  = "
+        + exclusivelyRetringInspiteOfFastFail.get();
+  }
+
+  FailureInfo(long firstFailureTime) {
+    this.timeOfFirstFailureMilliSec = firstFailureTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
new file mode 100644
index 0000000..9eb56bc
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/FastFailInterceptorContext.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class FastFailInterceptorContext extends
+    RetryingCallerInterceptorContext {
+
+  // The variable that indicates whether we were able to connect with the server
+  // in the last run
+  private MutableBoolean couldNotCommunicateWithServer = new MutableBoolean(
+      false);
+
+  // The variable which indicates whether this was a retry or the first time
+  private boolean didTry = false;
+
+  // The failure info that is associated with the machine which we are trying to
+  // contact as part of this attempt.
+  private FailureInfo fInfo = null;
+
+  // Variable indicating that the thread that is currently executing the
+  // operation is in a mode where it would retry instead of failing fast, so
+  // that we can figure out whether making contact with the server is
+  // possible or not.
+  private boolean retryDespiteFastFailMode = false;
+
+  // The server that would be contacted to successfully complete this operation.
+  private ServerName server;
+
+  // The number of the retry we are currenty doing.
+  private int tries;
+
+  public MutableBoolean getCouldNotCommunicateWithServer() {
+    return couldNotCommunicateWithServer;
+  }
+
+  public FailureInfo getFailureInfo() {
+    return fInfo;
+  }
+
+  public ServerName getServer() {
+    return server;
+  }
+
+  public int getTries() {
+    return tries;
+  }
+
+  public boolean didTry() {
+    return didTry;
+  }
+
+  public boolean isRetryDespiteFastFailMode() {
+    return retryDespiteFastFailMode;
+  }
+
+  public void setCouldNotCommunicateWithServer(
+      MutableBoolean couldNotCommunicateWithServer) {
+    this.couldNotCommunicateWithServer = couldNotCommunicateWithServer;
+  }
+
+  public void setDidTry(boolean didTry) {
+    this.didTry = didTry;
+  }
+
+  public void setFailureInfo(FailureInfo fInfo) {
+    this.fInfo = fInfo;
+  }
+
+  public void setRetryDespiteFastFailMode(boolean retryDespiteFastFailMode) {
+    this.retryDespiteFastFailMode = retryDespiteFastFailMode;
+  }
+
+  public void setServer(ServerName server) {
+    this.server = server;
+  }
+
+  public void setTries(int tries) {
+    this.tries = tries;
+  }
+
+  public void clear() {
+    server = null;
+    fInfo = null;
+    didTry = false;
+    couldNotCommunicateWithServer.setValue(false);
+    retryDespiteFastFailMode = false;
+    tries = 0;
+  }
+
+  public FastFailInterceptorContext prepare(RetryingCallable<?> callable) {
+    return prepare(callable, 0);
+  }
+
+  public FastFailInterceptorContext prepare(RetryingCallable<?> callable,
+      int tries) {
+    if (callable instanceof RegionServerCallable) {
+      RegionServerCallable<?> retryingCallable = (RegionServerCallable<?>) callable;
+      server = retryingCallable.getLocation().getServerName();
+    }
+    this.tries = tries;
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index cd4376d..c3a94e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -358,7 +358,7 @@ public class HTable implements HTableInterface, RegionLocator {
     this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 
-    this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
+    this.rpcCallerFactory = connection.getNewRpcRetryingCallerFactory(configuration);
     this.rpcControllerFactory = RpcControllerFactory.instantiate(configuration);
     // puts need to track errors globally due to how the APIs currently work.
     ap = new AsyncProcess(connection, configuration, pool, rpcCallerFactory, true, rpcControllerFactory);

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java
new file mode 100644
index 0000000..f3f9168
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryableCallerInterceptor.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+
+/**
+ * Class that acts as a NoOpInterceptor. This class is used in case the
+ * {@link RetryingCallerInterceptor} was not configured correctly or an
+ * {@link RetryingCallerInterceptor} was never configured in the first place.
+ * 
+ */
+@InterfaceAudience.Private
+class NoOpRetryableCallerInterceptor extends RetryingCallerInterceptor {
+  private static final RetryingCallerInterceptorContext NO_OP_CONTEXT =
+      new NoOpRetryingInterceptorContext();
+
+  public NoOpRetryableCallerInterceptor() {
+  }
+
+  public NoOpRetryableCallerInterceptor(Configuration conf) {
+    super();
+  }
+
+  @Override
+  public void intercept(
+      RetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext)
+      throws PreemptiveFastFailException {
+  }
+
+  @Override
+  public void handleFailure(RetryingCallerInterceptorContext context,
+      Throwable t) throws IOException {
+  }
+
+  @Override
+  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext createEmptyContext() {
+    return NO_OP_CONTEXT;
+  }
+
+  @Override
+  public String toString() {
+    return "NoOpRetryableCallerInterceptor";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
new file mode 100644
index 0000000..1ccf43c
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoOpRetryingInterceptorContext.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+class NoOpRetryingInterceptorContext extends RetryingCallerInterceptorContext {
+
+  @Override
+  public void clear() {
+    // Do Nothing
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable) {
+    // Do Nothing
+    return this;
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable, int tries) {
+    // Do Nothing
+    return this;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
new file mode 100644
index 0000000..4256120
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/PreemptiveFastFailInterceptor.java
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.SyncFailedException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.channels.ClosedChannelException;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.mutable.MutableBoolean;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
+import org.apache.hadoop.hbase.ipc.RpcClient.FailedServerException;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * 
+ * The concrete {@link RetryingCallerInterceptor} class that implements the preemptive fast fail
+ * feature.
+ * 
+ * The motivation is as follows : 
+ * In case where a large number of clients try and talk to a particular region server in hbase, if
+ * the region server goes down due to network problems, we might end up in a scenario where
+ * the clients would go into a state where they all start to retry.
+ * This behavior will set off many of the threads in pretty much the same path and they all would be
+ * sleeping giving rise to a state where the client either needs to create more threads to send new
+ * requests to other hbase machines or block because the client cannot create anymore threads.
+ * 
+ * In most cases the clients might prefer to have a bound on the number of threads that are created
+ * in order to send requests to hbase. This would mostly result in the client thread starvation.
+ * 
+ *  To circumvent this problem, the approach that is being taken here under is to let 1 of the many
+ *  threads who are trying to contact the regionserver with connection problems and let the other
+ *  threads get a {@link PreemptiveFastFailException} so that they can move on and take other
+ *  requests.
+ *  
+ *  This would give the client more flexibility on the kind of action he would want to take in cases
+ *  where the regionserver is down. He can either discard the requests and send a nack upstream
+ *  faster or have an application level retry or buffer the requests up so as to send them down to
+ *  hbase later.
+ *
+ */
+@InterfaceAudience.Private
+class PreemptiveFastFailInterceptor extends RetryingCallerInterceptor {
+
+  public static final Log LOG = LogFactory
+      .getLog(PreemptiveFastFailInterceptor.class);
+
+  // amount of time to wait before we consider a server to be in fast fail
+  // mode
+  protected final long fastFailThresholdMilliSec;
+
+  // Keeps track of failures when we cannot talk to a server. Helps in
+  // fast failing clients if the server is down for a long time.
+  protected final ConcurrentMap<ServerName, FailureInfo> repeatedFailuresMap =
+      new ConcurrentHashMap<ServerName, FailureInfo>();
+
+  // We populate repeatedFailuresMap every time there is a failure. So, to
+  // keep it from growing unbounded, we garbage collect the failure information
+  // every cleanupInterval.
+  protected final long failureMapCleanupIntervalMilliSec;
+
+  protected volatile long lastFailureMapCleanupTimeMilliSec;
+
+  // clear failure Info. Used to clean out all entries.
+  // A safety valve, in case the client does not exit the
+  // fast fail mode for any reason.
+  private long fastFailClearingTimeMilliSec;
+
+  private final ThreadLocal<MutableBoolean> threadRetryingInFastFailMode =
+      new ThreadLocal<MutableBoolean>();
+
+  public PreemptiveFastFailInterceptor(Configuration conf) {
+    this.fastFailThresholdMilliSec = conf.getLong(
+        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS,
+        HConstants.HBASE_CLIENT_FAST_FAIL_THREASHOLD_MS_DEFAULT);
+    this.failureMapCleanupIntervalMilliSec = conf.getLong(
+        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_MS_DURATION_MS,
+        HConstants.HBASE_CLIENT_FAST_FAIL_CLEANUP_DURATION_MS_DEFAULT);
+    lastFailureMapCleanupTimeMilliSec = EnvironmentEdgeManager.currentTime();
+  }
+
+  public void intercept(FastFailInterceptorContext context)
+      throws PreemptiveFastFailException {
+    context.setFailureInfo(repeatedFailuresMap.get(context.getServer()));
+    if (inFastFailMode(context.getServer()) && !currentThreadInFastFailMode()) {
+      // In Fast-fail mode, all but one thread will fast fail. Check
+      // if we are that one chosen thread.
+      context.setRetryDespiteFastFailMode(shouldRetryInspiteOfFastFail(context
+          .getFailureInfo()));
+      if (!context.isRetryDespiteFastFailMode()) { // we don't have to retry
+        LOG.debug("Throwing PFFE : " + context.getFailureInfo() + " tries : "
+            + context.getTries());
+        throw new PreemptiveFastFailException(
+            context.getFailureInfo().numConsecutiveFailures.get(),
+            context.getFailureInfo().timeOfFirstFailureMilliSec,
+            context.getFailureInfo().timeOfLatestAttemptMilliSec, context.getServer());
+      }
+    }
+    context.setDidTry(true);
+  }
+
+  public void handleFailure(FastFailInterceptorContext context,
+      Throwable t) throws IOException {
+    handleThrowable(t, context.getServer(),
+        context.getCouldNotCommunicateWithServer());
+  }
+
+  public void updateFailureInfo(FastFailInterceptorContext context) {
+    updateFailureInfoForServer(context.getServer(), context.getFailureInfo(),
+        context.didTry(), context.getCouldNotCommunicateWithServer()
+            .booleanValue(), context.isRetryDespiteFastFailMode());
+  }
+
+  /**
+   * Handles failures encountered when communicating with a server.
+   *
+   * Updates the FailureInfo in repeatedFailuresMap to reflect the failure.
+   * Throws RepeatedConnectException if the client is in Fast fail mode.
+   *
+   * @param serverName
+   * @param t
+   *          - the throwable to be handled.
+   * @throws PreemptiveFastFailException
+   */
+  private void handleFailureToServer(ServerName serverName, Throwable t) {
+    if (serverName == null || t == null) {
+      return;
+    }
+    long currentTime = EnvironmentEdgeManager.currentTime();
+    FailureInfo fInfo = repeatedFailuresMap.get(serverName);
+    if (fInfo == null) {
+      fInfo = new FailureInfo(currentTime);
+      FailureInfo oldfInfo = repeatedFailuresMap.putIfAbsent(serverName, fInfo);
+
+      if (oldfInfo != null) {
+        fInfo = oldfInfo;
+      }
+    }
+    fInfo.timeOfLatestAttemptMilliSec = currentTime;
+    fInfo.numConsecutiveFailures.incrementAndGet();
+  }
+
+  public void handleThrowable(Throwable t1, ServerName serverName,
+      MutableBoolean couldNotCommunicateWithServer) throws IOException {
+    Throwable t2 = translateException(t1);
+    boolean isLocalException = !(t2 instanceof RemoteException);
+    if (isLocalException && isConnectionException(t2)) {
+      couldNotCommunicateWithServer.setValue(true);
+      handleFailureToServer(serverName, t2);
+    }
+  }
+
+  private Throwable translateException(Throwable t) throws IOException {
+    if (t instanceof NoSuchMethodError) {
+      // We probably can't recover from this exception by retrying.
+      LOG.error(t);
+      throw (NoSuchMethodError) t;
+    }
+
+    if (t instanceof NullPointerException) {
+      // The same here. This is probably a bug.
+      LOG.error(t.getMessage(), t);
+      throw (NullPointerException) t;
+    }
+
+    if (t instanceof UndeclaredThrowableException) {
+      t = t.getCause();
+    }
+    if (t instanceof RemoteException) {
+      t = ((RemoteException) t).unwrapRemoteException();
+    }
+    if (t instanceof DoNotRetryIOException) {
+      throw (DoNotRetryIOException) t;
+    }
+    if (t instanceof Error) {
+      throw (Error) t;
+    }
+    return t;
+  }
+
+  /**
+   * Check if the exception is something that indicates that we cannot
+   * contact/communicate with the server.
+   *
+   * @param e
+   * @return true when exception indicates that the client wasn't able to make contact with server
+   */
+  private boolean isConnectionException(Throwable e) {
+    if (e == null)
+      return false;
+    // This list covers most connectivity exceptions but not all.
+    // For example, in SocketOutputStream a plain IOException is thrown
+    // at times when the channel is closed.
+    return (e instanceof SocketTimeoutException
+        || e instanceof ConnectException || e instanceof ClosedChannelException
+        || e instanceof SyncFailedException || e instanceof EOFException
+        || e instanceof TimeoutException
+        || e instanceof ConnectionClosingException || e instanceof FailedServerException);
+  }
+
+  /**
+   * Occasionally cleans up unused information in repeatedFailuresMap.
+   *
+   * repeatedFailuresMap stores the failure information for all remote hosts
+   * that had failures. In order to avoid these from growing indefinitely,
+   * occassionallyCleanupFailureInformation() will clear these up once every
+   * cleanupInterval ms.
+   */
+  protected void occasionallyCleanupFailureInformation() {
+    long now = System.currentTimeMillis();
+    if (!(now > lastFailureMapCleanupTimeMilliSec
+        + failureMapCleanupIntervalMilliSec))
+      return;
+
+    // remove entries that haven't been attempted in a while
+    // No synchronization needed. It is okay if multiple threads try to
+    // remove the entry again and again from a concurrent hash map.
+    StringBuilder sb = new StringBuilder();
+    for (Entry<ServerName, FailureInfo> entry : repeatedFailuresMap.entrySet()) {
+      if (now > entry.getValue().timeOfLatestAttemptMilliSec
+          + failureMapCleanupIntervalMilliSec) { // no recent failures
+        repeatedFailuresMap.remove(entry.getKey());
+      } else if (now > entry.getValue().timeOfFirstFailureMilliSec
+          + this.fastFailClearingTimeMilliSec) { // been failing for a long
+                                                 // time
+        LOG.error(entry.getKey()
+            + " been failing for a long time. clearing out."
+            + entry.getValue().toString());
+        repeatedFailuresMap.remove(entry.getKey());
+      } else {
+        sb.append(entry.getKey().toString()).append(" failing ")
+            .append(entry.getValue().toString()).append("\n");
+      }
+    }
+    if (sb.length() > 0) {
+      LOG.warn("Preemptive failure enabled for : " + sb.toString());
+    }
+    lastFailureMapCleanupTimeMilliSec = now;
+  }
+
+  /**
+   * Checks to see if we are in the Fast fail mode for requests to the server.
+   *
+   * If a client is unable to contact a server for more than
+   * fastFailThresholdMilliSec the client will get into fast fail mode.
+   *
+   * @param server
+   * @return true if the client is in fast fail mode for the server.
+   */
+  private boolean inFastFailMode(ServerName server) {
+    FailureInfo fInfo = repeatedFailuresMap.get(server);
+    // if fInfo is null --> The server is considered good.
+    // If the server is bad, wait long enough to believe that the server is
+    // down.
+    return (fInfo != null &&
+        EnvironmentEdgeManager.currentTime() >
+          (fInfo.timeOfFirstFailureMilliSec + this.fastFailThresholdMilliSec));
+  }
+
+  /**
+   * Checks to see if the current thread is already in FastFail mode for *some*
+   * server.
+   *
+   * @return true, if the thread is already in FF mode.
+   */
+  private boolean currentThreadInFastFailMode() {
+    return (this.threadRetryingInFastFailMode.get() != null && (this.threadRetryingInFastFailMode
+        .get().booleanValue() == true));
+  }
+
+  /**
+   * Check to see if the client should try to connnect to the server, inspite of
+   * knowing that it is in the fast fail mode.
+   *
+   * The idea here is that we want just one client thread to be actively trying
+   * to reconnect, while all the other threads trying to reach the server will
+   * short circuit.
+   *
+   * @param fInfo
+   * @return true if the client should try to connect to the server.
+   */
+  protected boolean shouldRetryInspiteOfFastFail(FailureInfo fInfo) {
+    // We believe that the server is down, But, we want to have just one
+    // client
+    // actively trying to connect. If we are the chosen one, we will retry
+    // and not throw an exception.
+    if (fInfo != null
+        && fInfo.exclusivelyRetringInspiteOfFastFail.compareAndSet(false, true)) {
+      MutableBoolean threadAlreadyInFF = this.threadRetryingInFastFailMode
+          .get();
+      if (threadAlreadyInFF == null) {
+        threadAlreadyInFF = new MutableBoolean();
+        this.threadRetryingInFastFailMode.set(threadAlreadyInFF);
+      }
+      threadAlreadyInFF.setValue(true);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   *
+   * This function updates the Failure info for a particular server after the
+   * attempt to 
+   *
+   * @param server
+   * @param fInfo
+   * @param couldNotCommunicate
+   * @param retryDespiteFastFailMode
+   */
+  private void updateFailureInfoForServer(ServerName server,
+      FailureInfo fInfo, boolean didTry, boolean couldNotCommunicate,
+      boolean retryDespiteFastFailMode) {
+    if (server == null || fInfo == null || didTry == false)
+      return;
+
+    // If we were able to connect to the server, reset the failure
+    // information.
+    if (couldNotCommunicate == false) {
+      LOG.info("Clearing out PFFE for server " + server.getServerName());
+      repeatedFailuresMap.remove(server);
+    } else {
+      // update time of last attempt
+      long currentTime = System.currentTimeMillis();
+      fInfo.timeOfLatestAttemptMilliSec = currentTime;
+
+      // Release the lock if we were retrying inspite of FastFail
+      if (retryDespiteFastFailMode) {
+        fInfo.exclusivelyRetringInspiteOfFastFail.set(false);
+        threadRetryingInFastFailMode.get().setValue(false);
+      }
+    }
+
+    occasionallyCleanupFailureInformation();
+  }
+
+  @Override
+  public void intercept(RetryingCallerInterceptorContext context)
+      throws PreemptiveFastFailException {
+    if (context instanceof FastFailInterceptorContext) {
+      intercept((FastFailInterceptorContext) context);
+    }
+  }
+
+  @Override
+  public void handleFailure(RetryingCallerInterceptorContext context,
+      Throwable t) throws IOException {
+    if (context instanceof FastFailInterceptorContext) {
+      handleFailure((FastFailInterceptorContext) context, t);
+    }
+  }
+
+  @Override
+  public void updateFailureInfo(RetryingCallerInterceptorContext context) {
+    if (context instanceof FastFailInterceptorContext) {
+      updateFailureInfo((FastFailInterceptorContext) context);
+    }
+  }
+
+  @Override
+  public RetryingCallerInterceptorContext createEmptyContext() {
+    return new FastFailInterceptorContext();
+  }
+
+  protected boolean isServerInFailureMap(ServerName serverName) {
+    return this.repeatedFailuresMap.containsKey(serverName);
+  }
+
+  @Override
+  public String toString() {
+    return "PreemptiveFastFailInterceptor";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java
new file mode 100644
index 0000000..f372e2d
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptor.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This class is designed to fit into the RetryingCaller class which forms the
+ * central piece of intelligence for the client side retries for most calls.
+ * 
+ * One can extend this class and intercept the RetryingCaller and add additional
+ * logic into the execution of a simple HTable operations like get, delete etc.
+ * 
+ * Concrete implementations of this calls are supposed to the thread safe. The
+ * object is used across threads to identify the fast failing threads.
+ * 
+ * For a concrete use case see {@link PreemptiveFastFailInterceptor}
+ * 
+ * Example use case : 
+ * try {
+ *   interceptor.intercept
+ *   doAction()
+ * } catch (Exception e) {
+ *   interceptor.handleFailure
+ * } finally {
+ *   interceptor.updateFaulireInfo
+ * }
+ * 
+ * The {@link RetryingCallerInterceptor} also acts as a factory
+ * for getting a new {@link RetryingCallerInterceptorContext}.
+ * 
+ */
+
+@InterfaceAudience.Private
+abstract class RetryingCallerInterceptor {
+
+  protected RetryingCallerInterceptor() {
+    // Empty constructor protected for NoOpRetryableCallerInterceptor
+  }
+
+  /**
+   * This returns the context object for the current call.
+   * 
+   * @return context : the context that needs to be used during this call.
+   */
+  public abstract RetryingCallerInterceptorContext createEmptyContext();
+
+  /**
+   * Call this function in case we caught a failure during retries.
+   * 
+   * @param context
+   *          : The context object that we obtained previously.
+   * @param t
+   *          : The exception that we caught in this particular try
+   * @throws IOException
+   */
+  public abstract void handleFailure(RetryingCallerInterceptorContext context,
+      Throwable t) throws IOException;
+
+  /**
+   * Call this function alongside the actual call done on the callable.
+   * 
+   * @param abstractRetryingCallerInterceptorContext
+   * @throws PreemptiveFastFailException
+   */
+  public abstract void intercept(
+      RetryingCallerInterceptorContext abstractRetryingCallerInterceptorContext)
+      throws IOException;
+
+  /**
+   * Call this function to update at the end of the retry. This is not necessary
+   * to happen.
+   * 
+   * @param context
+   */
+  public abstract void updateFailureInfo(
+      RetryingCallerInterceptorContext context);
+
+  @Override
+  public abstract String toString();
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
new file mode 100644
index 0000000..a9f414f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorContext.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * The context object used in the {@link RpcRetryingCaller} to enable
+ * {@link RetryingCallerInterceptor} to intercept calls.
+ * {@link RetryingCallerInterceptorContext} is the piece of information unique
+ * to a retrying call that transfers information from the call into the
+ * {@link RetryingCallerInterceptor} so that {@link RetryingCallerInterceptor}
+ * can take appropriate action according to the specific logic
+ *
+ */
+@InterfaceAudience.Private
+abstract class RetryingCallerInterceptorContext {
+  protected RetryingCallerInterceptorContext() {
+  }
+
+  /**
+   * This function clears the internal state of the context object.
+   */
+  public abstract void clear();
+
+  /**
+   * This prepares the context object by populating it with information specific
+   * to the implementation of the {@link RetryingCallerInterceptor} along with
+   * which this will be used.
+   * 
+   * @param callable
+   *          : The {@link RetryingCallable} that contains the information about
+   *          the call that is being made.
+   * @return A new {@link RetryingCallerInterceptorContext} object that can be
+   *         used for use in the current retrying call
+   */
+  public abstract RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable);
+
+  /**
+   * Telescopic extension that takes which of the many retries we are currently
+   * in.
+   * 
+   * @param callable
+   *          : The {@link RetryingCallable} that contains the information about
+   *          the call that is being made.
+   * @param tries
+   *          : The retry number that we are currently in.
+   * @return A new context object that can be used for use in the current
+   *         retrying call
+   */
+  public abstract RetryingCallerInterceptorContext prepare(
+      RetryingCallable<?> callable, int tries);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java
new file mode 100644
index 0000000..9799ec0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingCallerInterceptorFactory.java
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
+
+/**
+ * Factory implementation to provide the {@link HConnectionImplementation} with
+ * the implementation of the {@link RetryingCallerInterceptor} that we would use
+ * to intercept the {@link RpcRetryingCaller} during the course of their calls.
+ * 
+ */
+
+@InterfaceAudience.Private
+class RetryingCallerInterceptorFactory {
+  private static final Log LOG = LogFactory
+      .getLog(RetryingCallerInterceptorFactory.class);
+  private Configuration conf;
+  private final boolean failFast;
+  public static final RetryingCallerInterceptor NO_OP_INTERCEPTOR =
+      new NoOpRetryableCallerInterceptor(null);
+
+  public RetryingCallerInterceptorFactory(Configuration conf) {
+    this.conf = conf;
+    failFast = conf.getBoolean(HConstants.HBASE_CLIENT_FAST_FAIL_MODE_ENABLED,
+        HConstants.HBASE_CLIENT_ENABLE_FAST_FAIL_MODE_DEFAULT);
+  }
+
+  /**
+   * This builds the implementation of {@link RetryingCallerInterceptor} that we
+   * specify in the conf and returns the same.
+   * 
+   * To use {@link PreemptiveFastFailInterceptor}, set HBASE_CLIENT_ENABLE_FAST_FAIL_MODE to true.
+   * HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL is defaulted to {@link PreemptiveFastFailInterceptor}
+   * 
+   * @return The factory build method which creates the
+   *         {@link RetryingCallerInterceptor} object according to the
+   *         configuration.
+   */
+  public RetryingCallerInterceptor build() {
+    RetryingCallerInterceptor ret = NO_OP_INTERCEPTOR;
+    if (failFast) {
+      try {
+        Class<?> c = conf.getClass(
+            HConstants.HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL,
+            PreemptiveFastFailInterceptor.class);
+        Constructor<?> constructor = c
+            .getDeclaredConstructor(Configuration.class);
+        constructor.setAccessible(true);
+        ret = (RetryingCallerInterceptor) constructor.newInstance(conf);
+      } catch (Exception e) {
+        ret = new PreemptiveFastFailInterceptor(conf);
+      }
+    }
+    LOG.trace("Using " + ret.toString() + " for intercepting the RpcRetryingCaller");
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
index bf4c40b..97e6381 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
@@ -31,6 +31,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.PreemptiveFastFailException;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
 import org.apache.hadoop.ipc.RemoteException;
@@ -61,10 +62,19 @@ public class RpcRetryingCaller<T> {
   private final long pause;
   private final int retries;
   private final AtomicBoolean cancelled = new AtomicBoolean(false);
+  private final RetryingCallerInterceptor interceptor;
+  private final RetryingCallerInterceptorContext context;
 
   public RpcRetryingCaller(long pause, int retries) {
+    this(pause, retries, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  }
+  
+  public RpcRetryingCaller(long pause, int retries,
+      RetryingCallerInterceptor interceptor) {
     this.pause = pause;
     this.retries = retries;
+    this.interceptor = interceptor;
+    context = interceptor.createEmptyContext();
   }
 
   private int getRemainingTime(int callTimeout) {
@@ -83,7 +93,7 @@ public class RpcRetryingCaller<T> {
       return remainingTime;
     }
   }
-
+  
   public void cancel(){
     cancelled.set(true);
     synchronized (cancelled){
@@ -104,11 +114,15 @@ public class RpcRetryingCaller<T> {
     List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
       new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
     this.globalStartTime = EnvironmentEdgeManager.currentTime();
+    context.clear();
     for (int tries = 0;; tries++) {
       long expectedSleep;
       try {
         callable.prepare(tries != 0); // if called with false, check table status on ZK
+        interceptor.intercept(context.prepare(callable, tries));
         return callable.call(getRemainingTime(callTimeout));
+      } catch (PreemptiveFastFailException e) {
+        throw e;
       } catch (Throwable t) {
         ExceptionUtil.rethrowIfInterrupt(t);
         if (LOG.isTraceEnabled()) {
@@ -118,6 +132,7 @@ public class RpcRetryingCaller<T> {
         }
 
         // translateException throws exception when should not retry: i.e. when request is bad.
+        interceptor.handleFailure(context, t);
         t = translateException(t);
         callable.throwable(t, retries != 1);
         RetriesExhaustedException.ThrowableWithExtraContext qt =
@@ -139,6 +154,8 @@ public class RpcRetryingCaller<T> {
               ": " + callable.getExceptionMessageAdditionalDetail();
           throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
         }
+      } finally {
+        interceptor.updateFailureInfo(context);
       }
       try {
         if (expectedSleep > 0) {
@@ -188,7 +205,7 @@ public class RpcRetryingCaller<T> {
       }
     }
   }
-
+  
   /**
    * Get the good or the remote exception if any, throws the DoNotRetryIOException.
    * @param t the throwable to analyze

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
index 9b070a5..f482262 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
@@ -31,27 +31,38 @@ public class RpcRetryingCallerFactory {
   protected final Configuration conf;
   private final long pause;
   private final int retries;
+  private final RetryingCallerInterceptor interceptor;
 
   public RpcRetryingCallerFactory(Configuration conf) {
+    this(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  }
+  
+  public RpcRetryingCallerFactory(Configuration conf, RetryingCallerInterceptor interceptor) {
     this.conf = conf;
     pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     retries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    this.interceptor = interceptor;
   }
 
   public <T> RpcRetryingCaller<T> newCaller() {
     // We store the values in the factory instance. This way, constructing new objects
     //  is cheap as it does not require parsing a complex structure.
-    return new RpcRetryingCaller<T>(pause, retries);
+    return new RpcRetryingCaller<T>(pause, retries, interceptor);
   }
 
   public static RpcRetryingCallerFactory instantiate(Configuration configuration) {
+    return instantiate(configuration, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR);
+  }
+  
+  public static RpcRetryingCallerFactory instantiate(Configuration configuration,
+      RetryingCallerInterceptor interceptor) {
     String clazzName = RpcRetryingCallerFactory.class.getName();
     String rpcCallerFactoryClazz =
         configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY, clazzName);
     if (rpcCallerFactoryClazz.equals(clazzName)) {
-      return new RpcRetryingCallerFactory(configuration);
+      return new RpcRetryingCallerFactory(configuration, interceptor);
     }
     return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
       new Class[] { Configuration.class }, new Object[] { configuration });

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java
new file mode 100644
index 0000000..cb8e5df
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ConnectionClosingException.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.exceptions;
+
+/**
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+* Thrown when the client believes that we are trying to communicate to has
+* been repeatedly unresponsive for a while.
+*
+* On receiving such an exception. The HConnectionManager will skip all
+* retries and fast fail the operation.
+*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ConnectionClosingException extends IOException {
+  public ConnectionClosingException(String string) {
+    super(string);
+  }
+
+  private static final long serialVersionUID = -8980028569652624236L;
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java
new file mode 100644
index 0000000..2d66d54
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/PreemptiveFastFailException.java
@@ -0,0 +1,70 @@
+/**
+  *
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+*/
+
+package org.apache.hadoop.hbase.exceptions;
+
+import java.net.ConnectException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Thrown when the client believes that we are trying to communicate to has
+ * been repeatedly unresponsive for a while.
+ *
+ * On receiving such an exception. The HConnectionManager will skip all
+ * retries and fast fail the operation.
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public class PreemptiveFastFailException extends ConnectException {
+   private static final long serialVersionUID = 7129103682617007177L;
+   private long failureCount, timeOfFirstFailureMilliSec, timeOfLatestAttemptMilliSec;
+
+   /**
+    * @param count
+    * @param timeOfFirstFailureMilliSec
+    * @param timeOfLatestAttemptMilliSec
+    * @param serverName
+    */
+   public PreemptiveFastFailException(long count, long timeOfFirstFailureMilliSec,
+       long timeOfLatestAttemptMilliSec, ServerName serverName) {
+     super("Exception happened " + count + " times. to" + serverName);
+     this.failureCount = count;
+     this.timeOfFirstFailureMilliSec = timeOfFirstFailureMilliSec;
+     this.timeOfLatestAttemptMilliSec = timeOfLatestAttemptMilliSec;
+   }
+
+   public long getFirstFailureAt() {
+     return timeOfFirstFailureMilliSec;
+   }
+
+   public long getLastAttemptAt() {
+     return timeOfLatestAttemptMilliSec;
+   }
+
+   public long getFailureCount() {
+     return failureCount;
+   }
+
+   public boolean wasOperationAttemptedByServer() {
+     return false;
+   }
+ }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ece933fa/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
index c520bbf..225a2c9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -107,7 +108,6 @@ import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
-
 /**
  * Does RPC against a cluster.  Manages connections per regionserver in the cluster.
  * <p>See HBaseServer
@@ -502,7 +502,7 @@ public class RpcClient {
       private void cleanup() {
         assert shouldCloseConnection.get();
 
-        IOException ie = new IOException("Connection to " + server + " is closing.");
+        IOException ie = new ConnectionClosingException("Connection to " + server + " is closing.");
         while (true) {
           CallFuture cts = callsToWrite.poll();
           if (cts == null) {
@@ -717,7 +717,7 @@ public class RpcClient {
      */
     private void checkIsOpen() throws IOException {
       if (shouldCloseConnection.get()) {
-        throw new IOException(getName() + " is closing");
+        throw new ConnectionClosingException(getName() + " is closing");
       }
     }
 
@@ -907,7 +907,7 @@ public class RpcClient {
       }
 
       if (shouldCloseConnection.get()){
-        throw new IOException("This connection is closing");
+        throw new ConnectionClosingException("This connection is closing");
       }
 
       if (failedServers.isFailedServer(remoteId.getAddress())) {
@@ -1254,7 +1254,7 @@ public class RpcClient {
           itor.remove();
         } else if (allCalls) {
           long waitTime = EnvironmentEdgeManager.currentTime() - c.getStartTime();
-          IOException ie = new IOException("Connection to " + getRemoteAddress()
+          IOException ie = new ConnectionClosingException("Connection to " + getRemoteAddress()
               + " is closing. Call id=" + c.id + ", waitTime=" + waitTime);
           c.setException(ie);
           itor.remove();
@@ -1510,8 +1510,8 @@ public class RpcClient {
         break;
       }
       if (connection.shouldCloseConnection.get()) {
-        throw new IOException("Call id=" + call.id + " on server "
-            + addr + " aborted: connection is closing");
+        throw new ConnectionClosingException("Call id=" + call.id +
+            " on server " + addr + " aborted: connection is closing");
       }
       try {
         synchronized (call) {
@@ -1559,6 +1559,9 @@ public class RpcClient {
     } else if (exception instanceof SocketTimeoutException) {
       return (SocketTimeoutException)new SocketTimeoutException("Call to " + addr +
         " failed because " + exception).initCause(exception);
+    } else if (exception instanceof ConnectionClosingException){
+      return (ConnectionClosingException) new ConnectionClosingException(
+          "Call to " + addr + " failed on local exception: " + exception).initCause(exception);
     } else {
       return (IOException)new IOException("Call to " + addr + " failed on local exception: " +
         exception).initCause(exception);