Posted to commits@hbase.apache.org by nk...@apache.org on 2014/03/07 15:00:22 UTC

svn commit: r1575261 - in /hbase/branches/hbase-10070: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-common/src/main/java/org/apache/hadoop/hbase/util/ hbase-server/src/main/java/org/apache/hadoop/hbase/client/ hbase-server/src/main/...

Author: nkeywal
Date: Fri Mar  7 14:00:21 2014
New Revision: 1575261

URL: http://svn.apache.org/r1575261
Log:
HBASE-10355 Failover RPC's from client using region replicas

Added:
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
    hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
    hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
Modified:
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
    hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java

Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java Fri Mar  7 14:00:21 2014
@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -49,12 +50,9 @@ import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.ipc.RemoteException;
 import org.cloudera.htrace.Trace;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ServiceException;
 
 /**
 * This class allows a continuous flow of requests. It's written to be compatible with a
@@ -95,7 +93,7 @@ class AsyncProcess {
   protected static final Log LOG = LogFactory.getLog(AsyncProcess.class);
   protected static final AtomicLong COUNTER = new AtomicLong();
 
-  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout";
+  public static final String PRIMARY_CALL_TIMEOUT_KEY = "hbase.client.primaryCallTimeout.multiget";
 
   /**
    * The context used to wait for results from one submit call.
@@ -180,7 +178,7 @@ class AsyncProcess {
   protected int numTries;
   protected int serverTrackerTimeout;
   protected long clientOpTimeout;
-  protected long primaryCallTimeout;
+  protected long primaryCallTimeoutMicroseconds;
   // End configuration settings.
 
   protected static class BatchErrors {
@@ -239,7 +237,7 @@ class AsyncProcess {
         HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
     this.clientOpTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
         HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-    this.primaryCallTimeout = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10);
+    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
 
     this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
       HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
@@ -564,9 +562,9 @@ class AsyncProcess {
       @Override
       public void run() {
         boolean done = false;
-        if (primaryCallTimeout > 0) {
+        if (primaryCallTimeoutMicroseconds > 0) {
           try {
-            done = waitUntilDone(startTime + primaryCallTimeout);
+            done = waitUntilDone(startTime * 1000L + primaryCallTimeoutMicroseconds);
           } catch (InterruptedException ex) {
             LOG.error("Replica thread was interrupted - no replica calls: " + ex.getMessage());
             return;
@@ -875,7 +873,7 @@ class AsyncProcess {
       long startTime = EnvironmentEdgeManager.currentTimeMillis();
       ReplicaCallIssuingRunnable replicaRunnable = new ReplicaCallIssuingRunnable(
           actionsForReplicaThread, startTime);
-      if (primaryCallTimeout == 0) {
+      if (primaryCallTimeoutMicroseconds == 0) {
         // Start replica calls immediately.
         replicaRunnable.run();
       } else {
@@ -1287,22 +1285,25 @@ class AsyncProcess {
 
     private boolean waitUntilDone(long cutoff) throws InterruptedException {
       boolean hasWait = cutoff != Long.MAX_VALUE;
-      long lastLog = hasWait ? 0 : EnvironmentEdgeManager.currentTimeMillis();
+      long lastLog = EnvironmentEdgeManager.currentTimeMillis();
       long currentInProgress;
       while (0 != (currentInProgress = actionsInProgress.get())) {
-        long now = 0;
-        if (hasWait && (now = EnvironmentEdgeManager.currentTimeMillis()) > cutoff) {
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        if (hasWait && (now * 1000L) > cutoff) {
           return false;
         }
-        if (!hasWait) {
-          // Only log if wait is infinite.
-          now = EnvironmentEdgeManager.currentTimeMillis();
+        if (!hasWait) { // Only log if wait is infinite.
           if (now > lastLog + 10000) {
             lastLog = now;
             LOG.info("#" + id + ", waiting for " + currentInProgress + "  actions to finish");
           }
-          synchronized (actionsInProgress) {
-            actionsInProgress.wait(Math.min(100, hasWait ? (cutoff - now) : Long.MAX_VALUE));
+        }
+        synchronized (actionsInProgress) {
+          if (!hasWait) {
+            actionsInProgress.wait(100);
+          } else {
+            long waitMicroSecond = Math.min(100000L, (cutoff - now * 1000L));
+            TimeUnit.MICROSECONDS.timedWait(actionsInProgress, waitMicroSecond);
           }
         }
       }

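For illustration (this snippet is not part of the commit; the class name is hypothetical): the renamed key is read with conf.getInt() and interpreted as microseconds, so 10000 is the 10 ms default and 0 starts the replica calls immediately, as the hunk above shows. A minimal sketch of tuning it:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MultigetHedgeConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Wait 5 ms (5000 microseconds) on the primary before hedging
        // multigets to the secondary replicas.
        conf.setInt("hbase.client.primaryCallTimeout.multiget", 5000);
      }
    }
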
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java Fri Mar  7 14:00:21 2014
@@ -143,7 +143,17 @@ interface ClusterConnection extends HCon
       final boolean useCache,
       final boolean offlined) throws IOException;
 
-
+  /**
+   * Gets the locations of all the replicas of the region containing the given row.
+   * @param tableName table to get regions of
+   * @param row the row
+   * @param useCache should we use the cache to retrieve the region information
+   * @param retry do we retry on failure
+   * @return region locations for this row
+   * @throws IOException if the location lookup fails
+   */
+  RegionLocations locateRegion(TableName tableName,
+                               byte[] row, boolean useCache, boolean retry) throws IOException;
   /**
    * Returns a {@link MasterKeepAliveConnection} to the active master
    */

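For illustration (not part of the commit; the helper below is hypothetical, and since ClusterConnection is package-private it only compiles inside org.apache.hadoop.hbase.client, as the new test does): a sketch of the new overload, which returns the locations of all replicas for a row.

    import java.io.IOException;
    import org.apache.hadoop.hbase.HRegionLocation;
    import org.apache.hadoop.hbase.RegionLocations;
    import org.apache.hadoop.hbase.TableName;

    static HRegionLocation primaryOf(ClusterConnection conn, TableName tableName,
                                     byte[] row) throws IOException {
      // Locate all replicas of the region holding the row, bypassing the cache.
      RegionLocations rl = conn.locateRegion(tableName, row, false, true);
      // Replica id 0 is the primary; rl.size() counts the secondaries too.
      return rl.getRegionLocation(0);
    }
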
Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java Fri Mar  7 14:00:21 2014
@@ -1021,7 +1021,8 @@ class ConnectionManager {
     }
 
 
-    private RegionLocations locateRegion(final TableName tableName,
+    @Override
+    public RegionLocations locateRegion(final TableName tableName,
       final byte [] row, boolean useCache, boolean retry)
     throws IOException {
       if (this.closed) throw new IOException(toString() + " closed");

Modified: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Fri Mar  7 14:00:21 2014
@@ -132,8 +132,12 @@ public class HTable implements HTableInt
   private ExecutorService pool;  // For Multi
   private boolean closed;
   private int operationTimeout;
+  private int retries;
   private final boolean cleanupPoolOnClose; // shutdown the pool in close()
   private final boolean cleanupConnectionOnClose; // close the connection in close()
+  private Consistency defaultConsistency = Consistency.STRONG;
+  private int primaryCallTimeoutMicroSecond;
+
 
   /** The Async process for puts with autoflush set to false or multiputs */
   protected AsyncProcess ap;
@@ -355,6 +359,10 @@ public class HTable implements HTableInt
     this.scannerCaching = this.configuration.getInt(
         HConstants.HBASE_CLIENT_SCANNER_CACHING,
         HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    this.primaryCallTimeoutMicroSecond =
+        this.configuration.getInt("hbase.client.primaryCallTimeout.get", 10000); // 10 ms
+    this.retries = configuration.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration);
     // puts need to track errors globally due to how the APIs currently work.
@@ -789,16 +797,28 @@ public class HTable implements HTableInt
    */
   @Override
   public Result get(final Get get) throws IOException {
-    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
-        getName(), get.getRow()) {
-      @Override
-      public Result call() throws IOException {
-        return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
-      }
-    };
-    return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout);
+    if (get.getConsistency() == null) {
+      get.setConsistency(defaultConsistency);
+    }
+
+    if (get.getConsistency() == Consistency.STRONG) {
+      // Good old call.
+      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
+          getName(), get.getRow()) {
+        public Result call() throws IOException {
+          return ProtobufUtil.get(getStub(), getLocation().getRegionInfo().getRegionName(), get);
+        }
+      };
+      return rpcCallerFactory.<Result>newCaller().callWithRetries(callable, this.operationTimeout);
+    }
+
+    // Call that takes into account the replica
+    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
+        tableName, this.connection, get, pool, retries, operationTimeout, primaryCallTimeoutMicroSecond);
+    return callable.call();
   }
 
+
   /**
    * {@inheritDoc}
    */

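For illustration (not part of the commit; the class name and row key are hypothetical): from the client side, the new read path is reached by asking for TIMELINE consistency, and a result served by a secondary replica is flagged stale, as the tests below verify.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Consistency;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TimelineGetExample {
      static Result timelineGet(HTable table) throws IOException {
        Get g = new Get(Bytes.toBytes("myrow"));
        g.setConsistency(Consistency.TIMELINE); // opt in to replica reads
        Result r = table.get(g);
        if (r.isStale()) {
          // Served by a secondary replica; the data may lag the primary.
        }
        return r;
      }
    }
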
Added: hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java?rev=1575261&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java (added)
+++ hbase/branches/hbase-10070/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java Fri Mar  7 14:00:21 2014
@@ -0,0 +1,264 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.hbase.client;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.util.BoundedCompletionService;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Caller that goes to the replicas if the primary region does not answer within a
+ * configurable timeout. If the timeout is reached, it calls all the secondary replicas,
+ * and returns the first answer. If the answer comes from one of the secondary replicas,
+ * it will be marked as stale.
+ */
+public class RpcRetryingCallerWithReadReplicas {
+  static final Log LOG = LogFactory.getLog(RpcRetryingCallerWithReadReplicas.class);
+  protected final ExecutorService pool;
+  protected final ClusterConnection cConnection;
+  protected final Configuration conf;
+  protected final Get get;
+  protected final TableName tableName;
+  protected final int timeBeforeReplicas;
+  private final int callTimeout;
+  private final int retries;
+
+  public RpcRetryingCallerWithReadReplicas(TableName tableName,
+                                           ClusterConnection cConnection, final Get get,
+                                           ExecutorService pool, int retries, int callTimeout,
+                                           int timeBeforeReplicas) {
+    this.tableName = tableName;
+    this.cConnection = cConnection;
+    this.conf = cConnection.getConfiguration();
+    this.get = get;
+    this.pool = pool;
+    this.retries = retries;
+    this.callTimeout = callTimeout;
+    this.timeBeforeReplicas = timeBeforeReplicas;
+  }
+
+  /**
+   * A RegionServerCallable that takes into account the replicas, i.e.
+   * - the call can be on any replica
+   * - we need to stop retrying when the call is completed
+   * - we can be interrupted
+   */
+  class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
+    final int id;
+
+    public ReplicaRegionServerCallable(int id, HRegionLocation location) {
+      super(RpcRetryingCallerWithReadReplicas.this.cConnection,
+          RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow());
+      this.id = id;
+      this.location = location;
+    }
+
+    /**
+     * Two responsibilities:
+     * - if the call is already completed (by another replica), stop the retries.
+     * - set the location to the right region, depending on the replica id.
+     */
+    @Override
+    public void prepare(final boolean reload) throws IOException {
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException();
+      }
+
+      if (reload || location == null) {
+        RegionLocations rl = getRegionLocations(false);
+        location = id < rl.size() ? rl.getRegionLocation(id) : null;
+      }
+
+      if (location == null) {
+        // With this exception, there will be a retry. The location can be null for a replica
+        //  when the table is created or after a split.
+        throw new HBaseIOException("There is no location for replica id #" + id);
+      }
+
+      ServerName dest = location.getServerName();
+      assert dest != null;
+
+      setStub(cConnection.getClient(dest));
+    }
+
+    @Override
+    public Result call() throws Exception {
+      if (Thread.interrupted()) {
+        throw new InterruptedIOException();
+      }
+
+      byte[] reg = location.getRegionInfo().getRegionName();
+
+      return ProtobufUtil.get(getStub(), reg, get);
+      // Ideally, we would build the request object only once, but we can't right now as
+      // ProtobufUtil wants the destination server.
+    }
+  }
+
+  /**
+   * Adapter to put the HBase retrying caller into a Java callable.
+   */
+  class RetryingRPC implements Callable<Result> {
+    final RetryingCallable<Result> callable;
+
+    RetryingRPC(RetryingCallable<Result> callable) {
+      this.callable = callable;
+    }
+
+    @Override
+    public Result call() throws IOException {
+      return new RpcRetryingCallerFactory(conf).<Result>newCaller().
+          callWithRetries(callable, callTimeout);
+    }
+  }
+
+  /**
+   * Algo:
+   * - we put the query into the execution pool.
+   * - after x microseconds, if we don't have a result, we add the queries for the secondary replicas
+   * - we take the first answer
+   * - when done, we cancel what's left. Cancelling means:
+   *   - removing from the pool if the actual call was not started
+   *   - interrupting the call if it has started
+   * Client side, we need to take into account that:
+   * - a call is not executed immediately after being put into the pool
+   * - a call is a thread. Let's not multiply the number of threads by the number of replicas.
+   * Server side, if we can cancel when it's still in the handler pool, it's much better, as a call
+   * can take some i/o.
+   * <p/>
+   * Globally, the number of retries, timeouts and so on still apply, but per replica,
+   * not globally. We continue until all retries are done, or all timeouts are exceeded.
+   */
+  public synchronized Result call()
+      throws DoNotRetryIOException, InterruptedIOException, RetriesExhaustedException {
+    RegionLocations rl = getRegionLocations(true);
+    BoundedCompletionService<Result> cs = new BoundedCompletionService<Result>(pool, rl.size());
+
+    addCallsForReplica(cs, rl, 0, 0); // primary.
+
+    try {
+      Future<Result> f = cs.poll(timeBeforeReplicas, TimeUnit.MICROSECONDS); // Yes, microseconds
+      if (f == null) {
+        addCallsForReplica(cs, rl, 1, rl.size() - 1);  // secondaries
+        f = cs.take();
+      }
+      return f.get();
+    } catch (ExecutionException e) {
+      throwEnrichedException(e);
+      return null; // unreachable
+    } catch (CancellationException e) {
+      throw new InterruptedIOException();
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException();
+    } finally {
+      // We get here because we were interrupted or because one or more of the
+      //  calls succeeded or failed. In all cases, we stop all our tasks.
+      cs.cancelAll(true);
+    }
+  }
+
+  /**
+   * Extracts the real exception from the ExecutionException and throws whichever makes
+   * more sense.
+   */
+  private void throwEnrichedException(ExecutionException e)
+      throws RetriesExhaustedException, DoNotRetryIOException {
+    Throwable t = e.getCause();
+    assert t != null; // That's what ExecutionException is about: holding an exception
+
+    if (t instanceof RetriesExhaustedException) {
+      throw (RetriesExhaustedException) t;
+    }
+
+    if (t instanceof DoNotRetryIOException) {
+      throw (DoNotRetryIOException) t;
+    }
+
+    RetriesExhaustedException.ThrowableWithExtraContext qt =
+        new RetriesExhaustedException.ThrowableWithExtraContext(t,
+            EnvironmentEdgeManager.currentTimeMillis(), toString());
+
+    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
+        Collections.singletonList(qt);
+
+    throw new RetriesExhaustedException(retries, exceptions);
+  }
+
+  /**
+   * Creates the calls and submits them.
+   *
+   * @param cs         - the completion service to use for submitting
+   * @param rl         - the region locations
+   * @param min        - the id of the first replica, inclusive
+   * @param max        - the id of the last replica, inclusive.
+   */
+  private void addCallsForReplica(BoundedCompletionService<Result> cs,
+                                  RegionLocations rl, int min, int max) {
+    for (int id = min; id <= max; id++) {
+      HRegionLocation hrl = rl.getRegionLocation(id);
+      ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
+      RetryingRPC retryingOnReplica = new RetryingRPC(callOnReplica);
+      cs.submit(retryingOnReplica);
+    }
+  }
+
+  private RegionLocations getRegionLocations(boolean useCache)
+      throws RetriesExhaustedException, DoNotRetryIOException {
+    RegionLocations rl;
+    try {
+      rl = cConnection.locateRegion(tableName, get.getRow(), useCache, true);
+    } catch (IOException e) {
+      if (e instanceof DoNotRetryIOException) {
+        throw (DoNotRetryIOException) e;
+      } else if (e instanceof RetriesExhaustedException) {
+        throw (RetriesExhaustedException) e;
+      } else {
+        throw new RetriesExhaustedException("Can't get the location", e);
+      }
+    }
+    if (rl == null) {
+      throw new RetriesExhaustedException("Can't get the locations");
+    }
+
+    return rl;
+  }
+}
\ No newline at end of file

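For illustration (not part of the commit): the call() javadoc above boils down to the hedged-request pattern. A minimal, generic sketch using only the JDK's ExecutorCompletionService; the committed class layers retries, replica location and stale marking on top of this:

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class HedgedReadSketch {
      static <V> V hedgedCall(Callable<V> primary, Callable<V> secondary,
                              long primaryTimeoutMicros) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<V> cs = new ExecutorCompletionService<V>(pool);
        Future<V> p = cs.submit(primary);                 // 1. query the primary
        Future<V> f = cs.poll(primaryTimeoutMicros, TimeUnit.MICROSECONDS);
        Future<V> s = null;
        if (f == null) {                                  // 2. primary is slow:
          s = cs.submit(secondary);                       //    hedge to the replica
          f = cs.take();                                  // 3. first answer wins
        }
        try {
          return f.get();
        } finally {
          p.cancel(true);                                 // 4. cancel what's left
          if (s != null) s.cancel(true);
          pool.shutdown();
        }
      }
    }
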
Added: hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java?rev=1575261&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java (added)
+++ hbase/branches/hbase-10070/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BoundedCompletionService.java Fri Mar  7 14:00:21 2014
@@ -0,0 +1,81 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A completion service, close to the one available in JDK 1.7.
+ * However, this one keeps the list of the futures, and allows cancelling them all.
+ * This also means that it can only be used for a small set of tasks.
+ */
+public class BoundedCompletionService<V> {
+  private final Executor executor;
+  private final List<Future<V>> sent; // all the calls we sent
+  private final BlockingQueue<Future<V>> completed; // all the results we got so far.
+
+  class QueueingFuture extends FutureTask<V> {
+
+    public QueueingFuture(Callable<V> callable) {
+      super(callable);
+    }
+
+    protected void done() {
+      completed.add(QueueingFuture.this);
+    }
+  }
+
+  public BoundedCompletionService(Executor executor, int maxTasks) {
+    this.executor = executor;
+    this.sent = new ArrayList<Future<V>>(maxTasks);
+    this.completed = new ArrayBlockingQueue<Future<V>>(maxTasks);
+  }
+
+
+  public Future<V> submit(Callable<V> task) {
+    QueueingFuture newFuture = new QueueingFuture(task);
+    executor.execute(newFuture);
+    sent.add(newFuture);
+    return newFuture;
+  }
+
+  public Future<V> take() throws InterruptedException {
+    return completed.take();
+  }
+
+  public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    return completed.poll(timeout, unit);
+  }
+
+  public void cancelAll(boolean interrupt) {
+    for (Future<V> future : sent) {
+      future.cancel(interrupt);
+    }
+  }
+}

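For illustration (not part of the commit; the tasks and class name are hypothetical): a usage sketch of the class added above, exercising take() and the cancelAll() that the JDK completion service lacks.

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import org.apache.hadoop.hbase.util.BoundedCompletionService;

    public class BoundedCompletionServiceExample {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        BoundedCompletionService<String> cs =
            new BoundedCompletionService<String>(pool, 2);
        cs.submit(new Callable<String>() {
          @Override
          public String call() throws Exception {
            Thread.sleep(5000); // a slow replica
            return "slow";
          }
        });
        cs.submit(new Callable<String>() {
          @Override
          public String call() {
            return "fast";
          }
        });
        Future<String> first = cs.take(); // first completed task wins
        System.out.println(first.get());  // prints "fast"
        cs.cancelAll(true);               // interrupt whatever is still running
        pool.shutdown();
      }
    }
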
Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/client/CoprocessorHConnection.java Fri Mar  7 14:00:21 2014
@@ -325,6 +325,12 @@ class CoprocessorHConnection implements 
   }
 
   @Override
+  public RegionLocations locateRegion(TableName tableName, byte[] row,
+                                      boolean useCache, boolean retry) throws IOException {
+    return delegate.locateRegion(tableName, row, useCache, retry);
+  }
+
+  @Override
   public List<HRegionLocation> locateRegions(byte[] tableName, boolean useCache, boolean offlined)
       throws IOException {
     return delegate.locateRegions(tableName, useCache, offlined);

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Mar  7 14:00:21 2014
@@ -211,6 +211,7 @@ import org.apache.hadoop.hbase.util.FSUt
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.JvmPauseMonitor;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Sleeper;
 import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.Threads;

Modified: hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java?rev=1575261&r1=1575260&r2=1575261&view=diff
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java (original)
+++ hbase/branches/hbase-10070/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StorefileRefresherChore.java Fri Mar  7 14:00:21 2014
@@ -47,7 +47,7 @@ public class StorefileRefresherChore ext
   /**
    * The period (in milliseconds) for refreshing the store files for the secondary regions.
    */
-  static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD
+  public static final String REGIONSERVER_STOREFILE_REFRESH_PERIOD
     = "hbase.regionserver.storefile.refresh.period";
   static final int DEFAULT_REGIONSERVER_STOREFILE_REFRESH_PERIOD = 0; //disabled by default
 

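For illustration (not part of the commit; the class name and period are hypothetical): making the constant public lets client and test code enable the secondary-region store file refresher, as TestReplicasClient below does through HBaseTestingUtility.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;

    public class RefreshPeriodConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Refresh the secondaries' store files every second
        // (0, the default, disables the chore).
        conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD,
            1000);
      }
    }
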
Added: hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
URL: http://svn.apache.org/viewvc/hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java?rev=1575261&view=auto
==============================================================================
--- hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java (added)
+++ hbase/branches/hbase-10070/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java Fri Mar  7 14:00:21 2014
@@ -0,0 +1,466 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
+ * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
+ */
+@Category(MediumTests.class)
+public class TestReplicasClient {
+  private static final Log LOG = LogFactory.getLog(TestReplicasClient.class);
+
+  private static final int NB_SERVERS = 1;
+  private static HTable table = null;
+  private static final byte[] row = TestReplicasClient.class.getName().getBytes();
+
+  private static HRegionInfo hriPrimary;
+  private static HRegionInfo hriSecondary;
+
+  private static final HBaseTestingUtility HTU = new HBaseTestingUtility();
+  private static final byte[] f = HConstants.CATALOG_FAMILY;
+
+  private final static int REFRESH_PERIOD = 1000;
+
+  /**
+   * This copro is used to synchronize the tests.
+   */
+  public static class SlowMeCopro extends BaseRegionObserver {
+    static final AtomicLong sleepTime = new AtomicLong(0);
+    static final AtomicReference<CountDownLatch> cdl =
+        new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+
+    public SlowMeCopro() {
+    }
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+                         final Get get, final List<Cell> results) throws IOException {
+
+      if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
+        CountDownLatch latch = cdl.get();
+        try {
+          if (sleepTime.get() > 0) {
+            LOG.info("Sleeping for " + sleepTime.get() + " ms");
+            Thread.sleep(sleepTime.get());
+          } else if (latch.getCount() > 0) {
+            LOG.info("Waiting for the counterCountDownLatch");
+            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
+            if (latch.getCount() > 0) {
+              throw new RuntimeException("Can't wait more");
+            }
+          }
+        } catch (InterruptedException e1) {
+          LOG.error(e1);
+        }
+      } else {
+        LOG.info("We're not the primary replicas.");
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    // enable store file refreshing
+    HTU.getConfiguration().setInt(
+        StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
+
+    HTU.startMiniCluster(NB_SERVERS);
+
+    // Create table then get the single region for our new table.
+    HTableDescriptor hdt = HTU.createTableDescriptor(TestReplicasClient.class.getSimpleName());
+    hdt.addCoprocessor(SlowMeCopro.class.getName());
+    table = HTU.createTable(hdt, new byte[][]{f}, HTU.getConfiguration());
+
+    hriPrimary = table.getRegionLocation(row, false).getRegionInfo();
+
+    // mock a secondary region info to open
+    hriSecondary = new HRegionInfo(hriPrimary.getTable(), hriPrimary.getStartKey(),
+        hriPrimary.getEndKey(), hriPrimary.isSplit(), hriPrimary.getRegionId(), 1);
+
+    // No master
+    LOG.info("Master is going to be stopped");
+    HTU.getHBaseCluster().getMaster().stopMaster();
+    Configuration c = new Configuration(HTU.getConfiguration());
+    c.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    HBaseAdmin ha = new HBaseAdmin(c);
+    for (boolean masterRuns = true; masterRuns; ) {
+      Thread.sleep(100);
+      try {
+        masterRuns = false;
+        masterRuns = ha.isMasterRunning();
+      } catch (MasterNotRunningException ignored) {
+      }
+    }
+    LOG.info("Master has stopped");
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    if (table != null) table.close();
+    HTU.shutdownMiniCluster();
+  }
+
+  @Before
+  public void before() throws IOException {
+    HTU.getHBaseAdmin().getConnection().clearRegionCache();
+  }
+
+  @After
+  public void after() throws IOException, KeeperException {
+    try {
+      closeRegion(hriSecondary);
+    } catch (Exception ignored) {
+    }
+    ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriPrimary);
+    ZKAssign.deleteNodeFailSilent(HTU.getZooKeeperWatcher(), hriSecondary);
+
+    HTU.getHBaseAdmin().getConnection().clearRegionCache();
+  }
+
+  private HRegionServer getRS() {
+    return HTU.getMiniHBaseCluster().getRegionServer(0);
+  }
+
+  private void openRegion(HRegionInfo hri) throws Exception {
+    ZKAssign.createNodeOffline(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+    // first version is '0'
+    AdminProtos.OpenRegionRequest orr = RequestConverter.buildOpenRegionRequest(hri, 0, null);
+    AdminProtos.OpenRegionResponse responseOpen = getRS().openRegion(null, orr);
+    Assert.assertEquals(responseOpen.getOpeningStateCount(), 1);
+    Assert.assertEquals(responseOpen.getOpeningState(0),
+      AdminProtos.OpenRegionResponse.RegionOpeningState.OPENED);
+    checkRegionIsOpened(hri);
+  }
+
+  private void closeRegion(HRegionInfo hri) throws Exception {
+    ZKAssign.createNodeClosing(HTU.getZooKeeperWatcher(), hri, getRS().getServerName());
+
+    AdminProtos.CloseRegionRequest crr = RequestConverter.buildCloseRegionRequest(
+        hri.getEncodedName(), true);
+    AdminProtos.CloseRegionResponse responseClose = getRS().closeRegion(null, crr);
+    Assert.assertTrue(responseClose.getClosed());
+
+    checkRegionIsClosed(hri.getEncodedName());
+
+    ZKAssign.deleteClosedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null);
+  }
+
+  private void checkRegionIsOpened(HRegionInfo hri) throws Exception {
+
+    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+      Thread.sleep(1);
+    }
+
+    Assert.assertTrue(
+        ZKAssign.deleteOpenedNode(HTU.getZooKeeperWatcher(), hri.getEncodedName(), null));
+  }
+
+  private void checkRegionIsClosed(String encodedRegionName) throws Exception {
+
+    while (!getRS().getRegionsInTransitionInRS().isEmpty()) {
+      Thread.sleep(1);
+    }
+
+    try {
+      Assert.assertFalse(getRS().getRegionByEncodedName(encodedRegionName).isAvailable());
+    } catch (NotServingRegionException expected) {
+      // That's how it works: if the region is closed we get an exception.
+    }
+
+    // We don't delete the znode here, because there is not always a znode.
+  }
+
+  @Test
+  public void testUseRegionWithoutReplica() throws Exception {
+    byte[] b1 = "testUseRegionWithoutReplica".getBytes();
+    openRegion(hriSecondary);
+    SlowMeCopro.cdl.set(new CountDownLatch(0));
+    try {
+      Get g = new Get(b1);
+      Result r = table.get(g);
+      Assert.assertFalse(r.isStale());
+    } finally {
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testLocations() throws Exception {
+    byte[] b1 = "testLocations".getBytes();
+    openRegion(hriSecondary);
+    ClusterConnection hc = (ClusterConnection) HTU.getHBaseAdmin().getConnection();
+
+    try {
+      hc.clearRegionCache();
+      RegionLocations rl = hc.locateRegion(table.getName(), b1, false, false);
+      Assert.assertEquals(2, rl.size());
+
+      rl = hc.locateRegion(table.getName(), b1, true, false);
+      Assert.assertEquals(2, rl.size());
+
+      hc.clearRegionCache();
+      rl = hc.locateRegion(table.getName(), b1, true, false);
+      Assert.assertEquals(2, rl.size());
+
+      rl = hc.locateRegion(table.getName(), b1, false, false);
+      Assert.assertEquals(2, rl.size());
+    } finally {
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testGetNoResultNoStaleRegionWithReplica() throws Exception {
+    byte[] b1 = "testGetNoResultNoStaleRegionWithReplica".getBytes();
+    openRegion(hriSecondary);
+
+    try {
+      // A get works and is not stale
+      Get g = new Get(b1);
+      Result r = table.get(g);
+      Assert.assertFalse(r.isStale());
+    } finally {
+      closeRegion(hriSecondary);
+    }
+  }
+
+
+  @Test
+  public void testGetNoResultStaleRegionWithReplica() throws Exception {
+    byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
+    openRegion(hriSecondary);
+
+    SlowMeCopro.cdl.set(new CountDownLatch(1));
+    try {
+      Get g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      Result r = table.get(g);
+      Assert.assertTrue(r.isStale());
+    } finally {
+      SlowMeCopro.cdl.get().countDown();
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testGetNoResultNotStaleSleepRegionWithReplica() throws Exception {
+    byte[] b1 = "testGetNoResultNotStaleSleepRegionWithReplica".getBytes();
+    openRegion(hriSecondary);
+
+    try {
+      // We sleep, but we won't go to the stale region as we don't ask for stale data by default.
+      SlowMeCopro.sleepTime.set(2000);
+      Get g = new Get(b1);
+      Result r = table.get(g);
+      Assert.assertFalse(r.isStale());
+
+    } finally {
+      SlowMeCopro.sleepTime.set(0);
+      closeRegion(hriSecondary);
+    }
+  }
+
+
+  @Test
+  public void testFlushTable() throws Exception {
+    openRegion(hriSecondary);
+    try {
+      HTU.getHBaseAdmin().flush(table.getTableName());
+
+      Put p = new Put(row);
+      p.add(f, row, row);
+      table.put(p);
+
+      HTU.getHBaseAdmin().flush(table.getTableName());
+    } finally {
+      Delete d = new Delete(row);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testFlushPrimary() throws Exception {
+    openRegion(hriSecondary);
+
+    try {
+      HTU.getHBaseAdmin().flush(hriPrimary.getEncodedNameAsBytes());
+
+      Put p = new Put(row);
+      p.add(f, row, row);
+      table.put(p);
+
+      HTU.getHBaseAdmin().flush(hriPrimary.getEncodedNameAsBytes());
+    } finally {
+      Delete d = new Delete(row);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testFlushSecondary() throws Exception {
+    openRegion(hriSecondary);
+    try {
+      HTU.getHBaseAdmin().flush(hriSecondary.getEncodedNameAsBytes());
+
+      Put p = new Put(row);
+      p.add(f, row, row);
+      table.put(p);
+
+      HTU.getHBaseAdmin().flush(hriSecondary.getEncodedNameAsBytes());
+    } catch (TableNotFoundException expected) {
+    } finally {
+      Delete d = new Delete(row);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testUseRegionWithReplica() throws Exception {
+    byte[] b1 = "testUseRegionWithReplica".getBytes();
+    openRegion(hriSecondary);
+
+    try {
+      // A simple put works, even if there is a second replica
+      Put p = new Put(b1);
+      p.add(f, b1, b1);
+      table.put(p);
+      LOG.info("Put done");
+
+      // A get works and is not stale
+      Get g = new Get(b1);
+      Result r = table.get(g);
+      Assert.assertFalse(r.isStale());
+      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+      LOG.info("get works and is not stale done");
+
+      // Even if we have to wait a little on the main region
+      SlowMeCopro.sleepTime.set(2000);
+      g = new Get(b1);
+      r = table.get(g);
+      Assert.assertFalse(r.isStale());
+      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+      SlowMeCopro.sleepTime.set(0);
+      LOG.info("sleep and is not stale done");
+
+      // But if we ask for stale we will get it
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
+      SlowMeCopro.cdl.get().countDown();
+
+      LOG.info("stale done");
+
+      // exists works and is not stale
+      g = new Get(b1);
+      g.setCheckExistenceOnly(true);
+      r = table.get(g);
+      Assert.assertFalse(r.isStale());
+      Assert.assertTrue(r.getExists());
+      LOG.info("exists not stale done");
+
+      // exists works on stale but doesn't see the put
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setCheckExistenceOnly(true);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+      Assert.assertFalse("The secondary has stale data", r.getExists());
+      SlowMeCopro.cdl.get().countDown();
+      LOG.info("exists stale before flush done");
+
+      HTU.getHBaseAdmin().flush(table.getTableName());
+      LOG.info("flush done");
+      Thread.sleep(1000 + REFRESH_PERIOD * 2);
+
+      // get works and is not stale
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+      Assert.assertFalse(r.isEmpty());
+      SlowMeCopro.cdl.get().countDown();
+      LOG.info("stale done");
+
+      // exists works on stale and we see the put after the flush
+      SlowMeCopro.cdl.set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setCheckExistenceOnly(true);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+      Assert.assertTrue(r.getExists());
+      SlowMeCopro.cdl.get().countDown();
+      LOG.info("exists stale after flush done");
+
+    } finally {
+      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+      Delete d = new Delete(b1);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+}
\ No newline at end of file