You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/01/01 09:37:38 UTC

hbase git commit: HBASE-21660 Apply the cell to right memstore for increment/append operation

Repository: hbase
Updated Branches:
  refs/heads/master 7755d4bee -> 3ab895979


HBASE-21660 Apply the cell to right memstore for increment/append operation


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3ab89597
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3ab89597
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3ab89597

Branch: refs/heads/master
Commit: 3ab895979b643a2980bcdb7fee2078f14b614210
Parents: 7755d4b
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun Dec 30 18:52:03 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Tue Jan 1 17:32:44 2019 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  14 +-
 .../TestPostIncrementAndAppendBeforeWAL.java    | 235 +++++++++++++++++++
 2 files changed, 245 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3ab89597/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ec222c7..5ab61fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7980,12 +7980,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Map.Entry<byte [], List<Cell>> entry: mutation.getFamilyCellMap().entrySet()) {
       final byte[] columnFamilyName = entry.getKey();
       List<Cell> deltas = entry.getValue();
-      HStore store = this.stores.get(columnFamilyName);
       // Reckon for the Store what to apply to WAL and MemStore.
-      List<Cell> toApply =
-        reckonDeltasByStore(store, op, mutation, effectiveDurability, now, deltas, results);
+      List<Cell> toApply = reckonDeltasByStore(stores.get(columnFamilyName), op, mutation,
+        effectiveDurability, now, deltas, results);
       if (!toApply.isEmpty()) {
-        forMemStore.put(store, toApply);
+        for (Cell cell : toApply) {
+          HStore store = getStore(cell);
+          if (store == null) {
+            checkFamily(CellUtil.cloneFamily(cell));
+          } else {
+            forMemStore.computeIfAbsent(store, key -> new ArrayList<>()).add(cell);
+          }
+        }
         if (writeToWAL) {
           if (walEdit == null) {
             walEdit = new WALEdit();

http://git-wip-us.apache.org/repos/asf/hbase/blob/3ab89597/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java
new file mode 100644
index 0000000..031960b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestPostIncrementAndAppendBeforeWAL.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.ExtendedCellBuilderFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TestFromClientSide;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test coprocessor methods
+ * {@link RegionObserver#postIncrementBeforeWAL(ObserverContext, Mutation, List)} and
+ * {@link RegionObserver#postAppendBeforeWAL(ObserverContext, Mutation, List)}. These methods may
+ * change the cells which will be applied to memstore and WAL. So add unit test for the case which
+ * change the cell's column family.
+ */
+@Category({CoprocessorTests.class, MediumTests.class})
+public class TestPostIncrementAndAppendBeforeWAL {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestPostIncrementAndAppendBeforeWAL.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestFromClientSide.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static Connection connection;
+
+  private static final byte [] ROW = Bytes.toBytes("row");
+  private static final String CF1 = "cf1";
+  private static final byte[] CF1_BYTES = Bytes.toBytes(CF1);
+  private static final String CF2 = "cf2";
+  private static final byte[] CF2_BYTES = Bytes.toBytes(CF2);
+  private static final String CF_NOT_EXIST = "cf_not_exist";
+  private static final byte[] CF_NOT_EXIST_BYTES = Bytes.toBytes(CF_NOT_EXIST);
+  private static final byte[] CQ1 = Bytes.toBytes("cq1");
+  private static final byte[] CQ2 = Bytes.toBytes("cq2");
+  private static final byte[] VALUE = Bytes.toBytes("value");
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    UTIL.startMiniCluster();
+    connection = UTIL.getConnection();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    connection.close();
+    UTIL.shutdownMiniCluster();
+  }
+
+  private void createTableWithCoprocessor(TableName tableName, String coprocessor)
+      throws IOException {
+    TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF1_BYTES).build())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2_BYTES).build())
+        .setCoprocessor(coprocessor).build();
+    connection.getAdmin().createTable(tableDesc);
+  }
+
+  @Test
+  public void testChangeCellWithDifferntColumnFamily() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    createTableWithCoprocessor(tableName,
+      ChangeCellWithDifferntColumnFamilyObserver.class.getName());
+
+    try (Table table = connection.getTable(tableName)) {
+      Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
+      table.increment(increment);
+      Get get = new Get(ROW).addColumn(CF2_BYTES, CQ1);
+      Result result = table.get(get);
+      assertEquals(1, result.size());
+      assertEquals(1, Bytes.toLong(result.getValue(CF2_BYTES, CQ1)));
+
+      Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
+      table.append(append);
+      get = new Get(ROW).addColumn(CF2_BYTES, CQ2);
+      result = table.get(get);
+      assertEquals(1, result.size());
+      assertTrue(Bytes.equals(VALUE, result.getValue(CF2_BYTES, CQ2)));
+    }
+  }
+
+  @Test
+  public void testChangeCellWithNotExistColumnFamily() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    createTableWithCoprocessor(tableName,
+      ChangeCellWithNotExistColumnFamilyObserver.class.getName());
+
+    try (Table table = connection.getTable(tableName)) {
+      try {
+        Increment increment = new Increment(ROW).addColumn(CF1_BYTES, CQ1, 1);
+        table.increment(increment);
+        fail("should throw NoSuchColumnFamilyException");
+      } catch (Exception e) {
+        assertTrue(e instanceof NoSuchColumnFamilyException);
+      }
+      try {
+        Append append = new Append(ROW).addColumn(CF1_BYTES, CQ2, VALUE);
+        table.append(append);
+        fail("should throw NoSuchColumnFamilyException");
+      } catch (Exception e) {
+        assertTrue(e instanceof NoSuchColumnFamilyException);
+      }
+    }
+  }
+
+  public static class ChangeCellWithDifferntColumnFamilyObserver
+      implements RegionCoprocessor, RegionObserver {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+        ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+        List<Pair<Cell, Cell>> cellPairs) throws IOException {
+      return cellPairs.stream()
+          .map(
+            pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
+          .collect(Collectors.toList());
+    }
+
+    private Cell newCellWithDifferentColumnFamily(Cell cell) {
+      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+          .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+          .setFamily(CF2_BYTES, 0, CF2_BYTES.length).setQualifier(CellUtil.cloneQualifier(cell))
+          .setTimestamp(cell.getTimestamp()).setType(cell.getType().getCode())
+          .setValue(CellUtil.cloneValue(cell)).build();
+    }
+
+    @Override
+    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+        ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+        List<Pair<Cell, Cell>> cellPairs) throws IOException {
+      return cellPairs.stream()
+          .map(
+            pair -> new Pair<>(pair.getFirst(), newCellWithDifferentColumnFamily(pair.getSecond())))
+          .collect(Collectors.toList());
+    }
+  }
+
+  public static class ChangeCellWithNotExistColumnFamilyObserver
+      implements RegionCoprocessor, RegionObserver {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+        ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+        List<Pair<Cell, Cell>> cellPairs) throws IOException {
+      return cellPairs.stream()
+          .map(
+            pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
+          .collect(Collectors.toList());
+    }
+
+    private Cell newCellWithNotExistColumnFamily(Cell cell) {
+      return ExtendedCellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+          .setRow(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
+          .setFamily(CF_NOT_EXIST_BYTES, 0, CF_NOT_EXIST_BYTES.length)
+          .setQualifier(CellUtil.cloneQualifier(cell)).setTimestamp(cell.getTimestamp())
+          .setType(cell.getType().getCode()).setValue(CellUtil.cloneValue(cell)).build();
+    }
+
+    @Override
+    public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+        ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+        List<Pair<Cell, Cell>> cellPairs) throws IOException {
+      return cellPairs.stream()
+          .map(
+            pair -> new Pair<>(pair.getFirst(), newCellWithNotExistColumnFamily(pair.getSecond())))
+          .collect(Collectors.toList());
+    }
+  }
+}
\ No newline at end of file