You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/12/26 08:50:18 UTC
hbase git commit: HBASE-19550 Wrap the cell passed via
Mutation#add(Cell) to be of ExtendedCell
Repository: hbase
Updated Branches:
refs/heads/branch-2 2abf7b508 -> 8362b0dba
HBASE-19550 Wrap the cell passed via Mutation#add(Cell) to be of ExtendedCell
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8362b0db
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8362b0db
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8362b0db
Branch: refs/heads/branch-2
Commit: 8362b0dba548b6c4b284586af9127a94e9fd3cdb
Parents: 2abf7b5
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Tue Dec 26 16:39:51 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Tue Dec 26 16:43:01 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Append.java | 19 +-
.../org/apache/hadoop/hbase/client/Delete.java | 17 +-
.../apache/hadoop/hbase/client/Increment.java | 9 +-
.../apache/hadoop/hbase/client/Mutation.java | 208 ++++++++++
.../org/apache/hadoop/hbase/client/Put.java | 25 +-
.../TestPassCustomCellViaRegionObserver.java | 403 +++++++++++++++++++
6 files changed, 628 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8362b0db/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 0cb51a2..b2995ed 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.security.access.Permission;
@@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Performs Append operations on a single row.
@@ -44,6 +46,7 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Public
public class Append extends Mutation {
+ private static final Logger LOG = LoggerFactory.getLogger(Append.class);
private static final long HEAP_OVERHEAD = ClassSize.REFERENCE + ClassSize.TIMERANGE;
private TimeRange tr = new TimeRange();
@@ -176,14 +179,12 @@ public class Append extends Mutation {
*/
@SuppressWarnings("unchecked")
public Append add(final Cell cell) {
- // Presume it is KeyValue for now.
- byte [] family = CellUtil.cloneFamily(cell);
-
- // Get cell list for the family
- List<Cell> list = getCellList(family);
-
- // find where the new entry should be placed in the List
- list.add(cell);
+ try {
+ super.add(cell);
+ } catch (IOException e) {
+ // we eat the exception of wrong row for BC..
+ LOG.error(e.toString(), e);
+ }
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8362b0db/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 57f5648..b5a0b93 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.security.access.Permission;
@@ -170,22 +169,12 @@ public class Delete extends Mutation implements Comparable<Row> {
/**
* Add an existing delete marker to this Delete object.
- * @param kv An existing KeyValue of type "delete".
+ * @param cell An existing cell of type "delete".
* @return this for invocation chaining
* @throws IOException
*/
- public Delete add(Cell kv) throws IOException {
- if (!CellUtil.isDelete(kv)) {
- throw new IOException("The recently added KeyValue is not of type "
- + "delete. Rowkey: " + Bytes.toStringBinary(this.row));
- }
- if (!CellUtil.matchingRows(kv, this.row)) {
- throw new WrongRowIOException("The row in " + kv.toString() +
- " doesn't match the original one " + Bytes.toStringBinary(this.row));
- }
- byte [] family = CellUtil.cloneFamily(kv);
- List<Cell> list = getCellList(family);
- list.add(kv);
+ public Delete add(Cell cell) throws IOException {
+ super.add(cell);
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8362b0db/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index e9ae8fb..1ccc7e9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -98,14 +98,7 @@ public class Increment extends Mutation implements Comparable<Row> {
* @throws java.io.IOException e
*/
public Increment add(Cell cell) throws IOException{
- byte [] family = CellUtil.cloneFamily(cell);
- List<Cell> list = getCellList(family);
- //Checking that the row of the kv is the same as the put
- if (!CellUtil.matchingRows(cell, this.row)) {
- throw new WrongRowIOException("The row in " + cell +
- " doesn't match the original one " + Bytes.toStringBinary(this.row));
- }
- list.add(cell);
+ super.add(cell);
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8362b0db/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 3983f35..9472d70 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -18,24 +18,31 @@
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.Tag.TAG_LENGTH_SIZE;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import java.util.stream.Collectors;
+import org.apache.hadoop.hbase.ArrayBackedTag;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RawCell;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -53,6 +60,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.ListMultimap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteArrayDataInput;
import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteArrayDataOutput;
import org.apache.hadoop.hbase.shaded.com.google.common.io.ByteStreams;
@@ -757,4 +765,204 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
HConstants.MAX_ROW_LENGTH);
}
}
+
+ Mutation add(Cell cell) throws IOException {
+ //Checking that the row of the kv is the same as the mutation
+ // TODO: It is fraught with risk if user pass the wrong row.
+ // Throwing the IllegalArgumentException is more suitable I'd say.
+ if (!CellUtil.matchingRows(cell, this.row)) {
+ throw new WrongRowIOException("The row in " + cell.toString() +
+ " doesn't match the original one " + Bytes.toStringBinary(this.row));
+ }
+
+ if (cell.getFamilyArray() == null || cell.getFamilyLength() == 0) {
+ throw new IllegalArgumentException("Family cannot be null");
+ }
+
+ byte[] family = CellUtil.cloneFamily(cell);
+ if (cell instanceof ExtendedCell) {
+ getCellList(family).add(cell);
+ } else {
+ getCellList(family).add(new CellWrapper(cell));
+ }
+ return this;
+ }
+
+ private static final class CellWrapper implements ExtendedCell {
+ private static final long FIXED_OVERHEAD = ClassSize.align(
+ ClassSize.OBJECT // object header
+ + KeyValue.TIMESTAMP_SIZE // timestamp
+ + Bytes.SIZEOF_LONG // sequence id
+ + 1 * ClassSize.REFERENCE); // references to cell
+ private final Cell cell;
+ private long sequenceId;
+ private long timestamp;
+
+ CellWrapper(Cell cell) {
+ assert !(cell instanceof ExtendedCell);
+ this.cell = cell;
+ this.sequenceId = cell.getSequenceId();
+ this.timestamp = cell.getTimestamp();
+ }
+
+ @Override
+ public void setSequenceId(long seqId) {
+ sequenceId = seqId;
+ }
+
+ @Override
+ public void setTimestamp(long ts) {
+ timestamp = ts;
+ }
+
+ @Override
+ public void setTimestamp(byte[] ts) {
+ timestamp = Bytes.toLong(ts);
+ }
+
+ @Override
+ public long getSequenceId() {
+ return sequenceId;
+ }
+
+ @Override
+ public byte[] getValueArray() {
+ return cell.getValueArray();
+ }
+
+ @Override
+ public int getValueOffset() {
+ return cell.getValueOffset();
+ }
+
+ @Override
+ public int getValueLength() {
+ return cell.getValueLength();
+ }
+
+ @Override
+ public byte[] getTagsArray() {
+ return cell.getTagsArray();
+ }
+
+ @Override
+ public int getTagsOffset() {
+ return cell.getTagsOffset();
+ }
+
+ @Override
+ public int getTagsLength() {
+ return cell.getTagsLength();
+ }
+
+ @Override
+ public byte[] getRowArray() {
+ return cell.getRowArray();
+ }
+
+ @Override
+ public int getRowOffset() {
+ return cell.getRowOffset();
+ }
+
+ @Override
+ public short getRowLength() {
+ return cell.getRowLength();
+ }
+
+ @Override
+ public byte[] getFamilyArray() {
+ return cell.getFamilyArray();
+ }
+
+ @Override
+ public int getFamilyOffset() {
+ return cell.getFamilyOffset();
+ }
+
+ @Override
+ public byte getFamilyLength() {
+ return cell.getFamilyLength();
+ }
+
+ @Override
+ public byte[] getQualifierArray() {
+ return cell.getQualifierArray();
+ }
+
+ @Override
+ public int getQualifierOffset() {
+ return cell.getQualifierOffset();
+ }
+
+ @Override
+ public int getQualifierLength() {
+ return cell.getQualifierLength();
+ }
+
+ @Override
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public byte getTypeByte() {
+ return cell.getTypeByte();
+ }
+
+ @Override
+ public Optional<Tag> getTag(byte type) {
+ if (cell instanceof RawCell) {
+ return ((RawCell) cell).getTag(type);
+ }
+ int length = getTagsLength();
+ int offset = getTagsOffset();
+ int pos = offset;
+ while (pos < offset + length) {
+ int tagLen = Bytes.readAsInt(getTagsArray(), pos, TAG_LENGTH_SIZE);
+ if (getTagsArray()[pos + TAG_LENGTH_SIZE] == type) {
+ return Optional.of(new ArrayBackedTag(getTagsArray(), pos,
+ tagLen + TAG_LENGTH_SIZE));
+ }
+ pos += TAG_LENGTH_SIZE + tagLen;
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public List<Tag> getTags() {
+ if (cell instanceof RawCell) {
+ return ((RawCell) cell).getTags();
+ }
+ return Lists.newArrayList(PrivateCellUtil.tagsIterator(cell));
+ }
+
+ @Override
+ public byte[] cloneTags() {
+ if (cell instanceof RawCell) {
+ return ((RawCell) cell).cloneTags();
+ } else {
+ return PrivateCellUtil.cloneTags(cell);
+ }
+ }
+
+ /**
+ * Computes the heap overhead of this wrapper excluding the payload bytes:
+ * the fixed per-object overhead, one array header for the row, and one
+ * array header for each of family, qualifier, value and tags whose length
+ * is non-zero.
+ *
+ * @return the overhead in bytes as described above
+ */
+ private long heapOverhead() {
+ // Every conditional term must be parenthesized: '+' and '==' bind tighter
+ // than '?:', so without the parentheses the whole expression collapses
+ // into a single nested conditional instead of a sum.
+ return FIXED_OVERHEAD
+ + ClassSize.ARRAY // row
+ + (getFamilyLength() == 0 ? 0 : ClassSize.ARRAY)
+ + (getQualifierLength() == 0 ? 0 : ClassSize.ARRAY)
+ + (getValueLength() == 0 ? 0 : ClassSize.ARRAY)
+ + (getTagsLength() == 0 ? 0 : ClassSize.ARRAY);
+ }
+
+ @Override
+ public long heapSize() {
+ return heapOverhead()
+ + ClassSize.align(getRowLength())
+ + ClassSize.align(getFamilyLength())
+ + ClassSize.align(getQualifierLength())
+ + ClassSize.align(getValueLength())
+ + ClassSize.align(getTagsLength());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8362b0db/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index 1a1176f..34ddf08 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.IndividualBytesFieldCell;
import org.apache.hadoop.hbase.KeyValue;
@@ -275,30 +274,12 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
* Add the specified KeyValue to this Put operation. Operation assumes that
* the passed KeyValue is immutable and its backing array will not be modified
* for the duration of this Put.
- * @param kv individual KeyValue
+ * @param cell individual cell
* @return this
* @throws java.io.IOException e
*/
- public Put add(Cell kv) throws IOException {
- // Family can not be null, otherwise NullPointerException is thrown when putting
- // the cell into familyMap
- if (kv.getFamilyArray() == null) {
- throw new IllegalArgumentException("Family cannot be null");
- }
-
- // Check timestamp
- if (ts < 0) {
- throw new IllegalArgumentException("Timestamp cannot be negative. ts=" + ts);
- }
-
- byte [] family = CellUtil.cloneFamily(kv);
- List<Cell> list = getCellList(family);
- //Checking that the row of the kv is the same as the put
- if (!CellUtil.matchingRows(kv, this.row)) {
- throw new WrongRowIOException("The row in " + kv.toString() +
- " doesn't match the original one " + Bytes.toStringBinary(this.row));
- }
- list.add(kv);
+ public Put add(Cell cell) throws IOException {
+ super.add(cell);
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8362b0db/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java
new file mode 100644
index 0000000..53abfa0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPassCustomCellViaRegionObserver.java
@@ -0,0 +1,403 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.ByteArrayComparable;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.junit.rules.TestRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestPassCustomCellViaRegionObserver {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestPassCustomCellViaRegionObserver.class);
+
+ @Rule public final TestRule timeout = CategoryBasedTimeout.builder().
+ withTimeout(this.getClass()).withLookingForStuckThread(true).build();
+
+ @Rule
+ public TestName testName = new TestName();
+
+ private TableName tableName;
+ private Table table = null;
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final byte[] ROW = Bytes.toBytes("ROW");
+ private static final byte[] FAMILY = Bytes.toBytes("FAMILY");
+ private static final byte[] QUALIFIER = Bytes.toBytes("QUALIFIER");
+ private static final byte[] VALUE = Bytes.toBytes(10L);
+ private static final byte[] APPEND_VALUE = Bytes.toBytes("MB");
+
+ private static final byte[] QUALIFIER_FROM_CP = Bytes.toBytes("QUALIFIER_FROM_CP");
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ // small retry number can speed up the failed tests.
+ UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ /**
+ * Resets the observer counter, closes the table left over from the previous
+ * test (if any), drops every table currently listed by the admin and then
+ * creates a fresh table named after the running test method with the
+ * {@link RegionObserverImpl} coprocessor attached.
+ *
+ * @throws IOException if closing, listing, deleting or creating a table fails
+ */
+ @Before
+ public void clearTable() throws IOException {
+ RegionObserverImpl.COUNT.set(0);
+ tableName = TableName.valueOf(testName.getMethodName());
+ if (table != null) {
+ table.close();
+ }
+ try (Admin admin = UTIL.getAdmin()) {
+ for (TableName name : admin.listTableNames()) {
+ try {
+ admin.disableTable(name);
+ } catch (IOException e) {
+ // Best effort: a failed disable must not abort the cleanup, so the
+ // delete below is still attempted. Record the failure instead of
+ // dropping it silently so a later deleteTable error can be diagnosed.
+ LOG.debug("Failed to disable table {} before deleting it", name, e);
+ }
+ admin.deleteTable(name);
+ }
+ table = UTIL.createTable(TableDescriptorBuilder.newBuilder(tableName)
+ .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+ .addCoprocessor(RegionObserverImpl.class.getName())
+ .build(), null);
+ }
+ }
+
+ @Test
+ public void testMutation() throws Exception {
+
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, QUALIFIER, VALUE);
+ table.put(put);
+ byte[] value = VALUE;
+ assertResult(table.get(new Get(ROW)), value, value);
+ assertObserverHasExecuted();
+
+ Increment inc = new Increment(ROW);
+ inc.addColumn(FAMILY, QUALIFIER, 10L);
+ table.increment(inc);
+ // QUALIFIER -> 10 (put) + 10 (increment)
+ // QUALIFIER_FROM_CP -> 10 (from cp's put) + 10 (from cp's increment)
+ value = Bytes.toBytes(20L);
+ assertResult(table.get(new Get(ROW)), value, value);
+ assertObserverHasExecuted();
+
+ Append append = new Append(ROW);
+ append.addColumn(FAMILY, QUALIFIER, APPEND_VALUE);
+ table.append(append);
+ // 10L + "MB"
+ value = ByteBuffer.wrap(new byte[value.length + APPEND_VALUE.length])
+ .put(value)
+ .put(APPEND_VALUE)
+ .array();
+ assertResult(table.get(new Get(ROW)), value, value);
+ assertObserverHasExecuted();
+
+ Delete delete = new Delete(ROW);
+ delete.addColumns(FAMILY, QUALIFIER);
+ table.delete(delete);
+ assertTrue(Arrays.asList(table.get(new Get(ROW)).rawCells()).toString(),
+ table.get(new Get(ROW)).isEmpty());
+ assertObserverHasExecuted();
+
+ assertTrue(table.checkAndPut(ROW, FAMILY, QUALIFIER, null, put));
+ assertObserverHasExecuted();
+
+ assertTrue(table.checkAndDelete(ROW, FAMILY, QUALIFIER, VALUE, delete));
+ assertObserverHasExecuted();
+
+ assertTrue(table.get(new Get(ROW)).isEmpty());
+ }
+
+ @Test
+ public void testMultiPut() throws Exception {
+ List<Put> puts = IntStream.range(0, 10)
+ .mapToObj(i -> new Put(ROW).addColumn(FAMILY, Bytes.toBytes(i), VALUE))
+ .collect(Collectors.toList());
+ table.put(puts);
+ assertResult(table.get(new Get(ROW)), VALUE);
+ assertObserverHasExecuted();
+
+ List<Delete> deletes = IntStream.range(0, 10)
+ .mapToObj(i -> new Delete(ROW).addColumn(FAMILY, Bytes.toBytes(i)))
+ .collect(Collectors.toList());
+ table.delete(deletes);
+ assertTrue(table.get(new Get(ROW)).isEmpty());
+ assertObserverHasExecuted();
+ }
+
+ private static void assertObserverHasExecuted() {
+ assertTrue(RegionObserverImpl.COUNT.getAndSet(0) > 0);
+ }
+
+ private static void assertResult(Result result, byte[] expectedValue) {
+ assertFalse(result.isEmpty());
+ for (Cell c : result.rawCells()) {
+ assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
+ assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
+ assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
+ }
+ }
+
+ private static void assertResult(Result result, byte[] expectedValue, byte[] expectedFromCp) {
+ assertFalse(result.isEmpty());
+ for (Cell c : result.rawCells()) {
+ assertTrue(c.toString(), Bytes.equals(ROW, CellUtil.cloneRow(c)));
+ assertTrue(c.toString(), Bytes.equals(FAMILY, CellUtil.cloneFamily(c)));
+ if (Bytes.equals(QUALIFIER, CellUtil.cloneQualifier(c))) {
+ assertTrue(c.toString(), Bytes.equals(expectedValue, CellUtil.cloneValue(c)));
+ } else if (Bytes.equals(QUALIFIER_FROM_CP, CellUtil.cloneQualifier(c))) {
+ assertTrue(c.toString(), Bytes.equals(expectedFromCp, CellUtil.cloneValue(c)));
+ } else {
+ fail("No valid qualifier");
+ }
+ }
+ }
+
+ private static Cell createCustomCell(byte[] row, byte[] family, byte[] qualifier,
+ Cell.DataType type, byte[] value) {
+ return new Cell() {
+
+ private byte[] getArray(byte[] array) {
+ return array == null ? HConstants.EMPTY_BYTE_ARRAY : array;
+ }
+
+ private int length(byte[] array) {
+ return array == null ? 0 : array.length;
+ }
+
+ @Override
+ public byte[] getRowArray() {
+ return getArray(row);
+ }
+
+ @Override
+ public int getRowOffset() {
+ return 0;
+ }
+
+ @Override
+ public short getRowLength() {
+ return (short) length(row);
+ }
+
+ @Override
+ public byte[] getFamilyArray() {
+ return getArray(family);
+ }
+
+ @Override
+ public int getFamilyOffset() {
+ return 0;
+ }
+
+ @Override
+ public byte getFamilyLength() {
+ return (byte) length(family);
+ }
+
+ @Override
+ public byte[] getQualifierArray() {
+ return getArray(qualifier);
+ }
+
+ @Override
+ public int getQualifierOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getQualifierLength() {
+ return length(qualifier);
+ }
+
+ @Override
+ public long getTimestamp() {
+ return HConstants.LATEST_TIMESTAMP;
+ }
+
+ @Override
+ public byte getTypeByte() {
+ return type.getCode();
+ }
+
+ @Override
+ public long getSequenceId() {
+ return 0;
+ }
+
+ @Override
+ public byte[] getValueArray() {
+ return getArray(value);
+ }
+
+ @Override
+ public int getValueOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getValueLength() {
+ return length(value);
+ }
+
+ @Override
+ public byte[] getTagsArray() {
+ return getArray(null);
+ }
+
+ @Override
+ public int getTagsOffset() {
+ return 0;
+ }
+
+ @Override
+ public int getTagsLength() {
+ return length(null);
+ }
+
+ @Override
+ public DataType getType() {
+ return type;
+ }
+ };
+ }
+
+ private static Cell createCustomCell(Put put) {
+ return createCustomCell(put.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, VALUE);
+ }
+
+ private static Cell createCustomCell(Append append) {
+ return createCustomCell(append.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put,
+ APPEND_VALUE);
+ }
+
+ private static Cell createCustomCell(Increment inc) {
+ return createCustomCell(inc.getRow(), FAMILY, QUALIFIER_FROM_CP, Cell.DataType.Put, VALUE);
+ }
+
+ private static Cell createCustomCell(Delete delete) {
+ return createCustomCell(delete.getRow(), FAMILY, QUALIFIER_FROM_CP,
+ Cell.DataType.DeleteColumn, null);
+ }
+
+ public static class RegionObserverImpl implements RegionCoprocessor, RegionObserver {
+ static final AtomicInteger COUNT = new AtomicInteger(0);
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit,
+ Durability durability) throws IOException {
+ put.add(createCustomCell(put));
+ COUNT.incrementAndGet();
+ }
+
+ @Override
+ public void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete,
+ WALEdit edit, Durability durability) throws IOException {
+ delete.add(createCustomCell(delete));
+ COUNT.incrementAndGet();
+ }
+
+ @Override
+ public boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put,
+ boolean result) throws IOException {
+ put.add(createCustomCell(put));
+ COUNT.incrementAndGet();
+ return result;
+ }
+
+ @Override
+ public boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row,
+ byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator,
+ Delete delete, boolean result) throws IOException {
+ delete.add(createCustomCell(delete));
+ COUNT.incrementAndGet();
+ return result;
+ }
+
+ @Override
+ public Result preAppend(ObserverContext<RegionCoprocessorEnvironment> c, Append append)
+ throws IOException {
+ append.add(createCustomCell(append));
+ COUNT.incrementAndGet();
+ return null;
+ }
+
+
+ @Override
+ public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
+ throws IOException {
+ increment.add(createCustomCell(increment));
+ COUNT.incrementAndGet();
+ return null;
+ }
+
+ }
+
+}