You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/31 12:46:10 UTC

[34/47] hbase git commit: HBASE-21643 Introduce two new region coprocessor method and deprecated postMutationBeforeWAL

HBASE-21643 Introduce two new region coprocessor method and deprecated postMutationBeforeWAL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f5ea00f7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f5ea00f7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f5ea00f7

Branch: refs/heads/HBASE-21512
Commit: f5ea00f72442e5c80f2a5fc6e99506127fa8d16b
Parents: c2d5991
Author: Guanghao Zhang <zg...@apache.org>
Authored: Wed Dec 26 17:42:02 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Dec 27 18:27:06 2018 +0800

----------------------------------------------------------------------
 .../hbase/coprocessor/RegionObserver.java       | 47 ++++++++++++++++++++
 .../hadoop/hbase/regionserver/HRegion.java      | 26 ++++++-----
 .../regionserver/RegionCoprocessorHost.java     | 29 +++++++++---
 .../hbase/security/access/AccessController.java | 30 ++++++++++---
 .../visibility/VisibilityController.java        | 30 +++++++++++--
 5 files changed, 134 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index c14cbd1..95b2150 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -1029,13 +1030,59 @@ public interface RegionObserver {
    * @param oldCell old cell containing previous value
    * @param newCell the new cell containing the computed value
    * @return the new cell, possibly changed
+   * @deprecated Use {@link #postIncrementBeforeWAL} or {@link #postAppendBeforeWAL} instead.
    */
+  @Deprecated
   default Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
       MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
     return newCell;
   }
 
   /**
+   * Called after a list of new cells has been created during an increment operation, but before
+   * they are committed to the WAL or memstore.
+   *
+   * @param ctx       the environment provided by the region server
+   * @param mutation  the current mutation
+   * @param cellPairs a list of cell pair. The first cell is old cell which may be null.
+   *                  And the second cell is the new cell.
+   * @return a list of cell pair, possibly changed.
+   */
+  default List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+      List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+    for (Pair<Cell, Cell> pair : cellPairs) {
+      resultPairs.add(new Pair<>(pair.getFirst(),
+          postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
+              pair.getSecond())));
+    }
+    return resultPairs;
+  }
+
+  /**
+   * Called after a list of new cells has been created during an append operation, but before
+   * they are committed to the WAL or memstore.
+   *
+   * @param ctx       the environment provided by the region server
+   * @param mutation  the current mutation
+   * @param cellPairs a list of cell pair. The first cell is old cell which may be null.
+   *                  And the second cell is the new cell.
+   * @return a list of cell pair, possibly changed.
+   */
+  default List<Pair<Cell, Cell>> postAppendBeforeWAL(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+      List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+    for (Pair<Cell, Cell> pair : cellPairs) {
+      resultPairs.add(new Pair<>(pair.getFirst(),
+          postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
+              pair.getSecond())));
+    }
+    return resultPairs;
+  }
+
+  /**
    * Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing
    * this hook would help in creating customised DeleteTracker and returning
    * the newly created DeleteTracker

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9bf9309..ec222c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -70,6 +70,8 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -120,7 +122,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
 import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -8014,7 +8015,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
       throws IOException {
     byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
-    List<Cell> toApply = new ArrayList<>(deltas.size());
+    List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
     // Get previous values for all columns in this family.
     TimeRange tr = null;
     switch (op) {
@@ -8041,18 +8042,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           currentValuesIndex++;
         }
       }
+
       // Switch on whether this an increment or an append building the new Cell to apply.
       Cell newCell = null;
-      MutationType mutationType = null;
       switch (op) {
         case INCREMENT:
-          mutationType = MutationType.INCREMENT;
           long deltaAmount = getLongValue(delta);
           final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;
           newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));
           break;
         case APPEND:
-          mutationType = MutationType.APPEND;
           newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) ->
             ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
                     .put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
@@ -8063,18 +8062,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         default: throw new UnsupportedOperationException(op.toString());
       }
 
-      // Give coprocessors a chance to update the new cell
-      if (coprocessorHost != null) {
-        newCell =
-            coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);
-      }
-      toApply.add(newCell);
+      cellPairs.add(new Pair<>(currentValue, newCell));
       // Add to results to get returned to the Client. If null, cilent does not want results.
       if (results != null) {
         results.add(newCell);
       }
     }
-    return toApply;
+
+    // Give coprocessors a chance to update the new cells before apply to WAL or memstore
+    if (coprocessorHost != null) {
+      // Here the operation must be increment or append.
+      cellPairs = op == Operation.INCREMENT ?
+          coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
+          coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
+    }
+    return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
   }
 
   private static Cell reckonDelta(final Cell delta, final Cell currentCell,

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index dea13ca..16fd332 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.Reference;
@@ -1691,16 +1690,32 @@ public class RegionCoprocessorHost
         });
   }
 
-  public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
-      final Cell oldCell, Cell newCell) throws IOException {
+  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
+      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
     if (this.coprocEnvironments.isEmpty()) {
-      return newCell;
+      return cellPairs;
     }
     return execOperationWithResult(
-        new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) {
+        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
+            regionObserverGetter, cellPairs) {
           @Override
-          public Cell call(RegionObserver observer) throws IOException {
-            return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult());
+          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
+            return observer.postIncrementBeforeWAL(this, mutation, getResult());
+          }
+        });
+  }
+
+  public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
+      final List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    if (this.coprocEnvironments.isEmpty()) {
+      return cellPairs;
+    }
+    return execOperationWithResult(
+        new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
+            regionObserverGetter, cellPairs) {
+          @Override
+          public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
+            return observer.postAppendBeforeWAL(this, mutation, getResult());
           }
         });
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 82ec12d..6e2c9ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -36,6 +36,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -1849,14 +1850,34 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
   }
 
   @Override
-  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
-      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+      List<Pair<Cell, Cell>> cellPairs) throws IOException {
     // If the HFile version is insufficient to persist tags, we won't have any
     // work to do here
     if (!cellFeaturesEnabled) {
-      return newCell;
+      return cellPairs;
     }
+    return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
+        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+      List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    // If the HFile version is insufficient to persist tags, we won't have any
+    // work to do here
+    if (!cellFeaturesEnabled) {
+      return cellPairs;
+    }
+    return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
+        createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
+        .collect(Collectors.toList());
+  }
 
+  private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
     // Collect any ACLs from the old cell
     List<Tag> tags = Lists.newArrayList();
     List<Tag> aclTags = Lists.newArrayList();
@@ -1901,8 +1922,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
       return newCell;
     }
 
-    Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags);
-    return rewriteCell;
+    return PrivateCellUtil.createCell(newCell, tags);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index c4f3b95..2a18551 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -127,6 +127,7 @@ import org.slf4j.LoggerFactory;
 public class VisibilityController implements MasterCoprocessor, RegionCoprocessor,
     VisibilityLabelsService.Interface, MasterObserver, RegionObserver {
 
+
   private static final Logger LOG = LoggerFactory.getLogger(VisibilityController.class);
   private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
       + VisibilityController.class.getName());
@@ -688,8 +689,30 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
   }
 
   @Override
-  public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
-      MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+  public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+      List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+    for (Pair<Cell, Cell> pair : cellPairs) {
+      resultPairs
+          .add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond())));
+    }
+    return resultPairs;
+  }
+
+  @Override
+  public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+      ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+      List<Pair<Cell, Cell>> cellPairs) throws IOException {
+    List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+    for (Pair<Cell, Cell> pair : cellPairs) {
+      resultPairs
+          .add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond())));
+    }
+    return resultPairs;
+  }
+
+  private Cell createNewCellWithTags(Mutation mutation, Cell newCell) throws IOException {
     List<Tag> tags = Lists.newArrayList();
     CellVisibility cellVisibility = null;
     try {
@@ -715,8 +738,7 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
       }
     }
 
-    Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags);
-    return rewriteCell;
+    return PrivateCellUtil.createCell(newCell, tags);
   }
 
   @Override