You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/01/01 09:37:38 UTC
hbase git commit: HBASE-21660 Apply the cell to right memstore for
increment/append operation
Repository: hbase
Updated Branches:
refs/heads/master 7755d4bee -> 3ab895979
HBASE-21660 Apply the cell to right memstore for increment/append operation
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ab89597
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ab89597
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ab89597
Branch: refs/heads/master
Commit: 3ab895979b643a2980bcdb7fee2078f14b614210
Parents: 7755d4b
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun Dec 30 18:52:03 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Tue Jan 1 17:32:44 2019 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 14 +-
.../TestPostIncrementAndAppendBeforeWAL.java | 235 +++++++++++++++++++
2 files changed, 245 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ab89597/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ec222c7..5ab61fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7980,12 +7980,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
final byte[] columnFamilyName = entry.getKey();
List<Cell> deltas = entry.getValue();
- HStore store = this.stores.get(columnFamilyName);
// Reckon for the Store what to apply to WAL and MemStore.
- List<Cell> toApply =
- reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);
+ List<Cell> toApply = reckonDeltasByStore(stores.get(columnFamilyName), op, mutation,
+ effectiveDurability, now, deltas, results);
if (!toApply.isEmpty()) {
- forMemStore.put(store, toApply);
+ for (Cell cell : toApply) {
+ HStore store = getStore(cell);
+ if (store == null) {
+ checkFamily(CellUtil.cloneFamily(cell));
+ } else {
+ forMemStore.computeIfAbsent(store, key -> new ArrayList<>()).add(cell);
+ }
+ }
if (writeToWAL) {
if (walEdit == null) {
walEdit = new WALEdit();
http://git-wip-us.apache.org/repos/asf/hbase/blob/3ab89597/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java
new file mode 100644
index 0000000..031960b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TestFromClientSide;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test coprocessor methods
+ * {@link RegionObserver#postIncrementBeforeWAL(ObserverContext, Mutation, List)} and
+ * {@link RegionObserver#postAppendBeforeWAL(ObserverContext, Mutation, List)}. These methods may
+ * change the cells which will be applied to memstore and WAL. So add unit test for the case which
+ * change the cell's column family.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestPostIncrementAndAppendBeforeWAL {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestPostIncrementAndAppendBeforeWAL.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class);
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static Connection connection;
+
+ private static final byte [] ROW = Bytes.toBytes("row");
+ private static final String CF1 = "cf1";
+ private static final byte[] CF1_BYTES = Bytes.toBytes(CF1);
+ private static final String CF2 = "cf2";
+ private static final byte[] CF2_BYTES = Bytes.toBytes(CF2);
+ private static final String CF_NOT_EXIST = "cf_not_exist";
+ private static final byte[] CF_NOT_EXIST_BYTES = Bytes.toBytes(CF_NOT_EXIST);
+ private static final byte[] CQ1 = Bytes.toBytes("cq1");
+ private static final byte[] CQ2 = Bytes.toBytes("cq2");
+ private static final byte[] VALUE = Bytes.toBytes("value");
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ connection = UTIL.getConnection();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ connection.close();
+ UTIL.shutdownMiniCluster();
+ }
+
+ private void createTableWithCoprocessor(TableName tableName, String coprocessor)
+ throws IOException {
+ TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF1_BYTES).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2_BYTES).build())
+ .setCoprocessor(coprocessor).build();
+ connection.getAdmin().createTable(tableDesc);
+ }
+
+ @Test
+ public void testChangeCellWithDifferntColumnFamily() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTableWithCoprocessor(tableName,
+ ChangeCellWithDifferntColumnFamilyObserver.class.getName());
+
+ try (Table table = connection.getTable(tableName)) {
+ Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
+ table.increment(increment);
+ Get get = new Get(ROW).addColumn(CF2_BYTES, CQ1);
+ Result result = table.get(get);
+ assertEquals(1, result.size());
+ assertEquals(1, Bytes.toLong(result.getValue(CF2_BYTES, CQ1)));
+
+ Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
+ table.append(append);
+ get = new Get(ROW).addColumn(CF2_BYTES, CQ2);
+ result = table.get(get);
+ assertEquals(1, result.size());
+ assertTrue(Bytes.equals(VALUE, result.getValue(CF2_BYTES, CQ2)));
+ }
+ }
+
+ @Test
+ public void testChangeCellWithNotExistColumnFamily() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ createTableWithCoprocessor(tableName,
+ ChangeCellWithNotExistColumnFamilyObserver.class.getName());
+
+ try (Table table = connection.getTable(tableName)) {
+ try {
+ Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
+ table.increment(increment);
+ fail("should throw NoSuchColumnFamilyException");
+ } catch (Exception e) {
+ assertTrue(e instanceof NoSuchColumnFamilyException);
+ }
+ try {
+ Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
+ table.append(append);
+ fail("should throw NoSuchColumnFamilyException");
+ } catch (Exception e) {
+ assertTrue(e instanceof NoSuchColumnFamilyException);
+ }
+ }
+ }
+
+ public static class ChangeCellWithDifferntColumnFamilyObserver
+ implements RegionCoprocessor, RegionObserver {
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ return cellPairs.stream()
+ .map(
+ pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
+ .collect(Collectors.toList());
+ }
+
+ private Cell newCellWithDifferentColumnFamily(Cell cell) {
+ return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+ .setFamily(CF2_BYTES, 0, CF2_BYTES.length).setQualifier(CellUtil.cloneQualifier(cell))
+ .setTimestamp(cell.getTimestamp()).setType(cell.getType().getCode())
+ .setValue(CellUtil.cloneValue(cell)).build();
+ }
+
+ @Override
+ public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ return cellPairs.stream()
+ .map(
+ pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
+ .collect(Collectors.toList());
+ }
+ }
+
+ public static class ChangeCellWithNotExistColumnFamilyObserver
+ implements RegionCoprocessor, RegionObserver {
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(this);
+ }
+
+ @Override
+ public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ return cellPairs.stream()
+ .map(
+ pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
+ .collect(Collectors.toList());
+ }
+
+ private Cell newCellWithNotExistColumnFamily(Cell cell) {
+ return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+ .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+ .setFamily(CF_NOT_EXIST_BYTES, 0, CF_NOT_EXIST_BYTES.length)
+ .setQualifier(CellUtil.cloneQualifier(cell)).setTimestamp(cell.getTimestamp())
+ .setType(cell.getType().getCode()).setValue(CellUtil.cloneValue(cell)).build();
+ }
+
+ @Override
+ public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ return cellPairs.stream()
+ .map(
+ pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
+ .collect(Collectors.toList());
+ }
+ }
+}
\ No newline at end of file