You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2018/03/23 16:11:42 UTC
[2/2] hbase git commit: HBASE-19504 Add TimeRange support into
checkAndMutate
HBASE-19504 Add TimeRange support into checkAndMutate
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/468cc059
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/468cc059
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/468cc059
Branch: refs/heads/branch-2.0
Commit: 468cc059d896c21a8399fa7bede0688754ca9f15
Parents: 4df0b4f
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sat Mar 24 00:00:36 2018 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Sat Mar 24 00:07:51 2018 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Append.java | 2 +-
.../apache/hadoop/hbase/client/AsyncTable.java | 8 +-
.../hadoop/hbase/client/AsyncTableImpl.java | 9 +-
.../org/apache/hadoop/hbase/client/Get.java | 2 +-
.../org/apache/hadoop/hbase/client/HTable.java | 111 +++++++------
.../apache/hadoop/hbase/client/Increment.java | 2 +-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 15 +-
.../org/apache/hadoop/hbase/client/Scan.java | 2 +-
.../org/apache/hadoop/hbase/client/Table.java | 6 +
.../hadoop/hbase/protobuf/ProtobufUtil.java | 81 +++-------
.../hbase/shaded/protobuf/ProtobufUtil.java | 108 +++++--------
.../hbase/shaded/protobuf/RequestConverter.java | 77 ++++-----
.../hbase/shaded/protobuf/TestProtobufUtil.java | 6 +-
.../org/apache/hadoop/hbase/io/TimeRange.java | 17 ++
.../src/main/protobuf/Client.proto | 1 +
hbase-protocol/src/main/protobuf/Client.proto | 1 +
.../hadoop/hbase/rest/client/RemoteHTable.java | 5 +
.../hadoop/hbase/regionserver/HRegion.java | 24 +--
.../hbase/regionserver/RSRpcServices.java | 158 ++++++++++---------
.../hadoop/hbase/regionserver/Region.java | 55 +++++--
.../hadoop/hbase/client/TestAsyncTable.java | 63 ++++++++
.../hadoop/hbase/client/TestFromClientSide.java | 55 +++++++
.../client/TestMalformedCellFromClient.java | 2 +-
.../hadoop/hbase/protobuf/TestProtobufUtil.java | 5 +
.../hbase/regionserver/TestAtomicOperation.java | 2 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 68 ++++----
.../TestSimpleTimeRangeTracker.java | 10 +-
27 files changed, 529 insertions(+), 366 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 61474b7..3a08d68 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -48,7 +48,7 @@ import org.slf4j.LoggerFactory;
public class Append extends Mutation {
private static final Logger LOG = LoggerFactory.getLogger(Append.class);
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
- private TimeRange tr = new TimeRange();
+ private TimeRange tr = TimeRange.allTime();
/**
* Sets the TimeRange to be used on the Get for this append.
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 37c80b3..cc1ba87 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -22,15 +22,14 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
import com.google.protobuf.RpcChannel;
-
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
@@ -236,6 +235,11 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
CheckAndMutateBuilder qualifier(byte[] qualifier);
/**
+ * @param timeRange time range to check.
+ */
+ CheckAndMutateBuilder timeRange(TimeRange timeRange);
+
+ /**
* Check for lack of column.
*/
CheckAndMutateBuilder ifNotExists();
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index c8553c6..9747d06 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -20,17 +20,16 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import com.google.protobuf.RpcChannel;
-
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -152,6 +151,12 @@ class AsyncTableImpl implements AsyncTable<ScanResultConsumer> {
}
@Override
+ public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
+ builder.timeRange(timeRange);
+ return this;
+ }
+
+ @Override
public CheckAndMutateBuilder ifNotExists() {
builder.ifNotExists();
return this;
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index 9ed3b38..aae52d2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -72,7 +72,7 @@ public class Get extends Query implements Row {
private boolean cacheBlocks = true;
private int storeLimit = -1;
private int storeOffset = 0;
- private TimeRange tr = new TimeRange();
+ private TimeRange tr = TimeRange.allTime();
private boolean checkExistenceOnly = false;
private boolean closestRowBefore = false;
private Map<byte [], NavigableSet<byte []>> familyMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 1a11979..69ec366 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -692,14 +693,14 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, put);
+ return doCheckAndPut(row, family, qualifier, CompareOperator.EQUAL.name(), value, null, put);
}
@Override
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
- return doCheckAndPut(row, family, qualifier, compareOp.name(), value, put);
+ return doCheckAndPut(row, family, qualifier, compareOp.name(), value, null, put);
}
@Override
@@ -708,11 +709,12 @@ public class HTable implements Table {
final CompareOperator op, final byte [] value, final Put put) throws IOException {
// The name of the operators in CompareOperator are intentionally those of the
// operators in the filter's CompareOp enum.
- return doCheckAndPut(row, family, qualifier, op.name(), value, put);
+ return doCheckAndPut(row, family, qualifier, op.name(), value, null, put);
}
- private boolean doCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
- final String opName, final byte [] value, final Put put) throws IOException {
+ private boolean doCheckAndPut(final byte[] row, final byte[] family, final byte[] qualifier,
+ final String opName, final byte[] value, final TimeRange timeRange, final Put put)
+ throws IOException {
ClientServiceCallable<Boolean> callable =
new ClientServiceCallable<Boolean>(this.connection, getName(), row,
this.rpcControllerFactory.newController(), put.getPriority()) {
@@ -721,7 +723,7 @@ public class HTable implements Table {
CompareType compareType = CompareType.valueOf(opName);
MutateRequest request = RequestConverter.buildMutateRequest(
getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, put);
+ new BinaryComparator(value), compareType, timeRange, put);
MutateResponse response = doMutate(request);
return Boolean.valueOf(response.getProcessed());
}
@@ -732,60 +734,58 @@ public class HTable implements Table {
@Override
@Deprecated
- public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
- final byte [] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, delete);
+ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
+ final byte[] value, final Delete delete) throws IOException {
+ return doCheckAndDelete(row, family, qualifier, CompareOperator.EQUAL.name(), value, null,
+ delete);
}
@Override
@Deprecated
- public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOp compareOp, final byte [] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, delete);
+ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
+ final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
+ return doCheckAndDelete(row, family, qualifier, compareOp.name(), value, null, delete);
}
@Override
@Deprecated
- public boolean checkAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
- final CompareOperator op, final byte [] value, final Delete delete) throws IOException {
- return doCheckAndDelete(row, family, qualifier, op.name(), value, delete);
+ public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
+ final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
+ return doCheckAndDelete(row, family, qualifier, op.name(), value, null, delete);
}
- private boolean doCheckAndDelete(final byte [] row, final byte [] family, final byte [] qualifier,
- final String opName, final byte [] value, final Delete delete) throws IOException {
+ private boolean doCheckAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
+ final String opName, final byte[] value, final TimeRange timeRange, final Delete delete)
+ throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
- new CancellableRegionServerCallable<SingleResponse>(
- this.connection, getName(), row, this.rpcControllerFactory.newController(),
- writeRpcTimeoutMs, new RetryingTimeTracker().start(), delete.getPriority()) {
- @Override
- protected SingleResponse rpcCall() throws Exception {
- CompareType compareType = CompareType.valueOf(opName);
- MutateRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, delete);
- MutateResponse response = doMutate(request);
- return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
- }
- };
+ new CancellableRegionServerCallable<SingleResponse>(this.connection, getName(), row,
+ this.rpcControllerFactory.newController(), writeRpcTimeoutMs,
+ new RetryingTimeTracker().start(), delete.getPriority()) {
+ @Override
+ protected SingleResponse rpcCall() throws Exception {
+ CompareType compareType = CompareType.valueOf(opName);
+ MutateRequest request = RequestConverter
+ .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family,
+ qualifier, new BinaryComparator(value), compareType, timeRange, delete);
+ MutateResponse response = doMutate(request);
+ return ResponseConverter.getResult(request, response, getRpcControllerCellScanner());
+ }
+ };
List<Delete> rows = Collections.singletonList(delete);
Object[] results = new Object[1];
- AsyncProcessTask task = AsyncProcessTask.newBuilder()
- .setPool(pool)
- .setTableName(tableName)
- .setRowAccess(rows)
+ AsyncProcessTask task =
+ AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName).setRowAccess(rows)
.setCallable(callable)
// TODO any better timeout?
.setRpcTimeout(Math.max(readRpcTimeoutMs, writeRpcTimeoutMs))
.setOperationTimeout(operationTimeoutMs)
- .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
- .setResults(results)
- .build();
+ .setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL).setResults(results).build();
AsyncRequestFuture ars = multiAp.submit(task);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
}
- return ((SingleResponse.Entry)results[0]).isProcessed();
+ return ((SingleResponse.Entry) results[0]).isProcessed();
}
@Override
@@ -793,9 +793,9 @@ public class HTable implements Table {
return new CheckAndMutateBuilderImpl(row, family);
}
- private boolean doCheckAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
- final String opName, final byte [] value, final RowMutations rm)
- throws IOException {
+ private boolean doCheckAndMutate(final byte[] row, final byte[] family, final byte[] qualifier,
+ final String opName, final byte[] value, final TimeRange timeRange, final RowMutations rm)
+ throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
rpcControllerFactory.newController(), writeRpcTimeoutMs, new RetryingTimeTracker().start(),
@@ -803,18 +803,18 @@ public class HTable implements Table {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(opName);
- MultiRequest request = RequestConverter.buildMutateRequest(
- getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
- new BinaryComparator(value), compareType, rm);
+ MultiRequest request = RequestConverter
+ .buildMutateRequest(getLocation().getRegionInfo().getRegionName(), row, family, qualifier,
+ new BinaryComparator(value), compareType, timeRange, rm);
ClientProtos.MultiResponse response = doMulti(request);
ClientProtos.RegionActionResult res = response.getRegionActionResultList().get(0);
if (res.hasException()) {
Throwable ex = ProtobufUtil.toException(res.getException());
if (ex instanceof IOException) {
- throw (IOException)ex;
+ throw (IOException) ex;
}
- throw new IOException("Failed to checkAndMutate row: "+
- Bytes.toStringBinary(rm.getRow()), ex);
+ throw new IOException(
+ "Failed to checkAndMutate row: " + Bytes.toStringBinary(rm.getRow()), ex);
}
return ResponseConverter.getResults(request, response, getRpcControllerCellScanner());
}
@@ -850,14 +850,14 @@ public class HTable implements Table {
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm)
throws IOException {
- return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, rm);
+ return doCheckAndMutate(row, family, qualifier, compareOp.name(), value, null, rm);
}
@Override
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
- return doCheckAndMutate(row, family, qualifier, op.name(), value, rm);
+ return doCheckAndMutate(row, family, qualifier, op.name(), value, null, rm);
}
@Override
@@ -1234,6 +1234,7 @@ public class HTable implements Table {
private final byte[] row;
private final byte[] family;
private byte[] qualifier;
+ private TimeRange timeRange;
private CompareOperator op;
private byte[] value;
@@ -1250,6 +1251,12 @@ public class HTable implements Table {
}
@Override
+ public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
+ this.timeRange = timeRange;
+ return this;
+ }
+
+ @Override
public CheckAndMutateBuilder ifNotExists() {
this.op = CompareOperator.EQUAL;
this.value = null;
@@ -1271,19 +1278,19 @@ public class HTable implements Table {
@Override
public boolean thenPut(Put put) throws IOException {
preCheck();
- return doCheckAndPut(row, family, qualifier, op.name(), value, put);
+ return doCheckAndPut(row, family, qualifier, op.name(), value, timeRange, put);
}
@Override
public boolean thenDelete(Delete delete) throws IOException {
preCheck();
- return doCheckAndDelete(row, family, qualifier, op.name(), value, delete);
+ return doCheckAndDelete(row, family, qualifier, op.name(), value, timeRange, delete);
}
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
preCheck();
- return doCheckAndMutate(row, family, qualifier, op.name(), value, mutation);
+ return doCheckAndMutate(row, family, qualifier, op.name(), value, timeRange, mutation);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index 76208d6..d7d1116 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -48,7 +48,7 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Public
public class Increment extends Mutation {
private static final int HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
- private TimeRange tr = new TimeRange();
+ private TimeRange tr = TimeRange.allTime();
/**
* Create a Increment operation for the specified row.
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index e6f78a1..d705d7c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -265,6 +266,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private byte[] qualifier;
+ private TimeRange timeRange;
+
private CompareOperator op;
private byte[] value;
@@ -282,6 +285,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
@Override
+ public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
+ this.timeRange = timeRange;
+ return this;
+ }
+
+ @Override
public CheckAndMutateBuilder ifNotExists() {
this.op = CompareOperator.EQUAL;
this.value = null;
@@ -307,7 +316,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller,
loc, stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), p),
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, p),
(c, r) -> r.getProcessed()))
.call();
}
@@ -319,7 +328,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), d),
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, d),
(c, r) -> r.getProcessed()))
.call();
}
@@ -331,7 +340,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
- new BinaryComparator(value), CompareType.valueOf(op.name()), rm),
+ new BinaryComparator(value), CompareType.valueOf(op.name()), timeRange, rm),
resp -> resp.getExists()))
.call();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 7139b26..20a2ada 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -141,7 +141,7 @@ public class Scan extends Query {
private long maxResultSize = -1;
private boolean cacheBlocks = true;
private boolean reversed = false;
- private TimeRange tr = new TimeRange();
+ private TimeRange tr = TimeRange.allTime();
private Map<byte [], NavigableSet<byte []>> familyMap =
new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
private Boolean asyncPrefetch = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index 81513fe..fab439c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.filter.CompareFilter;
@@ -438,6 +439,11 @@ public interface Table extends Closeable {
CheckAndMutateBuilder qualifier(byte[] qualifier);
/**
+ * @param timeRange timeRange to check
+ */
+ CheckAndMutateBuilder timeRange(TimeRange timeRange);
+
+ /**
* Check for lack of column.
*/
CheckAndMutateBuilder ifNotExists();
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index 3033da7..735ddef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -27,7 +27,6 @@ import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import com.google.protobuf.TextFormat;
-
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
@@ -36,10 +35,8 @@ import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.NavigableSet;
import java.util.function.Function;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
@@ -875,20 +872,13 @@ public final class ProtobufUtil {
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
}
scanBuilder.setMaxVersions(scan.getMaxVersions());
- for (Entry<byte[], TimeRange> cftr : scan.getColumnFamilyTimeRange().entrySet()) {
- HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
- b.setColumnFamily(ByteStringer.wrap(cftr.getKey()));
- b.setTimeRange(timeRangeToProto(cftr.getValue()));
- scanBuilder.addCfTimeRange(b);
- }
- TimeRange timeRange = scan.getTimeRange();
- if (!timeRange.isAllTime()) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- scanBuilder.setTimeRange(timeRangeBuilder.build());
- }
+ scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> {
+ scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
+ .setColumnFamily(ByteStringer.wrap(cf))
+ .setTimeRange(toTimeRange(timeRange))
+ .build());
+ });
+ scanBuilder.setTimeRange(toTimeRange(scan.getTimeRange()));
Map<String, byte[]> attributes = scan.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
@@ -1076,20 +1066,12 @@ public final class ProtobufUtil {
if (get.getFilter() != null) {
builder.setFilter(ProtobufUtil.toFilter(get.getFilter()));
}
- for (Entry<byte[], TimeRange> cftr : get.getColumnFamilyTimeRange().entrySet()) {
- HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
- b.setColumnFamily(ByteStringer.wrap(cftr.getKey()));
- b.setTimeRange(timeRangeToProto(cftr.getValue()));
- builder.addCfTimeRange(b);
- }
- TimeRange timeRange = get.getTimeRange();
- if (!timeRange.isAllTime()) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- builder.setTimeRange(timeRangeBuilder.build());
- }
+ get.getColumnFamilyTimeRange().forEach((cf, timeRange) ->
+ builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
+ .setColumnFamily(ByteStringer.wrap(cf))
+ .setTimeRange(toTimeRange(timeRange)).build())
+ );
+ builder.setTimeRange(toTimeRange(get.getTimeRange()));
Map<String, byte[]> attributes = get.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
@@ -1135,16 +1117,6 @@ public final class ProtobufUtil {
return builder.build();
}
- static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) {
- if (!timeRange.isAllTime()) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- builder.setTimeRange(timeRangeBuilder.build());
- }
- }
-
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
throws IOException {
return toMutation(type, mutation, HConstants.NO_NONCE);
@@ -1176,12 +1148,10 @@ public final class ProtobufUtil {
builder.setNonce(nonce);
}
if (type == MutationType.INCREMENT) {
- TimeRange timeRange = ((Increment) mutation).getTimeRange();
- setTimeRange(builder, timeRange);
+ builder.setTimeRange(toTimeRange(((Increment) mutation).getTimeRange()));
}
if (type == MutationType.APPEND) {
- TimeRange timeRange = ((Append) mutation).getTimeRange();
- setTimeRange(builder, timeRange);
+ builder.setTimeRange(toTimeRange(((Append) mutation).getTimeRange()));
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
@@ -1239,10 +1209,10 @@ public final class ProtobufUtil {
getMutationBuilderAndSetCommonFields(type, mutation, builder);
builder.setAssociatedCellCount(mutation.size());
if (mutation instanceof Increment) {
- setTimeRange(builder, ((Increment)mutation).getTimeRange());
+ builder.setTimeRange(toTimeRange(((Increment)mutation).getTimeRange()));
}
if (mutation instanceof Append) {
- setTimeRange(builder, ((Append)mutation).getTimeRange());
+ builder.setTimeRange(toTimeRange(((Append)mutation).getTimeRange()));
}
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
@@ -1718,14 +1688,6 @@ public final class ProtobufUtil {
codedInput.checkLastTagWas(0);
}
- private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- return timeRangeBuilder;
- }
-
private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException {
long minStamp = 0;
long maxStamp = Long.MAX_VALUE;
@@ -1818,4 +1780,13 @@ public final class ProtobufUtil {
}
return RSGroupInfo;
}
+
+ public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) {
+ if (timeRange == null) {
+ timeRange = TimeRange.allTime();
+ }
+ return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin())
+ .setTo(timeRange.getMax())
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 7cbddcf..1c90357 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -529,13 +529,13 @@ public final class ProtobufUtil {
}
if (proto.getCfTimeRangeCount() > 0) {
for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) {
- TimeRange timeRange = protoToTimeRange(cftr.getTimeRange());
+ TimeRange timeRange = toTimeRange(cftr.getTimeRange());
get.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(),
timeRange.getMin(), timeRange.getMax());
}
}
if (proto.hasTimeRange()) {
- TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+ TimeRange timeRange = toTimeRange(proto.getTimeRange());
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
if (proto.hasFilter()) {
@@ -858,7 +858,7 @@ public final class ProtobufUtil {
Append append = toDelta((Bytes row) -> new Append(row.get(), row.getOffset(), row.getLength()),
Append::add, proto, cellScanner);
if (proto.hasTimeRange()) {
- TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+ TimeRange timeRange = toTimeRange(proto.getTimeRange());
append.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
return append;
@@ -878,7 +878,7 @@ public final class ProtobufUtil {
Increment increment = toDelta((Bytes row) -> new Increment(row.get(), row.getOffset(), row.getLength()),
Increment::add, proto, cellScanner);
if (proto.hasTimeRange()) {
- TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+ TimeRange timeRange = toTimeRange(proto.getTimeRange());
increment.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
return increment;
@@ -950,7 +950,7 @@ public final class ProtobufUtil {
}
}
if (proto.hasTimeRange()) {
- TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+ TimeRange timeRange = toTimeRange(proto.getTimeRange());
get.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
for (NameBytesPair attribute : proto.getAttributeList()) {
@@ -1014,20 +1014,13 @@ public final class ProtobufUtil {
scanBuilder.setLoadColumnFamiliesOnDemand(loadColumnFamiliesOnDemand);
}
scanBuilder.setMaxVersions(scan.getMaxVersions());
- for (Entry<byte[], TimeRange> cftr : scan.getColumnFamilyTimeRange().entrySet()) {
- HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
- b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey()));
- b.setTimeRange(timeRangeToProto(cftr.getValue()));
- scanBuilder.addCfTimeRange(b);
- }
- TimeRange timeRange = scan.getTimeRange();
- if (!timeRange.isAllTime()) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- scanBuilder.setTimeRange(timeRangeBuilder.build());
- }
+ scan.getColumnFamilyTimeRange().forEach((cf, timeRange) -> {
+ scanBuilder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
+ .setColumnFamily(UnsafeByteOperations.unsafeWrap(cf))
+ .setTimeRange(toTimeRange(timeRange))
+ .build());
+ });
+ scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(scan.getTimeRange()));
Map<String, byte[]> attributes = scan.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
@@ -1146,13 +1139,13 @@ public final class ProtobufUtil {
}
if (proto.getCfTimeRangeCount() > 0) {
for (HBaseProtos.ColumnFamilyTimeRange cftr : proto.getCfTimeRangeList()) {
- TimeRange timeRange = protoToTimeRange(cftr.getTimeRange());
+ TimeRange timeRange = toTimeRange(cftr.getTimeRange());
scan.setColumnFamilyTimeRange(cftr.getColumnFamily().toByteArray(),
timeRange.getMin(), timeRange.getMax());
}
}
if (proto.hasTimeRange()) {
- TimeRange timeRange = protoToTimeRange(proto.getTimeRange());
+ TimeRange timeRange = toTimeRange(proto.getTimeRange());
scan.setTimeRange(timeRange.getMin(), timeRange.getMax());
}
if (proto.hasFilter()) {
@@ -1242,20 +1235,13 @@ public final class ProtobufUtil {
if (get.getFilter() != null) {
builder.setFilter(ProtobufUtil.toFilter(get.getFilter()));
}
- for (Entry<byte[], TimeRange> cftr : get.getColumnFamilyTimeRange().entrySet()) {
- HBaseProtos.ColumnFamilyTimeRange.Builder b = HBaseProtos.ColumnFamilyTimeRange.newBuilder();
- b.setColumnFamily(UnsafeByteOperations.unsafeWrap(cftr.getKey()));
- b.setTimeRange(timeRangeToProto(cftr.getValue()));
- builder.addCfTimeRange(b);
- }
- TimeRange timeRange = get.getTimeRange();
- if (!timeRange.isAllTime()) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- builder.setTimeRange(timeRangeBuilder.build());
- }
+ get.getColumnFamilyTimeRange().forEach((cf, timeRange) -> {
+ builder.addCfTimeRange(HBaseProtos.ColumnFamilyTimeRange.newBuilder()
+ .setColumnFamily(UnsafeByteOperations.unsafeWrap(cf))
+ .setTimeRange(toTimeRange(timeRange))
+ .build());
+ });
+ builder.setTimeRange(ProtobufUtil.toTimeRange(get.getTimeRange()));
Map<String, byte[]> attributes = get.getAttributesMap();
if (!attributes.isEmpty()) {
NameBytesPair.Builder attributeBuilder = NameBytesPair.newBuilder();
@@ -1300,16 +1286,6 @@ public final class ProtobufUtil {
return builder.build();
}
- static void setTimeRange(final MutationProto.Builder builder, final TimeRange timeRange) {
- if (!timeRange.isAllTime()) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- builder.setTimeRange(timeRangeBuilder.build());
- }
- }
-
public static MutationProto toMutation(final MutationType type, final Mutation mutation)
throws IOException {
return toMutation(type, mutation, HConstants.NO_NONCE);
@@ -1341,12 +1317,10 @@ public final class ProtobufUtil {
builder.setNonce(nonce);
}
if (type == MutationType.INCREMENT) {
- TimeRange timeRange = ((Increment) mutation).getTimeRange();
- setTimeRange(builder, timeRange);
+ builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange()));
}
if (type == MutationType.APPEND) {
- TimeRange timeRange = ((Append) mutation).getTimeRange();
- setTimeRange(builder, timeRange);
+ builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange()));
}
ColumnValue.Builder columnBuilder = ColumnValue.newBuilder();
QualifierValue.Builder valueBuilder = QualifierValue.newBuilder();
@@ -1404,10 +1378,10 @@ public final class ProtobufUtil {
getMutationBuilderAndSetCommonFields(type, mutation, builder);
builder.setAssociatedCellCount(mutation.size());
if (mutation instanceof Increment) {
- setTimeRange(builder, ((Increment)mutation).getTimeRange());
+ builder.setTimeRange(ProtobufUtil.toTimeRange(((Increment) mutation).getTimeRange()));
}
if (mutation instanceof Append) {
- setTimeRange(builder, ((Append)mutation).getTimeRange());
+ builder.setTimeRange(ProtobufUtil.toTimeRange(((Append) mutation).getTimeRange()));
}
if (nonce != HConstants.NO_NONCE) {
builder.setNonce(nonce);
@@ -2754,24 +2728,11 @@ public final class ProtobufUtil {
return scList;
}
- private static HBaseProtos.TimeRange.Builder timeRangeToProto(TimeRange timeRange) {
- HBaseProtos.TimeRange.Builder timeRangeBuilder =
- HBaseProtos.TimeRange.newBuilder();
- timeRangeBuilder.setFrom(timeRange.getMin());
- timeRangeBuilder.setTo(timeRange.getMax());
- return timeRangeBuilder;
- }
-
- private static TimeRange protoToTimeRange(HBaseProtos.TimeRange timeRange) throws IOException {
- long minStamp = 0;
- long maxStamp = Long.MAX_VALUE;
- if (timeRange.hasFrom()) {
- minStamp = timeRange.getFrom();
- }
- if (timeRange.hasTo()) {
- maxStamp = timeRange.getTo();
- }
- return new TimeRange(minStamp, maxStamp);
+ public static TimeRange toTimeRange(HBaseProtos.TimeRange timeRange) {
+ return timeRange == null ?
+ TimeRange.allTime() :
+ new TimeRange(timeRange.hasFrom() ? timeRange.getFrom() : 0,
+ timeRange.hasTo() ? timeRange.getTo() : Long.MAX_VALUE);
}
/**
@@ -3226,4 +3187,13 @@ public final class ProtobufUtil {
.setTimeStampsOfLastAppliedOp(rls.getTimeStampsOfLastAppliedOp())
.build();
}
+
+ public static HBaseProtos.TimeRange toTimeRange(TimeRange timeRange) {
+ if (timeRange == null) {
+ timeRange = TimeRange.allTime();
+ }
+ return HBaseProtos.TimeRange.newBuilder().setFrom(timeRange.getMin())
+ .setTo(timeRange.getMax())
+ .build();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 0afcfe1..8ce2f1b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -235,16 +236,9 @@ public final class RequestConverter {
public static MutateRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final ByteArrayComparable comparator,
- final CompareType compareType, final Put put) throws IOException {
- MutateRequest.Builder builder = MutateRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- Condition condition = buildCondition(
- row, family, qualifier, comparator, compareType);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.PUT, put, MutationProto.newBuilder()));
- builder.setCondition(condition);
- return builder.build();
+ final CompareType compareType, TimeRange timeRange, final Put put) throws IOException {
+ return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
+ , put, MutationType.PUT);
}
/**
@@ -263,19 +257,21 @@ public final class RequestConverter {
public static MutateRequest buildMutateRequest(
final byte[] regionName, final byte[] row, final byte[] family,
final byte [] qualifier, final ByteArrayComparable comparator,
- final CompareType compareType, final Delete delete) throws IOException {
- MutateRequest.Builder builder = MutateRequest.newBuilder();
- RegionSpecifier region = buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- builder.setRegion(region);
- Condition condition = buildCondition(
- row, family, qualifier, comparator, compareType);
- builder.setMutation(ProtobufUtil.toMutation(MutationType.DELETE, delete,
- MutationProto.newBuilder()));
- builder.setCondition(condition);
- return builder.build();
+ final CompareType compareType, TimeRange timeRange, final Delete delete) throws IOException {
+ return buildMutateRequest(regionName, row, family, qualifier, comparator, compareType, timeRange
+ , delete, MutationType.DELETE);
+ }
+
+ public static MutateRequest buildMutateRequest(final byte[] regionName, final byte[] row,
+ final byte[] family, final byte[] qualifier, final ByteArrayComparable comparator,
+ final CompareType compareType, TimeRange timeRange, final Mutation mutation,
+ final MutationType type) throws IOException {
+ return MutateRequest.newBuilder()
+ .setRegion(buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName))
+ .setMutation(ProtobufUtil.toMutation(type, mutation))
+ .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
+ .build();
}
-
/**
* Create a protocol buffer MutateRequest for conditioned row mutations
*
@@ -289,17 +285,15 @@ public final class RequestConverter {
* @return a mutate request
* @throws IOException
*/
- public static ClientProtos.MultiRequest buildMutateRequest(
- final byte[] regionName, final byte[] row, final byte[] family,
- final byte [] qualifier, final ByteArrayComparable comparator,
- final CompareType compareType, final RowMutations rowMutations) throws IOException {
+ public static ClientProtos.MultiRequest buildMutateRequest(final byte[] regionName,
+ final byte[] row, final byte[] family, final byte[] qualifier,
+ final ByteArrayComparable comparator, final CompareType compareType, final TimeRange timeRange,
+ final RowMutations rowMutations) throws IOException {
RegionAction.Builder builder =
getRegionActionBuilderWithRegion(RegionAction.newBuilder(), regionName);
builder.setAtomic(true);
ClientProtos.Action.Builder actionBuilder = ClientProtos.Action.newBuilder();
MutationProto.Builder mutationBuilder = MutationProto.newBuilder();
- Condition condition = buildCondition(
- row, family, qualifier, comparator, compareType);
for (Mutation mutation: rowMutations.getMutations()) {
MutationType mutateType = null;
if (mutation instanceof Put) {
@@ -316,10 +310,9 @@ public final class RequestConverter {
actionBuilder.setMutation(mp);
builder.addAction(actionBuilder.build());
}
- ClientProtos.MultiRequest request =
- ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
- .setCondition(condition).build();
- return request;
+ return ClientProtos.MultiRequest.newBuilder().addRegionAction(builder.build())
+ .setCondition(buildCondition(row, family, qualifier, comparator, compareType, timeRange))
+ .build();
}
/**
@@ -1100,16 +1093,16 @@ public final class RequestConverter {
* @throws IOException
*/
public static Condition buildCondition(final byte[] row, final byte[] family,
- final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType)
- throws IOException {
- Condition.Builder builder = Condition.newBuilder();
- builder.setRow(UnsafeByteOperations.unsafeWrap(row));
- builder.setFamily(UnsafeByteOperations.unsafeWrap(family));
- builder.setQualifier(UnsafeByteOperations
- .unsafeWrap(qualifier == null ? HConstants.EMPTY_BYTE_ARRAY : qualifier));
- builder.setComparator(ProtobufUtil.toComparator(comparator));
- builder.setCompareType(compareType);
- return builder.build();
+ final byte[] qualifier, final ByteArrayComparable comparator, final CompareType compareType,
+ final TimeRange timeRange) {
+ return Condition.newBuilder().setRow(UnsafeByteOperations.unsafeWrap(row))
+ .setFamily(UnsafeByteOperations.unsafeWrap(family))
+ .setQualifier(UnsafeByteOperations.unsafeWrap(qualifier == null ?
+ HConstants.EMPTY_BYTE_ARRAY : qualifier))
+ .setComparator(ProtobufUtil.toComparator(comparator))
+ .setCompareType(compareType)
+ .setTimeRange(ProtobufUtil.toTimeRange(timeRange))
+ .build();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
index 77c0650..f16f060 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/shaded/protobuf/TestProtobufUtil.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
@@ -110,7 +111,7 @@ public class TestProtobufUtil {
getBuilder = ClientProtos.Get.newBuilder(proto);
getBuilder.setMaxVersions(1);
getBuilder.setCacheBlocks(true);
-
+ getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
Get get = ProtobufUtil.toGet(proto);
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
}
@@ -244,6 +245,7 @@ public class TestProtobufUtil {
scanBuilder.setMaxVersions(2);
scanBuilder.setCacheBlocks(false);
scanBuilder.setCaching(1024);
+ scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
ClientProtos.Scan expectedProto = scanBuilder.build();
ClientProtos.Scan actualProto = ProtobufUtil.toScan(
@@ -305,6 +307,7 @@ public class TestProtobufUtil {
Increment increment = ProtobufUtil.toIncrement(proto, null);
mutateBuilder.setTimestamp(increment.getTimeStamp());
+ mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange()));
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
}
@@ -345,6 +348,7 @@ public class TestProtobufUtil {
// append always use the latest timestamp,
// reset the timestamp to the original mutate
mutateBuilder.setTimestamp(append.getTimeStamp());
+ mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange()));
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
index e450346..c44ab69 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/TimeRange.java
@@ -37,6 +37,20 @@ import org.apache.yetus.audience.InterfaceAudience;
public class TimeRange {
public static final long INITIAL_MIN_TIMESTAMP = 0L;
public static final long INITIAL_MAX_TIMESTAMP = Long.MAX_VALUE;
+ private static final TimeRange ALL_TIME = new TimeRange(INITIAL_MIN_TIMESTAMP,
+ INITIAL_MAX_TIMESTAMP);
+
+ public static TimeRange allTime() {
+ return ALL_TIME;
+ }
+
+ public static TimeRange at(long ts) {
+ if (ts < 0 || ts == Long.MAX_VALUE) {
+ throw new IllegalArgumentException("invalid ts:" + ts);
+ }
+ return new TimeRange(ts, ts + 1);
+ }
+
private final long minStamp;
private final long maxStamp;
private final boolean allTime;
@@ -150,7 +164,10 @@ public class TimeRange {
* @param bytes timestamp to check
* @param offset offset into the bytes
* @return true if within TimeRange, false if not
+ * @deprecated This is made @InterfaceAudience.Private in the 2.0 line and above and may be
+ * changed to private or removed in 3.0. Use {@link #withinTimeRange(long)} instead
*/
+ @Deprecated
public boolean withinTimeRange(byte [] bytes, int offset) {
if (allTime) {
return true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-protocol-shaded/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 325b9c1..14abb08 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -143,6 +143,7 @@ message Condition {
required bytes qualifier = 3;
required CompareType compare_type = 4;
required Comparator comparator = 5;
+ optional TimeRange time_range = 6;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 817c26e..3681ae9 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -143,6 +143,7 @@ message Condition {
required bytes qualifier = 3;
required CompareType compare_type = 4;
required Comparator comparator = 5;
+ optional TimeRange time_range = 6;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index 21c7858..b8d0035 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -976,6 +976,11 @@ public class RemoteHTable implements Table {
}
@Override
+ public CheckAndMutateBuilder timeRange(TimeRange timeRange) {
+ throw new UnsupportedOperationException("timeRange not implemented");
+ }
+
+ @Override
public CheckAndMutateBuilder ifNotExists() {
throw new UnsupportedOperationException("CheckAndMutate for non-equal comparison "
+ "not implemented");
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1be3d65..1a3693b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -3926,28 +3926,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier,
- CompareOperator op, ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL)
- throws IOException{
+ public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
+ ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException {
checkMutationType(mutation, row);
- return doCheckAndRowMutate(row, family, qualifier, op, comparator, null,
- mutation);
+ return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, null, mutation);
}
@Override
- public boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
- CompareOperator op, ByteArrayComparable comparator, RowMutations rm)
- throws IOException {
- return doCheckAndRowMutate(row, family, qualifier, op, comparator, rm, null);
+ public boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
+ ByteArrayComparable comparator, TimeRange timeRange, RowMutations rm) throws IOException {
+ return doCheckAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm, null);
}
/**
* checkAndMutate and checkAndRowMutate are 90% the same. Rather than copy/paste, below has
* switches in the few places where there is deviation.
*/
- private boolean doCheckAndRowMutate(byte [] row, byte [] family, byte [] qualifier,
- CompareOperator op, ByteArrayComparable comparator, RowMutations rowMutations,
- Mutation mutation)
+ private boolean doCheckAndRowMutate(byte[] row, byte[] family, byte[] qualifier,
+ CompareOperator op, ByteArrayComparable comparator, TimeRange timeRange,
+ RowMutations rowMutations, Mutation mutation)
throws IOException {
// Could do the below checks but seems wacky with two callers only. Just comment out for now.
// One caller passes a Mutation, the other passes RowMutation. Presume all good so we don't
@@ -3962,6 +3959,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Get get = new Get(row);
checkFamily(family);
get.addColumn(family, qualifier);
+ if (timeRange != null) {
+ get.setTimeRange(timeRange.getMin(), timeRange.getMax());
+ }
// Lock row - note that doBatchMutate will relock this row if called
checkRow(row, "doCheckAndRowMutate");
RowLock rowLock = getRowLockInternal(get.getRow(), false, null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 02d7e98..92e081b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.ipc.PriorityFunction;
@@ -591,9 +592,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
* @param cellScanner if non-null, the mutation data -- the Cell content.
*/
private boolean checkAndRowMutate(final HRegion region, final List<ClientProtos.Action> actions,
- final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier,
- CompareOperator op, ByteArrayComparable comparator, RegionActionResult.Builder builder,
- ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
+ final CellScanner cellScanner, byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
+ ByteArrayComparable comparator, TimeRange timeRange, RegionActionResult.Builder builder,
+ ActivePolicyEnforcement spaceQuotaEnforcement) throws IOException {
int countOfCompleteMutation = 0;
try {
if (!region.getRegionInfo().isMetaRegion()) {
@@ -636,7 +637,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.addResultOrException(
resultOrExceptionOrBuilder.build());
}
- return region.checkAndRowMutate(row, family, qualifier, op, comparator, rm);
+ return region.checkAndRowMutate(row, family, qualifier, op, comparator, timeRange, rm);
} finally {
// Currently, the checkAndMutate isn't supported by batch so it won't mess up the cell scanner
// even if the malformed cells are not skipped.
@@ -2633,9 +2634,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
CompareOperator.valueOf(condition.getCompareType().name());
ByteArrayComparable comparator =
ProtobufUtil.toComparator(condition.getComparator());
- processed = checkAndRowMutate(region, regionAction.getActionList(),
- cellScanner, row, family, qualifier, op,
- comparator, regionActionResultBuilder, spaceQuotaEnforcement);
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+ TimeRange.allTime();
+ processed =
+ checkAndRowMutate(region, regionAction.getActionList(), cellScanner, row, family,
+ qualifier, op, comparator, timeRange, regionActionResultBuilder,
+ spaceQuotaEnforcement);
} else {
doAtomicBatchOp(regionActionResultBuilder, region, quota, regionAction.getActionList(),
cellScanner, spaceQuotaEnforcement);
@@ -2756,79 +2761,84 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
switch (type) {
- case APPEND:
- // TODO: this doesn't actually check anything.
- r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
- break;
- case INCREMENT:
- // TODO: this doesn't actually check anything.
- r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
- break;
- case PUT:
- Put put = ProtobufUtil.toPut(mutation, cellScanner);
- checkCellSizeLimit(region, put);
- // Throws an exception when violated
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
- quota.addMutation(put);
- if (request.hasCondition()) {
- Condition condition = request.getCondition();
- byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.getFamily().toByteArray();
- byte[] qualifier = condition.getQualifier().toByteArray();
- CompareOperator compareOp =
- CompareOperator.valueOf(condition.getCompareType().name());
- ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
- if (region.getCoprocessorHost() != null) {
- processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
- compareOp, comparator, put);
- }
- if (processed == null) {
- boolean result = region.checkAndMutate(row, family,
- qualifier, compareOp, comparator, put, true);
+ case APPEND:
+ // TODO: this doesn't actually check anything.
+ r = append(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
+ break;
+ case INCREMENT:
+ // TODO: this doesn't actually check anything.
+ r = increment(region, quota, mutation, cellScanner, nonceGroup, spaceQuotaEnforcement);
+ break;
+ case PUT:
+ Put put = ProtobufUtil.toPut(mutation, cellScanner);
+ checkCellSizeLimit(region, put);
+ // Throws an exception when violated
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(put);
+ quota.addMutation(put);
+ if (request.hasCondition()) {
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.getFamily().toByteArray();
+ byte[] qualifier = condition.getQualifier().toByteArray();
+ CompareOperator compareOp =
+ CompareOperator.valueOf(condition.getCompareType().name());
+ ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+ TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndPut(row, family,
- qualifier, compareOp, comparator, put, result);
+ processed = region.getCoprocessorHost().preCheckAndPut(row, family, qualifier,
+ compareOp, comparator, put);
}
- processed = result;
- }
- } else {
- region.put(put);
- processed = Boolean.TRUE;
- }
- break;
- case DELETE:
- Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
- checkCellSizeLimit(region, delete);
- spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
- quota.addMutation(delete);
- if (request.hasCondition()) {
- Condition condition = request.getCondition();
- byte[] row = condition.getRow().toByteArray();
- byte[] family = condition.getFamily().toByteArray();
- byte[] qualifier = condition.getQualifier().toByteArray();
- CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
- ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
- if (region.getCoprocessorHost() != null) {
- processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
- comparator, delete);
+ if (processed == null) {
+ boolean result = region.checkAndMutate(row, family,
+ qualifier, compareOp, comparator, timeRange, put);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndPut(row, family,
+ qualifier, compareOp, comparator, put, result);
+ }
+ processed = result;
+ }
+ } else {
+ region.put(put);
+ processed = Boolean.TRUE;
}
- if (processed == null) {
- boolean result = region.checkAndMutate(row, family,
- qualifier, op, comparator, delete, true);
+ break;
+ case DELETE:
+ Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+ checkCellSizeLimit(region, delete);
+ spaceQuotaEnforcement.getPolicyEnforcement(region).check(delete);
+ quota.addMutation(delete);
+ if (request.hasCondition()) {
+ Condition condition = request.getCondition();
+ byte[] row = condition.getRow().toByteArray();
+ byte[] family = condition.getFamily().toByteArray();
+ byte[] qualifier = condition.getQualifier().toByteArray();
+ CompareOperator op = CompareOperator.valueOf(condition.getCompareType().name());
+ ByteArrayComparable comparator = ProtobufUtil.toComparator(condition.getComparator());
+ TimeRange timeRange = condition.hasTimeRange() ?
+ ProtobufUtil.toTimeRange(condition.getTimeRange()) :
+ TimeRange.allTime();
if (region.getCoprocessorHost() != null) {
- result = region.getCoprocessorHost().postCheckAndDelete(row, family,
- qualifier, op, comparator, delete, result);
+ processed = region.getCoprocessorHost().preCheckAndDelete(row, family, qualifier, op,
+ comparator, delete);
}
- processed = result;
+ if (processed == null) {
+ boolean result = region.checkAndMutate(row, family,
+ qualifier, op, comparator, timeRange, delete);
+ if (region.getCoprocessorHost() != null) {
+ result = region.getCoprocessorHost().postCheckAndDelete(row, family,
+ qualifier, op, comparator, delete, result);
+ }
+ processed = result;
+ }
+ } else {
+ region.delete(delete);
+ processed = Boolean.TRUE;
}
- } else {
- region.delete(delete);
- processed = Boolean.TRUE;
- }
- break;
- default:
- throw new DoNotRetryIOException(
- "Unsupported mutate type: " + type.name());
+ break;
+ default:
+ throw new DoNotRetryIOException("Unsupported mutate type: " + type.name());
}
if (processed != null) {
builder.setProcessed(processed.booleanValue());
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 27771ce..80b18b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -303,14 +304,31 @@ public interface Region extends ConfigurationObserver {
* @param family column family to check
* @param qualifier column qualifier to check
* @param op the comparison operator
- * @param comparator
- * @param mutation
- * @param writeToWAL
+ * @param comparator the expected value
+ * @param mutation data to put if check succeeds
+ * @return true if mutation was applied, false otherwise
+ */
+ default boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
+ ByteArrayComparable comparator, Mutation mutation) throws IOException {
+ return checkAndMutate(row, family, qualifier, op, comparator, TimeRange.allTime(), mutation);
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected value and if it does,
+ * it performs the mutation. If the passed value is null, the lack of column value
+ * (ie: non-existence) is used. See checkAndRowMutate to do many checkAndPuts at a time on a
+ * single row.
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param op the comparison operator
+ * @param comparator the expected value
+ * @param mutation data to put if check succeeds
+ * @param timeRange time range to check
* @return true if mutation was applied, false otherwise
- * @throws IOException
*/
boolean checkAndMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
- ByteArrayComparable comparator, Mutation mutation, boolean writeToWAL) throws IOException;
+ ByteArrayComparable comparator, TimeRange timeRange, Mutation mutation) throws IOException;
/**
* Atomically checks if a row/family/qualifier value matches the expected values and if it does,
@@ -321,13 +339,32 @@ public interface Region extends ConfigurationObserver {
* @param family column family to check
* @param qualifier column qualifier to check
* @param op the comparison operator
- * @param comparator
- * @param mutations
+ * @param comparator the expected value
+ * @param mutations data to put if check succeeds
+ * @return true if mutations were applied, false otherwise
+ */
+ default boolean checkAndRowMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op,
+ ByteArrayComparable comparator, RowMutations mutations) throws IOException {
+ return checkAndRowMutate(row, family, qualifier, op, comparator, TimeRange.allTime(),
+ mutations);
+ }
+
+ /**
+ * Atomically checks if a row/family/qualifier value matches the expected values and if it does,
+ * it performs the row mutations. If the passed value is null, the lack of column value
+ * (ie: non-existence) is used. Use to do many mutations on a single row. Use checkAndMutate
+ * to do one checkAndMutate at a time.
+ * @param row to check
+ * @param family column family to check
+ * @param qualifier column qualifier to check
+ * @param op the comparison operator
+ * @param comparator the expected value
+ * @param mutations data to put if check succeeds
+ * @param timeRange time range to check
* @return true if mutations were applied, false otherwise
- * @throws IOException
*/
boolean checkAndRowMutate(byte [] row, byte [] family, byte [] qualifier, CompareOperator op,
- ByteArrayComparable comparator, RowMutations mutations)
+ ByteArrayComparable comparator, TimeRange timeRange, RowMutations mutations)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
index 37182ec..576c0a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTable.java
@@ -40,6 +40,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -338,4 +339,66 @@ public class TestAsyncTable {
}
});
}
+
+ @Test
+ public void testCheckAndMutateWithTimeRange() throws Exception {
+ TEST_UTIL.createTable(TableName.valueOf("testCheckAndMutateWithTimeRange"), FAMILY);
+ AsyncTable<?> table = getTable.get();
+ final long ts = System.currentTimeMillis() / 2;
+ Put put = new Put(row);
+ put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
+
+ boolean ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .ifNotExists()
+ .thenPut(put)
+ .get();
+ assertTrue(ok);
+
+ ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts + 10000))
+ .ifEquals(VALUE)
+ .thenPut(put)
+ .get();
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts))
+ .ifEquals(VALUE)
+ .thenPut(put)
+ .get();
+ assertTrue(ok);
+
+ RowMutations rm = new RowMutations(row)
+ .add((Mutation) put);
+ ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts + 10000))
+ .ifEquals(VALUE)
+ .thenMutate(rm)
+ .get();
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts))
+ .ifEquals(VALUE)
+ .thenMutate(rm)
+ .get();
+ assertTrue(ok);
+
+ Delete delete = new Delete(row)
+ .addColumn(FAMILY, QUALIFIER);
+
+ ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts + 10000))
+ .ifEquals(VALUE)
+ .thenDelete(delete)
+ .get();
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(row, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts))
+ .ifEquals(VALUE)
+ .thenDelete(delete)
+ .get();
+ assertTrue(ok);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index 29d3439..5fba101 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.filter.SubstringComparator;
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -4832,6 +4833,60 @@ public class TestFromClientSide {
}
@Test
+ public void testCheckAndMutateWithTimeRange() throws IOException {
+ Table table = TEST_UTIL.createTable(TableName.valueOf(name.getMethodName()), FAMILY);
+ final long ts = System.currentTimeMillis() / 2;
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, ts, VALUE);
+
+ boolean ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .ifNotExists()
+ .thenPut(put);
+ assertTrue(ok);
+
+ ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts + 10000))
+ .ifEquals(VALUE)
+ .thenPut(put);
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts))
+ .ifEquals(VALUE)
+ .thenPut(put);
+ assertTrue(ok);
+
+ RowMutations rm = new RowMutations(ROW)
+ .add((Mutation) put);
+ ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts + 10000))
+ .ifEquals(VALUE)
+ .thenMutate(rm);
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts))
+ .ifEquals(VALUE)
+ .thenMutate(rm);
+ assertTrue(ok);
+
+ Delete delete = new Delete(ROW)
+ .addColumn(FAMILY, QUALIFIER);
+
+ ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts + 10000))
+ .ifEquals(VALUE)
+ .thenDelete(delete);
+ assertFalse(ok);
+
+ ok = table.checkAndMutate(ROW, FAMILY).qualifier(QUALIFIER)
+ .timeRange(TimeRange.at(ts))
+ .ifEquals(VALUE)
+ .thenDelete(delete);
+ assertTrue(ok);
+ }
+
+ @Test
public void testCheckAndPutWithCompareOp() throws IOException {
final byte [] value1 = Bytes.toBytes("aaaa");
final byte [] value2 = Bytes.toBytes("bbbb");
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
index 6305fa1..ef4ca25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMalformedCellFromClient.java
@@ -239,7 +239,7 @@ public class TestMalformedCellFromClient {
ClientProtos.MutationProto.Builder mutationBuilder = ClientProtos.MutationProto.newBuilder();
ClientProtos.Condition condition = RequestConverter
.buildCondition(rm.getRow(), FAMILY, null, new BinaryComparator(new byte[10]),
- HBaseProtos.CompareType.EQUAL);
+ HBaseProtos.CompareType.EQUAL, null);
for (Mutation mutation : rm.getMutations()) {
ClientProtos.MutationProto.MutationType mutateType = null;
if (mutation instanceof Put) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
index 536af71..7f45e40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestProtobufUtil.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
@@ -104,6 +105,7 @@ public class TestProtobufUtil {
getBuilder = ClientProtos.Get.newBuilder(proto);
getBuilder.setMaxVersions(1);
getBuilder.setCacheBlocks(true);
+ getBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
Get get = ProtobufUtil.toGet(proto);
assertEquals(getBuilder.build(), ProtobufUtil.toGet(get));
@@ -146,6 +148,7 @@ public class TestProtobufUtil {
// append always use the latest timestamp,
// reset the timestamp to the original mutate
mutateBuilder.setTimestamp(append.getTimeStamp());
+ mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(append.getTimeRange()));
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.APPEND, append));
}
@@ -229,6 +232,7 @@ public class TestProtobufUtil {
Increment increment = ProtobufUtil.toIncrement(proto, null);
mutateBuilder.setTimestamp(increment.getTimeStamp());
+ mutateBuilder.setTimeRange(ProtobufUtil.toTimeRange(increment.getTimeRange()));
assertEquals(mutateBuilder.build(), ProtobufUtil.toMutation(MutationType.INCREMENT, increment));
}
@@ -314,6 +318,7 @@ public class TestProtobufUtil {
scanBuilder.setMaxVersions(2);
scanBuilder.setCacheBlocks(false);
scanBuilder.setCaching(1024);
+ scanBuilder.setTimeRange(ProtobufUtil.toTimeRange(TimeRange.allTime()));
ClientProtos.Scan expectedProto = scanBuilder.build();
ClientProtos.Scan actualProto = ProtobufUtil.toScan(
http://git-wip-us.apache.org/repos/asf/hbase/blob/468cc059/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
index b14c94f..3962bbe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestAtomicOperation.java
@@ -663,7 +663,7 @@ public class TestAtomicOperation {
}
testStep = TestStep.CHECKANDPUT_STARTED;
region.checkAndMutate(Bytes.toBytes("r1"), Bytes.toBytes(family), Bytes.toBytes("q1"),
- CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put, true);
+ CompareOperator.EQUAL, new BinaryComparator(Bytes.toBytes("10")), put);
testStep = TestStep.CHECKANDPUT_COMPLETED;
}
}