You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/08 13:57:32 UTC
hbase git commit: HBASE-16505 Add AsyncRegion interface to pass
deadline and support async operations (Phil Yang)
Repository: hbase
Updated Branches:
refs/heads/master c04b38918 -> 1574c6ef3
HBASE-16505 Add AsyncRegion interface to pass deadline and support async operations (Phil Yang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1574c6ef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1574c6ef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1574c6ef
Branch: refs/heads/master
Commit: 1574c6ef39ef8e624e5caaf9c378fffe480b0702
Parents: c04b389
Author: tedyu <yu...@gmail.com>
Authored: Thu Sep 8 06:57:03 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu Sep 8 06:57:03 2016 -0700
----------------------------------------------------------------------
.../hbase/ipc/DelegatingHBaseRpcController.java | 10 +
.../hadoop/hbase/ipc/HBaseRpcController.java | 4 +
.../hbase/ipc/HBaseRpcControllerImpl.java | 13 ++
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 7 +-
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 4 +-
.../hadoop/hbase/regionserver/AsyncRegion.java | 99 ++++++++++
.../hadoop/hbase/regionserver/HRegion.java | 197 ++++++++++++++++++-
.../hbase/regionserver/OperationListener.java | 33 ++++
.../hadoop/hbase/regionserver/Region.java | 19 +-
.../regionserver/RegionOperationContext.java | 90 +++++++++
.../SynchronousOperationListener.java | 63 ++++++
11 files changed, 527 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
index 9f9c636..e098499 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/DelegatingHBaseRpcController.java
@@ -114,6 +114,16 @@ public class DelegatingHBaseRpcController implements HBaseRpcController {
}
@Override
+ public void setDeadline(long deadline) {
+ delegate.setDeadline(deadline);
+ }
+
+ @Override
+ public long getDeadline() {
+ return delegate.getDeadline();
+ }
+
+ @Override
public void setFailed(IOException e) {
delegate.setFailed(e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 2c4b335..9d87845 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -67,6 +67,10 @@ public interface HBaseRpcController extends RpcController, CellScannable {
boolean hasCallTimeout();
+ void setDeadline(long deadline);
+
+ long getDeadline();
+
/**
* Set failed with an exception to pass on. For use in async rpc clients
* @param e exception to set with
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index a976473..7bfc9b8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -51,6 +51,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
private IOException exception;
+ private long deadline = Long.MAX_VALUE;
+
/**
* Priority to set on this request. Set it here in controller so available composing the request.
* This is the ordained way of setting priorities going forward. We will be undoing the old
@@ -117,6 +119,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
cellScanner = null;
exception = null;
callTimeout = null;
+ deadline = Long.MAX_VALUE;
// In the implementations of some callable with replicas, rpc calls are executed in a executor
// and we could cancel the operation from outside which means there could be a race between
// reset and startCancel. Although I think the race should be handled by the callable since the
@@ -148,6 +151,16 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
}
@Override
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
+
+ @Override
+ public long getDeadline() {
+ return this.deadline;
+ }
+
+ @Override
public synchronized String errorText() {
if (!done || exception == null) {
return null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index b2b3c66..7e4ffee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.util.Pair;
@@ -121,7 +122,11 @@ public class CallRunner {
}
// make the call
resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
- call.timestamp, this.status, call.startTime, call.timeout);
+ call.timestamp, this.status, call.startTime, call.timeout);
+ } catch (TimeoutIOException e) {
+ RpcServer.LOG.info("Timeout while handling request, won't send response to client: " + call,
+ e);
+ return;
} catch (Throwable e) {
RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(), e);
errorThrowable = e;
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 0df5097..928f96d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -400,7 +400,8 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
return "callId: " + this.id + " service: " + serviceName +
" methodName: " + ((this.md != null) ? this.md.getName() : "n/a") +
" size: " + StringUtils.TraditionalBinaryPrefix.long2String(this.size, "", 1) +
- " connection: " + connection.toString();
+ " connection: " + connection.toString() +
+ " timeout: " + timeout;
}
String toTraceString() {
@@ -2207,6 +2208,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
//get an instance of the method arg type
HBaseRpcController controller = new HBaseRpcControllerImpl(cellScanner);
controller.setCallTimeout(timeout);
+ controller.setDeadline(timeout > 0 ? receiveTime + timeout : Long.MAX_VALUE);
Message result = service.callBlockingMethod(md, controller, param);
long endTime = System.currentTimeMillis();
int processingTime = (int) (endTime - startTime);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
new file mode 100644
index 0000000..ab1a9df
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AsyncRegion.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RowMutations;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+
+/**
+ * Async version of Region. Support non-blocking operations and can pass more information into
+ * the operations.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface AsyncRegion extends Region {
+
+ void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock);
+
+ void append(RegionOperationContext<Result> context, Append append, long nonceGroup, long nonce);
+
+ void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
+ long nonceGroup, long nonce);
+
+ void batchReplay(RegionOperationContext<OperationStatus[]> context, WALSplitter.MutationReplay[] mutations,
+ long replaySeqId);
+
+ void checkAndMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
+ byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator, Mutation mutation,
+ boolean writeToWAL);
+
+ void checkAndRowMutate(RegionOperationContext<Boolean> context, byte [] row, byte [] family,
+ byte [] qualifier, CompareFilter.CompareOp compareOp, ByteArrayComparable comparator,
+ RowMutations mutations, boolean writeToWAL);
+
+ void delete(RegionOperationContext<Void> context, Delete delete);
+
+ void get(RegionOperationContext<Result> context, Get get);
+
+ void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor);
+
+ void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
+ long nonceGroup, long nonce);
+
+ void getScanner(RegionOperationContext<RegionScanner> context, Scan scan);
+
+ void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
+ List<KeyValueScanner> additionalScanners);
+
+ void increment(RegionOperationContext<Result> context, Increment increment, long nonceGroup,
+ long nonce);
+
+ void mutateRow(RegionOperationContext<Void> context, RowMutations mutations);
+
+ void mutateRowsWithLocks(RegionOperationContext<Void> context, Collection<Mutation> mutations,
+ Collection<byte[]> rowsToLock, long nonceGroup, long nonce);
+
+ void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor);
+
+ void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor,
+ long nonceGroup, long nonce);
+
+ void processRowsWithLocks(RegionOperationContext<Void> context, RowProcessor<?,?> processor,
+ long timeout, long nonceGroup, long nonce);
+
+ void put(RegionOperationContext<Void> context, Put put);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f97f6b2..f4a23f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -128,6 +128,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.RegionInRecoveryException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterWrapper;
import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
@@ -197,7 +198,7 @@ import org.apache.htrace.TraceScope;
@SuppressWarnings("deprecation")
@InterfaceAudience.Private
-public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
+public class HRegion implements HeapSize, PropagatingConfigurationObserver, AsyncRegion {
private static final Log LOG = LogFactory.getLog(HRegion.class);
public static final String LOAD_CFS_ON_DEMAND_CONFIG_KEY =
@@ -5296,6 +5297,200 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ @Override
+ public void append(RegionOperationContext<Result> context, Append append, long nonceGroup,
+ long nonce) {
+ try {
+ context.done(append(append, nonceGroup, nonce));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void batchMutate(RegionOperationContext<OperationStatus[]> context, Mutation[] mutations,
+ long nonceGroup, long nonce) {
+ try {
+ context.done(batchMutate(mutations, nonceGroup, nonce));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void batchReplay(RegionOperationContext<OperationStatus[]> context,
+ MutationReplay[] mutations, long replaySeqId) {
+ try {
+ context.done(batchReplay(mutations, replaySeqId));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void checkAndMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
+ byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+ Mutation mutation, boolean writeToWAL) {
+ try {
+ context.done(
+ checkAndMutate(row, family, qualifier, compareOp, comparator, mutation, writeToWAL));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void checkAndRowMutate(RegionOperationContext<Boolean> context, byte[] row, byte[] family,
+ byte[] qualifier, CompareOp compareOp, ByteArrayComparable comparator,
+ RowMutations mutations, boolean writeToWAL) {
+ try {
+ context.done(
+ checkAndRowMutate(row, family, qualifier, compareOp, comparator, mutations, writeToWAL));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void delete(RegionOperationContext<Void> context, Delete delete) {
+ try {
+ delete(delete);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void get(RegionOperationContext<Result> context, Get get) {
+ try {
+ context.done(get(get));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor) {
+ try {
+ context.done(get(get, withCoprocessor));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void get(RegionOperationContext<List<Cell>> context, Get get, boolean withCoprocessor,
+ long nonceGroup, long nonce) {
+ try {
+ context.done(get(get, withCoprocessor, nonceGroup, nonce));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan) {
+ try {
+ context.done(getScanner(scan));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void getScanner(RegionOperationContext<RegionScanner> context, Scan scan,
+ List<KeyValueScanner> additionalScanners) {
+ try {
+ context.done(getScanner(scan, additionalScanners));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void getRowLock(RegionOperationContext<RowLock> context, byte[] row, boolean readLock) {
+ try {
+ context.done(getRowLock(row, readLock));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void increment(RegionOperationContext<Result> context, Increment increment,
+ long nonceGroup, long nonce) {
+ try {
+ context.done(increment(increment, nonceGroup, nonce));
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void mutateRow(RegionOperationContext<Void> context, RowMutations mutations) {
+ try {
+ mutateRow(mutations);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void mutateRowsWithLocks(RegionOperationContext<Void> context,
+ Collection<Mutation> mutations, Collection<byte[]> rowsToLock, long nonceGroup, long nonce) {
+ try {
+ mutateRowsWithLocks(mutations, rowsToLock, nonceGroup, nonce);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void processRowsWithLocks(RegionOperationContext<Void> context,
+ RowProcessor<?, ?> processor) {
+ try {
+ processRowsWithLocks(processor);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void processRowsWithLocks(RegionOperationContext<Void> context,
+ RowProcessor<?, ?> processor, long nonceGroup, long nonce) {
+ try {
+ processRowsWithLocks(processor, nonceGroup, nonce);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void processRowsWithLocks(RegionOperationContext<Void> context,
+ RowProcessor<?, ?> processor, long timeout, long nonceGroup, long nonce) {
+ try {
+ processRowsWithLocks(processor, timeout, nonceGroup, nonce);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
+ @Override
+ public void put(RegionOperationContext<Void> context, Put put) {
+ try {
+ put(put);
+ context.done(null);
+ } catch (Throwable e) {
+ context.error(e);
+ }
+ }
+
public ConcurrentHashMap<HashedBytes, RowLockContext> getLockedRows() {
return lockedRows;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
new file mode 100644
index 0000000..613d128
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/OperationListener.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface for listeners of AsyncRegion.
+ * @param <T> type of result, Void of it has no result.
+ */
+@InterfaceAudience.Private
+public interface OperationListener<T> {
+
+ void completed(T result);
+
+ void failed(Throwable t);
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..c04ead2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -282,15 +282,16 @@ public interface Region extends ConfigurationObserver {
}
/**
- * Tries to acquire a lock on the given row.
- * @param waitForLock if true, will block until the lock is available.
- * Otherwise, just tries to obtain the lock and returns
- * false if unavailable.
- * @return the row lock if acquired,
- * null if waitForLock was false and the lock was not acquired
- * @throws IOException if waitForLock was true and the lock could not be acquired after waiting
- */
- RowLock getRowLock(byte[] row, boolean waitForLock) throws IOException;
+ *
+ * Get a row lock for the specified row. All locks are reentrant.
+ *
+ * Before calling this function make sure that a region operation has already been
+ * started (the calling thread has already acquired the region-close-guard lock).
+ * @param row The row actions will be performed against
+ * @param readLock is the lock reader or writer. True indicates that a non-exlcusive
+ * lock is requested
+ */
+ RowLock getRowLock(byte[] row, boolean readLock) throws IOException;
/**
* If the given list of row locks is not null, releases all locks.
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
new file mode 100644
index 0000000..6d20f84
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionOperationContext.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.protobuf.RpcController;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+
+/**
+ * In each operation of AsyncRegion, we pass a context object with information of the request.
+ * We can pass deadline of this request to AsyncRegion implementation to drop timeout request and
+ * not waste time on timed out requests.
+ * We can add listeners to watch the event of completion/failure of this operation, which helps us
+ * make operation of AsyncRegion non-blocking. It is important for Staged Event-Driven Architecture
+ * (SEDA), see HBASE-16583 for details.
+ * The context is RPC-free, don't add RPC related code. In RPC we should use listener to deal with
+ * the result.
+ * @param <T> The type of result, Void if the operation has no result.
+ */
+@InterfaceAudience.Private
+public class RegionOperationContext<T> {
+
+ private long deadline = Long.MAX_VALUE;
+ private List<OperationListener<T>> listeners;
+
+ public long getDeadline() {
+ return deadline;
+ }
+
+ public void setDeadline(long deadline) {
+ this.deadline = deadline;
+ }
+
+ public RegionOperationContext() {
+ listeners = new ArrayList<>();
+ }
+
+ public RegionOperationContext(RegionOperationContext<T> context) {
+ this.deadline = context.deadline;
+ this.listeners = new ArrayList<>(context.listeners);
+ }
+
+ public RegionOperationContext(RpcController controller) {
+ if (controller instanceof HBaseRpcController) {
+ this.deadline = ((HBaseRpcController) controller).getDeadline();
+ }
+ }
+
+ public synchronized void addListener(OperationListener<T> listener) {
+ listeners.add(listener);
+ }
+
+ /**
+ * We will call this only in one thread, so no need to lock.
+ */
+ public void error(Throwable error) {
+ for (OperationListener<T> listener : listeners) {
+ listener.failed(error);
+ }
+ }
+
+ /**
+ * We will call this only in one thread, so no need to lock.
+ */
+ public void done(T result) {
+ for (OperationListener<T> listener : listeners) {
+ listener.completed(result);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1574c6ef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
new file mode 100644
index 0000000..24de981
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SynchronousOperationListener.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import com.google.common.base.Throwables;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * An OperationListener supporting getting result directly. Temporarily used when AsyncRegion is
+ * not fully non-blocking. When call getResult of this class, the operation must have been done.
+ */
+@InterfaceAudience.Private
+public class SynchronousOperationListener<T> implements OperationListener<T> {
+
+ private T result;
+ private boolean done;
+ private Throwable error;
+
+ @Override
+ public void completed(T result) {
+ done = true;
+ this.result = result;
+ }
+
+ @Override
+ public void failed(Throwable error) {
+ done = true;
+ this.error = error;
+ }
+
+ /**
+ * We call this method after calling operation of AsyncRegion synchronously in the same thread.
+ * So no need to lock and success/fail must has been called.
+ */
+ public T getResult() throws IOException {
+ assert done;
+ if (error != null) {
+ // We also need throw unchecked throwable
+ Throwables.propagateIfPossible(error, IOException.class);
+ // Wrap to IOE if it is not IOE or unchecked
+ throw new IOException(error);
+ }
+ return result;
+ }
+}