Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/08 13:57:32 UTC

hbase git commit: HBASE-16505 Add AsyncRegion interface to pass deadline and support async operations (Phil Yang)

Repository: hbase
Updated Branches:
  refs/heads/master c04b38918 -> 1574c6ef3


HBASE-16505 Add AsyncRegion interface to pass deadline and support async operations (Phil Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1574c6ef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1574c6ef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1574c6ef

Branch: refs/heads/master
Commit: 1574c6ef39ef8e624e5caaf9c378fffe480b0702
Parents: c04b389
Author: tedyu <yu...@gmail.com>
Authored: Thu Sep 8 06:57:03 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu Sep 8 06:57:03 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/DelegatingHBaseRpcController.java |  10 +
 .../hadoop/hbase/ipc/HBaseRpcController.java    |   4 +
 .../hbase/ipc/HBaseRpcControllerImpl.java       |  13 ++
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   7 +-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |   4 +-
 .../hadoop/hbase/regionserver/AsyncRegion.java  |  99 ++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      | 197 ++++++++++++++++++-
 .../hbase/regionserver/OperationListener.java   |  33 ++++
 .../hadoop/hbase/regionserver/Region.java       |  19 +-
 .../regionserver/RegionOperationContext.java    |  90 +++++++++
 .../SynchronousOperationListener.java           |  63 ++++++
 11 files changed, 527 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
index 9f9c636..e098499 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -114,6 +114,16 @@ public class DelegatingHBaseRpcController implements HBaseRpcController {
   }
 
   @Override
+  public void setDeadline(long deadline) {
+    delegate.setDeadline(deadline);
+  }
+
+  @Override
+  public long getDeadline() {
+    return delegate.getDeadline();
+  }
+
+  @Override
   public void setFailed(IOException e) {
     delegate.setFailed(e);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 2c4b335..9d87845 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -67,6 +67,10 @@ public interface HBaseRpcController extends RpcController, CellScannable {
 
   boolean hasCallTimeout();
 
+  void setDeadline(long deadline);
+
+  long getDeadline();
+
   /**
    * Set failed with an exception to pass on. For use in async rpc clients
    * @param e exception to set with

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index a976473..7bfc9b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
 
   private IOException exception;
 
+  private long deadline = Long.MAX_VALUE;
+
   /**
    * Priority to set on this request. Set it here in controller so available composing the request.
    * This is the ordained way of setting priorities going forward. We will be undoing the old
@@ -117,6 +119,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
     cellScanner = null;
     exception = null;
     callTimeout = null;
+    deadline = Long.MAX_VALUE;
     // In the implementations of some callable with replicas, rpc calls are executed in a executor
     // and we could cancel the operation from outside which means there could be a race between
     // reset and startCancel. Although I think the race should be handled by the callable since the
@@ -148,6 +151,16 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
   }
 
   @Override
+  public void setDeadline(long deadline) {
+    this.deadline = deadline;
+  }
+
+  @Override
+  public long getDeadline() {
+    return this.deadline;
+  }
+
+  @Override
   public synchronized String errorText() {
     if (!done || exception == null) {
       return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b2b3c66..7e4ffee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
 import org.apache.hadoop.hbase.ipc.RpcServer.Call;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.util.Pair;
@@ -121,7 +122,11 @@ public class CallRunner {
         }
         // make the call
         resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
-          call.timestamp, this.status, call.startTime, call.timeout);
+            call.timestamp, this.status, call.startTime, call.timeout);
+      } catch (TimeoutIOException e) {
+        RpcServer.LOG.info("Timeout while handling request, won't send response to client: " + call,
+            e);
+        return;
       } catch (Throwable e) {
         RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
         errorThrowable = e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 0df5097..928f96d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -400,7 +400,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       return "callId: " + this.id + " service: " + serviceName +
           " methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
           " size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
-          " connection: " + connection.toString();
+          " connection: " + connection.toString() +
+          " timeout: " + timeout;
     }
 
     String toTraceString() {
@@ -2207,6 +2208,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       //get an instance of the method arg type
       HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
       controller.setCallTimeout(timeout);
+      controller.setDeadline(timeout > 0 ? receiveTime + timeout : Long.MAX_VALUE);
       Message result = service.callBlockingMethod(md, controller, param);
       long endTime = System.currentTimeMillis();
       int processingTime = (int) (endTime - startTime);
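
The controller now carries an absolute deadline of receiveTime + timeout (or Long.MAX_VALUE when
no timeout is set). This excerpt does not show where that deadline is enforced; as a rough,
hypothetical sketch (the helper class below is illustrative, not part of this commit), a
server-side check enabled by the new getter could look like:

import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;

final class DeadlineCheck {
  private DeadlineCheck() {}

  // Hypothetical helper: give up before doing expensive work once the request's
  // deadline has passed. CallRunner above catches TimeoutIOException and skips
  // sending a response to the (already timed-out) client.
  static void throwIfExpired(HBaseRpcController controller) throws TimeoutIOException {
    long deadline = controller.getDeadline();
    if (deadline != Long.MAX_VALUE && System.currentTimeMillis() > deadline) {
      throw new TimeoutIOException("deadline " + deadline + " already passed");
    }
  }
}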

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
new file mode 100644
index 0000000..ab1a9df
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+
+/**
+ * Asynchronous version of Region. It supports non-blocking operations and can pass additional
+ * information, such as the request deadline, into the operations.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface AsyncRegion extends Region {
+
+  void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock);
+
+  void append(RegionOperationContext<Result> context, Append append, long nonceGroup, long nonce);
+
+  void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
+      long nonceGroup, long nonce);
+
+  void batchReplay(RegionOperationContext<OperationStatus[]> context, WALSplitter.MutationReplay[] mutations,
+      long replaySeqId);
+
+  void checkAndMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
+      byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,
+      boolean writeToWAL);
+
+  void checkAndRowMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
+      byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator,
+      RowMutations mutations, boolean writeToWAL);
+
+  void delete(RegionOperationContext<Void> context, Delete delete);
+
+  void get(RegionOperationContext<Result> context, Get get);
+
+  void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor);
+
+  void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
+      long nonceGroup, long nonce);
+
+  void getScanner(RegionOperationContext<RegionScanner> context, Scan scan);
+
+  void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
+      List<KeyValueScanner> additionalScanners);
+
+  void increment(RegionOperationContext<Result> context, Increment increment, long nonceGroup,
+      long nonce);
+
+  void mutateRow(RegionOperationContext<Void> context, RowMutations mutations);
+
+  void mutateRowsWithLocks(RegionOperationContext<Void> context, Collection<Mutation> mutations,
+      Collection<byte[]> rowsToLock, long nonceGroup, long nonce);
+
+  void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor);
+
+  void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor,
+      long nonceGroup, long nonce);
+
+  void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor,
+      long timeout, long nonceGroup, long nonce);
+
+  void put(RegionOperationContext<Void> context, Put put);
+
+}
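
As a rough illustration of how this callback-style interface is meant to be driven, here is a
hypothetical caller (the class and method names below are illustrative, not part of this commit):

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.AsyncRegion;
import org.apache.hadoop.hbase.regionserver.OperationListener;
import org.apache.hadoop.hbase.regionserver.RegionOperationContext;

final class AsyncRegionExample {
  private AsyncRegionExample() {}

  // Issues a non-blocking Get: the listener, not a return value, receives the outcome,
  // so the calling thread (e.g. a SEDA stage) is free once the call is handed off.
  static void asyncGet(AsyncRegion region, byte[] row) {
    RegionOperationContext<Result> context = new RegionOperationContext<>();
    context.setDeadline(System.currentTimeMillis() + 60000L); // assumed 60s budget
    context.addListener(new OperationListener<Result>() {
      @Override
      public void completed(Result result) {
        // hand the Result to the next stage / RPC response path
      }

      @Override
      public void failed(Throwable t) {
        // translate into an error response
      }
    });
    region.get(context, new Get(row));
  }
}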

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f97f6b2..f4a23f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.filter.FilterWrapper;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -197,7 +198,7 @@ import org.apache.htrace.TraceScope;
 
 @SuppressWarnings("deprecation")
 @InterfaceAudience.Private
-public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
+public class HRegion implements HeapSize, PropagatingConfigurationObserver, AsyncRegion {
   private static final Log LOG = LogFactory.getLog(HRegion.class);
 
   public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
@@ -5296,6 +5297,200 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
+  @Override
+  public void append(RegionOperationContext<Result> context, Append append, long nonceGroup,
+      long nonce) {
+    try {
+      context.done(append(append, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
+      long nonceGroup, long nonce) {
+    try {
+      context.done(batchMutate(mutations, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void batchReplay(RegionOperationContext<OperationStatus[]> context,
+      MutationReplay[] mutations, long replaySeqId) {
+    try {
+      context.done(batchReplay(mutations, replaySeqId));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void checkAndMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
+      byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+      Mutation mutation, boolean writeToWAL) {
+    try {
+      context.done(
+          checkAndMutate(row, family, qualifier, compareOp, comparator, mutation, writeToWAL));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void checkAndRowMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
+      byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+      RowMutations mutations, boolean writeToWAL) {
+    try {
+      context.done(
+          checkAndRowMutate(row, family, qualifier, compareOp, comparator, mutations, writeToWAL));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void delete(RegionOperationContext<Void> context, Delete delete) {
+    try {
+      delete(delete);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void get(RegionOperationContext<Result> context, Get get) {
+    try {
+      context.done(get(get));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor) {
+    try {
+      context.done(get(get, withCoprocessor));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
+      long nonceGroup, long nonce) {
+    try {
+      context.done(get(get, withCoprocessor, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan) {
+    try {
+      context.done(getScanner(scan));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
+      List<KeyValueScanner> additionalScanners) {
+    try {
+      context.done(getScanner(scan, additionalScanners));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock) {
+    try {
+      context.done(getRowLock(row, readLock));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void increment(RegionOperationContext<Result> context, Increment increment,
+      long nonceGroup, long nonce) {
+    try {
+      context.done(increment(increment, nonceGroup, nonce));
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void mutateRow(RegionOperationContext<Void> context, RowMutations mutations) {
+    try {
+      mutateRow(mutations);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void mutateRowsWithLocks(RegionOperationContext<Void> context,
+      Collection<Mutation> mutations, Collection<byte[]> rowsToLock, long nonceGroup, long nonce) {
+    try {
+      mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void processRowsWithLocks(RegionOperationContext<Void> context,
+      RowProcessor<?, ?> processor) {
+    try {
+      processRowsWithLocks(processor);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void processRowsWithLocks(RegionOperationContext<Void> context,
+      RowProcessor<?, ?> processor, long nonceGroup, long nonce) {
+    try {
+      processRowsWithLocks(processor, nonceGroup, nonce);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void processRowsWithLocks(RegionOperationContext<Void> context,
+      RowProcessor<?, ?> processor, long timeout, long nonceGroup, long nonce) {
+    try {
+      processRowsWithLocks(processor, timeout, nonceGroup, nonce);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
+  @Override
+  public void put(RegionOperationContext<Void> context, Put put) {
+    try {
+      put(put);
+      context.done(null);
+    } catch (Throwable e) {
+      context.error(e);
+    }
+  }
+
   public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
     return lockedRows;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
new file mode 100644
index 0000000..613d128
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Listener interface for AsyncRegion operations.
+ * @param <T> the type of result, Void if the operation has no result.
+ */
+@InterfaceAudience.Private
+public interface OperationListener<T> {
+
+  void completed(T result);
+
+  void failed(Throwable t);
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..c04ead2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -282,15 +282,16 @@ public interface Region extends ConfigurationObserver {
   }
 
   /**
-   * Tries to acquire a lock on the given row.
-   * @param waitForLock if true, will block until the lock is available.
-   *        Otherwise, just tries to obtain the lock and returns
-   *        false if unavailable.
-   * @return the row lock if acquired,
-   *   null if waitForLock was false and the lock was not acquired
-   * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
-   */
-  RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+   *
+   * Get a row lock for the specified row. All locks are reentrant.
+   *
+   * Before calling this function make sure that a region operation has already been
+   * started (the calling thread has already acquired the region-close-guard lock).
+   * @param row the row that actions will be performed against
+   * @param readLock true if a read (non-exclusive) lock is requested, false if an exclusive
+   *                 write lock is requested
+   */
+  RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
 
   /**
    * If the given list of row locks is not null, releases all locks.

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
new file mode 100644
index 0000000..6d20f84
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.protobuf.RpcController;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+
+/**
+ * Each AsyncRegion operation is passed a context object carrying information about the request.
+ * The request's deadline can be passed through the context so the AsyncRegion implementation can
+ * drop requests that have already timed out instead of wasting time on them.
+ * Listeners can be added to the context to be notified of completion or failure of the operation,
+ * which is what allows AsyncRegion operations to be non-blocking. This is important for a Staged
+ * Event-Driven Architecture (SEDA); see HBASE-16583 for details.
+ * The context is RPC-free; do not add RPC-related code here. On the RPC side, use a listener to
+ * handle the result.
+ * @param <T> The type of result, Void if the operation has no result.
+ */
+@InterfaceAudience.Private
+public class RegionOperationContext<T> {
+
+  private long deadline = Long.MAX_VALUE;
+  private List<OperationListener<T>> listeners;
+
+  public long getDeadline() {
+    return deadline;
+  }
+
+  public void setDeadline(long deadline) {
+    this.deadline = deadline;
+  }
+
+  public RegionOperationContext() {
+    listeners = new ArrayList<>();
+  }
+
+  public RegionOperationContext(RegionOperationContext<T> context) {
+    this.deadline = context.deadline;
+    this.listeners = new ArrayList<>(context.listeners);
+  }
+
+  public RegionOperationContext(RpcController controller) {
+    this.listeners = new ArrayList<>();
+    if (controller instanceof HBaseRpcController) {
+      this.deadline = ((HBaseRpcController) controller).getDeadline();
+    }
+  }
+
+  public synchronized void addListener(OperationListener<T> listener) {
+    listeners.add(listener);
+  }
+
+  /**
+   * This is only called from a single thread, so no locking is needed.
+   */
+  public void error(Throwable error) {
+    for (OperationListener<T> listener : listeners) {
+      listener.failed(error);
+    }
+  }
+
+  /**
+   * This is only called from a single thread, so no locking is needed.
+   */
+  public void done(T result) {
+    for (OperationListener<T> listener : listeners) {
+      listener.completed(result);
+    }
+  }
+
+}
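
For illustration, a hypothetical piece of RPC-side glue showing how the deadline set in RpcServer
travels with the request into the region (not part of this commit; names are illustrative):

import com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.AsyncRegion;
import org.apache.hadoop.hbase.regionserver.OperationListener;
import org.apache.hadoop.hbase.regionserver.RegionOperationContext;

final class ContextFromRpc {
  private ContextFromRpc() {}

  // The context picks up the controller's deadline (receiveTime + timeout, as set in
  // RpcServer above) or keeps Long.MAX_VALUE when the controller is not an
  // HBaseRpcController; the listener receives the outcome.
  static void dispatchGet(AsyncRegion region, RpcController controller, Get get,
      OperationListener<Result> responseListener) {
    RegionOperationContext<Result> context = new RegionOperationContext<>(controller);
    context.addListener(responseListener); // e.g. a listener that builds the RPC response
    region.get(context, get);
  }
}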

http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
new file mode 100644
index 0000000..24de981
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An OperationListener that lets the caller retrieve the result directly. Used temporarily while
+ * AsyncRegion is not yet fully non-blocking. By the time getResult is called, the operation must
+ * have already completed.
+ */
+@InterfaceAudience.Private
+public class SynchronousOperationListener<T> implements OperationListener<T> {
+
+  private T result;
+  private boolean done;
+  private Throwable error;
+
+  @Override
+  public void completed(T result) {
+    done = true;
+    this.result = result;
+  }
+
+  @Override
+  public void failed(Throwable error) {
+    done = true;
+    this.error = error;
+  }
+
+  /**
+   * This method is called after invoking an AsyncRegion operation synchronously in the same
+   * thread, so no locking is needed and completed/failed must already have been called.
+   */
+  public T getResult() throws IOException {
+    assert done;
+    if (error != null) {
+      // Rethrow IOExceptions and unchecked throwables as-is
+      Throwables.propagateIfPossible(error, IOException.class);
+      // Otherwise wrap the checked, non-IO throwable in an IOException
+      throw new IOException(error);
+    }
+    return result;
+  }
+}
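
A hypothetical example of the synchronous adaptation path this listener enables (illustrative
only, not part of this commit):

import java.io.IOException;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.regionserver.AsyncRegion;
import org.apache.hadoop.hbase.regionserver.RegionOperationContext;
import org.apache.hadoop.hbase.regionserver.SynchronousOperationListener;

final class SyncAdapterExample {
  private SyncAdapterExample() {}

  // Uses the async-style API but gets the result synchronously: HRegion completes the
  // context in the calling thread, so by the time get() returns the listener already
  // holds either the Result or the error, which getResult() rethrows (wrapped as an
  // IOException when it is a checked non-IO throwable).
  static Result syncGet(AsyncRegion region, Get get) throws IOException {
    SynchronousOperationListener<Result> listener = new SynchronousOperationListener<>();
    RegionOperationContext<Result> context = new RegionOperationContext<>();
    context.addListener(listener);
    region.get(context, get);
    return listener.getResult();
  }
}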