You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/12/31 12:46:10 UTC
[34/47] hbase git commit: HBASE-21643 Introduce two new region
coprocessor method and deprecated postMutationBeforeWAL
HBASE-21643 Introduce two new region coprocessor method and deprecated postMutationBeforeWAL
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f5ea00f7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f5ea00f7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f5ea00f7
Branch: refs/heads/HBASE-21512
Commit: f5ea00f72442e5c80f2a5fc6e99506127fa8d16b
Parents: c2d5991
Author: Guanghao Zhang <zg...@apache.org>
Authored: Wed Dec 26 17:42:02 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Dec 27 18:27:06 2018 +0800
----------------------------------------------------------------------
.../hbase/coprocessor/RegionObserver.java | 47 ++++++++++++++++++++
.../hadoop/hbase/regionserver/HRegion.java | 26 ++++++-----
.../regionserver/RegionCoprocessorHost.java | 29 +++++++++---
.../hbase/security/access/AccessController.java | 30 ++++++++++---
.../visibility/VisibilityController.java | 30 +++++++++++--
5 files changed, 134 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index c14cbd1..95b2150 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -1029,13 +1030,59 @@ public interface RegionObserver {
* @param oldCell old cell containing previous value
* @param newCell the new cell containing the computed value
* @return the new cell, possibly changed
+ * @deprecated Use {@link #postIncrementBeforeWAL} or {@link #postAppendBeforeWAL} instead.
*/
+ @Deprecated
default Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
return newCell;
}
/**
+ * Called after a list of new cells has been created during an increment operation, but before
+ * they are committed to the WAL or memstore.
+ *
+ * @param ctx the environment provided by the region server
+ * @param mutation the current mutation
+ * @param cellPairs a list of cell pairs. The first cell of each pair is the old cell, which
+ * may be null, and the second cell is the new cell.
+ * @return a list of cell pairs, possibly changed.
+ */
+ default List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+ for (Pair<Cell, Cell> pair : cellPairs) {
+ resultPairs.add(new Pair<>(pair.getFirst(),
+ postMutationBeforeWAL(ctx, MutationType.INCREMENT, mutation, pair.getFirst(),
+ pair.getSecond())));
+ }
+ return resultPairs;
+ }
+
+ /**
+ * Called after a list of new cells has been created during an append operation, but before
+ * they are committed to the WAL or memstore.
+ *
+ * @param ctx the environment provided by the region server
+ * @param mutation the current mutation
+ * @param cellPairs a list of cell pairs. The first cell of each pair is the old cell, which
+ * may be null, and the second cell is the new cell.
+ * @return a list of cell pairs, possibly changed.
+ */
+ default List<Pair<Cell, Cell>> postAppendBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+ for (Pair<Cell, Cell> pair : cellPairs) {
+ // Delegate to the deprecated per-cell hook, reporting the operation as an APPEND.
+ resultPairs.add(new Pair<>(pair.getFirst(), postMutationBeforeWAL(ctx,
+ MutationType.APPEND, mutation, pair.getFirst(), pair.getSecond())));
+ }
+ return resultPairs;
+ }
+
+ /**
* Called after the ScanQueryMatcher creates ScanDeleteTracker. Implementing
* this hook would help in creating customised DeleteTracker and returning
* the newly created DeleteTracker
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9bf9309..ec222c7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -70,6 +70,8 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
+import java.util.stream.Collectors;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -120,7 +122,6 @@ import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionSnare;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
@@ -8014,7 +8015,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Durability effectiveDurability, long now, List<Cell> deltas, List<Cell> results)
throws IOException {
byte[] columnFamily = store.getColumnFamilyDescriptor().getName();
- List<Cell> toApply = new ArrayList<>(deltas.size());
+ List<Pair<Cell, Cell>> cellPairs = new ArrayList<>(deltas.size());
// Get previous values for all columns in this family.
TimeRange tr = null;
switch (op) {
@@ -8041,18 +8042,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
currentValuesIndex++;
}
}
+
// Switch on whether this an increment or an append building the new Cell to apply.
Cell newCell = null;
- MutationType mutationType = null;
switch (op) {
case INCREMENT:
- mutationType = MutationType.INCREMENT;
long deltaAmount = getLongValue(delta);
final long newValue = currentValue == null ? deltaAmount : getLongValue(currentValue) + deltaAmount;
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) -> Bytes.toBytes(newValue));
break;
case APPEND:
- mutationType = MutationType.APPEND;
newCell = reckonDelta(delta, currentValue, columnFamily, now, mutation, (oldCell) ->
ByteBuffer.wrap(new byte[delta.getValueLength() + oldCell.getValueLength()])
.put(oldCell.getValueArray(), oldCell.getValueOffset(), oldCell.getValueLength())
@@ -8063,18 +8062,21 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
default: throw new UnsupportedOperationException(op.toString());
}
- // Give coprocessors a chance to update the new cell
- if (coprocessorHost != null) {
- newCell =
- coprocessorHost.postMutationBeforeWAL(mutationType, mutation, currentValue, newCell);
- }
- toApply.add(newCell);
+ cellPairs.add(new Pair<>(currentValue, newCell));
// Add to results to get returned to the Client. If null, cilent does not want results.
if (results != null) {
results.add(newCell);
}
}
- return toApply;
+
+ // Give coprocessors a chance to update the new cells before apply to WAL or memstore
+ if (coprocessorHost != null) {
+ // Here the operation must be increment or append.
+ cellPairs = op == Operation.INCREMENT ?
+ coprocessorHost.postIncrementBeforeWAL(mutation, cellPairs) :
+ coprocessorHost.postAppendBeforeWAL(mutation, cellPairs);
+ }
+ return cellPairs.stream().map(Pair::getSecond).collect(Collectors.toList());
}
private static Cell reckonDelta(final Cell delta, final Cell currentCell,
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index dea13ca..16fd332 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -67,7 +67,6 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver.MutationType;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
@@ -1691,16 +1690,32 @@ public class RegionCoprocessorHost
});
}
- public Cell postMutationBeforeWAL(final MutationType opType, final Mutation mutation,
- final Cell oldCell, Cell newCell) throws IOException {
+ public List<Pair<Cell, Cell>> postIncrementBeforeWAL(final Mutation mutation,
+ final List<Pair<Cell, Cell>> cellPairs) throws IOException {
if (this.coprocEnvironments.isEmpty()) {
- return newCell;
+ return cellPairs;
}
return execOperationWithResult(
- new ObserverOperationWithResult<RegionObserver, Cell>(regionObserverGetter, newCell) {
+ new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
+ regionObserverGetter, cellPairs) {
@Override
- public Cell call(RegionObserver observer) throws IOException {
- return observer.postMutationBeforeWAL(this, opType, mutation, oldCell, getResult());
+ public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
+ return observer.postIncrementBeforeWAL(this, mutation, getResult());
+ }
+ });
+ }
+
+ public List<Pair<Cell, Cell>> postAppendBeforeWAL(final Mutation mutation,
+ final List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ if (this.coprocEnvironments.isEmpty()) {
+ return cellPairs;
+ }
+ return execOperationWithResult(
+ new ObserverOperationWithResult<RegionObserver, List<Pair<Cell, Cell>>>(
+ regionObserverGetter, cellPairs) {
+ @Override
+ public List<Pair<Cell, Cell>> call(RegionObserver observer) throws IOException {
+ return observer.postAppendBeforeWAL(this, mutation, getResult());
}
});
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 82ec12d..6e2c9ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -36,6 +36,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ArrayBackedTag;
@@ -1849,14 +1850,34 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
}
@Override
- public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
- MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+ public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
// If the HFile version is insufficient to persist tags, we won't have any
// work to do here
if (!cellFeaturesEnabled) {
- return newCell;
+ return cellPairs;
}
+ return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
+ createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ // If the HFile version is insufficient to persist tags, we won't have any
+ // work to do here
+ if (!cellFeaturesEnabled) {
+ return cellPairs;
+ }
+ return cellPairs.stream().map(pair -> new Pair<>(pair.getFirst(),
+ createNewCellWithTags(mutation, pair.getFirst(), pair.getSecond())))
+ .collect(Collectors.toList());
+ }
+ private Cell createNewCellWithTags(Mutation mutation, Cell oldCell, Cell newCell) {
// Collect any ACLs from the old cell
List<Tag> tags = Lists.newArrayList();
List<Tag> aclTags = Lists.newArrayList();
@@ -1901,8 +1922,7 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
return newCell;
}
- Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags);
- return rewriteCell;
+ return PrivateCellUtil.createCell(newCell, tags);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/f5ea00f7/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
index c4f3b95..2a18551 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/visibility/VisibilityController.java
@@ -127,6 +127,7 @@ import org.slf4j.LoggerFactory;
public class VisibilityController implements MasterCoprocessor, RegionCoprocessor,
VisibilityLabelsService.Interface, MasterObserver, RegionObserver {
+
private static final Logger LOG = LoggerFactory.getLogger(VisibilityController.class);
private static final Logger AUDITLOG = LoggerFactory.getLogger("SecurityLogger."
+ VisibilityController.class.getName());
@@ -688,8 +689,30 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
}
@Override
- public Cell postMutationBeforeWAL(ObserverContext<RegionCoprocessorEnvironment> ctx,
- MutationType opType, Mutation mutation, Cell oldCell, Cell newCell) throws IOException {
+ public List<Pair<Cell, Cell>> postIncrementBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+ for (Pair<Cell, Cell> pair : cellPairs) {
+ resultPairs
+ .add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond())));
+ }
+ return resultPairs;
+ }
+
+ @Override
+ public List<Pair<Cell, Cell>> postAppendBeforeWAL(
+ ObserverContext<RegionCoprocessorEnvironment> ctx, Mutation mutation,
+ List<Pair<Cell, Cell>> cellPairs) throws IOException {
+ List<Pair<Cell, Cell>> resultPairs = new ArrayList<>(cellPairs.size());
+ for (Pair<Cell, Cell> pair : cellPairs) {
+ resultPairs
+ .add(new Pair<>(pair.getFirst(), createNewCellWithTags(mutation, pair.getSecond())));
+ }
+ return resultPairs;
+ }
+
+ private Cell createNewCellWithTags(Mutation mutation, Cell newCell) throws IOException {
List<Tag> tags = Lists.newArrayList();
CellVisibility cellVisibility = null;
try {
@@ -715,8 +738,7 @@ public class VisibilityController implements MasterCoprocessor, RegionCoprocesso
}
}
- Cell rewriteCell = PrivateCellUtil.createCell(newCell, tags);
- return rewriteCell;
+ return PrivateCellUtil.createCell(newCell, tags);
}
@Override