You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/07 22:57:26 UTC
[14/15] hbase git commit: HBASE-18961 doMiniBatchMutate() is split
into smaller member methods of BatchOperation and its sub-classes
HBASE-18961 doMiniBatchMutate() is split into smaller member methods of BatchOperation and its sub-classes
There is no functional change except for the following:
* Variable lastIndexExclusive was getting incremented while locking rows corresponding to input
operations. As a result, when the getRowLockInternal() method threw a TimeoutIOException, only the
operations in the range [nextIndexToProcess, lastIndexExclusive) were getting marked as FAILED before
the exception was raised up the call stack. With these changes, all operations are marked as FAILED.
* The cluster IDs of the first mutation are used consistently for the entire batch. The previous
behavior was to use the cluster IDs of the first mutation in each mini-batch.
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4eae5a29
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4eae5a29
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4eae5a29
Branch: refs/heads/HBASE-19189
Commit: 4eae5a29749da1c34f1a2dd0b1f6aa6f7a9bbffd
Parents: 29fd1de
Author: Umesh Agashe <ua...@cloudera.com>
Authored: Sun Oct 8 00:31:12 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Tue Nov 7 10:00:49 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/NonceKey.java | 4 +-
.../hadoop/hbase/regionserver/HRegion.java | 1328 ++++++++++--------
.../MiniBatchOperationInProgress.java | 44 +-
.../regionserver/MultiRowMutationProcessor.java | 2 +-
.../TestMiniBatchOperationInProgress.java | 4 +-
.../access/TestWithDisabledAuthorization.java | 2 +-
6 files changed, 805 insertions(+), 579 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
index 6da808e..b658331 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
@@ -1,4 +1,4 @@
-/**
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.util;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -31,7 +30,6 @@ public class NonceKey {
private long nonce;
public NonceKey(long group, long nonce) {
- assert nonce != HConstants.NO_NONCE;
this.group = group;
this.nonce = nonce;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5cd27b8..82d4bd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -161,6 +161,7 @@ import org.apache.hadoop.hbase.util.EncryptionTest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HashedBytes;
+import org.apache.hadoop.hbase.util.NonceKey;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
@@ -199,6 +200,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import edu.umd.cs.findbugs.annotations.Nullable;
+
/**
* Regions store data for a certain region of a table. It stores all columns
* for each row. A given table consists of one or more Regions.
@@ -642,7 +645,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// flushPerChanges is to prevent too many changes in memstore
private long flushPerChanges;
private long blockingMemStoreSize;
- final long threadWakeFrequency;
// Used to guard closes
final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -757,7 +759,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
this.rsServices = rsServices;
- this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
setHTableSpecificConf();
this.scannerReadPoints = new ConcurrentHashMap<>();
@@ -1271,14 +1272,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return writeRequestsCount.sum();
}
- /**
- * Update the write request count for this region
- * @param i increment
- */
- public void updateWriteRequestsCount(long i) {
- writeRequestsCount.add(i);
- }
-
@Override
public long getMemStoreSize() {
return memstoreDataSize.get();
@@ -2218,7 +2211,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
}
- public static interface FlushResult {
+ public interface FlushResult {
enum Result {
FLUSHED_NO_COMPACTION_NEEDED,
FLUSHED_COMPACTION_NEEDED,
@@ -3025,105 +3018,355 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
- * Struct-like class that tracks the progress of a batch operation, accumulating status codes
- * and tracking the index at which processing is proceeding. These batch operations may get
- * split into mini-batches for processing.
+ * Class that tracks the progress of a batch operations, accumulating status codes and tracking
+ * the index at which processing is proceeding. These batch operations may get split into
+ * mini-batches for processing.
*/
private abstract static class BatchOperation<T> {
- T[] operations;
- int nextIndexToProcess = 0;
- OperationStatus[] retCodeDetails;
- WALEdit[] walEditsFromCoprocessors;
+ protected final T[] operations;
+ protected final OperationStatus[] retCodeDetails;
+ protected final WALEdit[] walEditsFromCoprocessors;
// reference family cell maps directly so coprocessors can mutate them if desired
- Map<byte[], List<Cell>>[] familyCellMaps;
- ObservedExceptionsInBatch observedExceptions;
- Durability durability; //Durability of the batch (highest durability of all operations)
+ protected final Map<byte[], List<Cell>>[] familyCellMaps;
+
+ protected final HRegion region;
+ protected int nextIndexToProcess = 0;
+ protected final ObservedExceptionsInBatch observedExceptions;
+ //Durability of the batch (highest durability of all operations)
+ protected Durability durability;
- public BatchOperation(T[] operations) {
+ public BatchOperation(final HRegion region, T[] operations) {
this.operations = operations;
this.retCodeDetails = new OperationStatus[operations.length];
- this.walEditsFromCoprocessors = new WALEdit[operations.length];
Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
+ this.walEditsFromCoprocessors = new WALEdit[operations.length];
familyCellMaps = new Map[operations.length];
+
+ this.region = region;
observedExceptions = new ObservedExceptionsInBatch();
durability = Durability.USE_DEFAULT;
}
+ /**
+ * Visitor interface for batch operations
+ */
+ @FunctionalInterface
+ public interface Visitor {
+ /**
+ * @param index operation index
+ * @return If true continue visiting remaining entries, break otherwise
+ */
+ boolean visit(int index) throws IOException;
+ }
+
+ /**
+ * Helper method for visiting pending/ all batch operations
+ */
+ public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor)
+ throws IOException {
+ assert lastIndexExclusive <= this.size();
+ for (int i = nextIndexToProcess; i < lastIndexExclusive; i++) {
+ if (!pendingOnly || isOperationPending(i)) {
+ if (!visitor.visit(i)) {
+ break;
+ }
+ }
+ }
+ }
+
public abstract Mutation getMutation(int index);
public abstract long getNonceGroup(int index);
public abstract long getNonce(int index);
- /** This method is potentially expensive and should only be used for non-replay CP path. */
+ /** This method is potentially expensive and useful mostly for non-replay CP path. */
public abstract Mutation[] getMutationsForCoprocs();
public abstract boolean isInReplay();
- public abstract long getReplaySequenceId();
+ public abstract long getOrigLogSeqNum();
+ public abstract void startRegionOperation() throws IOException;
+ public abstract void closeRegionOperation() throws IOException;
+
+ /**
+ * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs
+ * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on
+ * entire batch and will be called from outside of class to check and prepare batch. This can
+ * be implemented by calling helper method {@link #checkAndPrepareMutation(int, long)} in a
+ * 'for' loop over mutations.
+ */
+ public abstract void checkAndPrepare() throws IOException;
+
+ /**
+ * Implement any Put request specific check and prepare logic here. Please refer to
+ * {@link #checkAndPrepareMutation(Mutation, long)} for how its used.
+ */
+ protected abstract void checkAndPreparePut(final Put p) throws IOException;
+
+ /**
+ * If necessary, calls preBatchMutate() CP hook for a mini-batch and updates metrics, cell
+ * count, tags and timestamp for all cells of all operations in a mini-batch.
+ */
+ public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation>
+ miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException;
+
+ /**
+ * Write mini-batch operations to MemStore
+ */
+ public abstract WriteEntry writeMiniBatchOperationsToMemStore(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+ throws IOException;
+
+ protected void writeMiniBatchOperationsToMemStore(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
+ throws IOException {
+ MemStoreSizing memStoreAccounting = new MemStoreSizing();
+ visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
+ // We need to update the sequence id for following reasons.
+ // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
+ // 2) If no WAL, FSWALEntry won't be used
+ // we use durability of the original mutation for the mutation passed by CP.
+ if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) {
+ region.updateSequenceId(familyCellMaps[index].values(), writeNumber);
+ }
+ applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);
+ return true;
+ });
+ // update memStore size
+ region.addAndGetMemStoreSize(memStoreAccounting);
+ }
public boolean isDone() {
return nextIndexToProcess == operations.length;
}
+ public int size() {
+ return operations.length;
+ }
+
+ public boolean isOperationPending(int index) {
+ return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN;
+ }
+
+ public List<UUID> getClusterIds() {
+ assert size() != 0;
+ return getMutation(0).getClusterIds();
+ }
+
/**
- * Validates each mutation and prepares a batch for write.
+ * Helper method that checks and prepares only one mutation. This can be used to implement
+ * {@link #checkAndPrepare()} for entire Batch.
* NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
- * after prePut()/ preDelete() CP hooks are run for all mutations in a batch.
+ * after prePut()/ preDelete() CP hooks are run for the mutation
*/
- public void checkAndPrepare(final HRegion region) throws IOException {
- long now = EnvironmentEdgeManager.currentTime();
- for (int i = 0 ; i < operations.length; i++) {
- // Skip anything that "ran" already
- if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
- Mutation mutation = getMutation(i);
+ protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)
+ throws IOException {
+ region.checkRow(mutation.getRow(), "batchMutate");
+ if (mutation instanceof Put) {
+ // Check the families in the put. If bad, skip this one.
+ checkAndPreparePut((Put) mutation);
+ region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);
+ } else {
+ region.prepareDelete((Delete) mutation);
+ }
+ }
- try {
- region.checkAndPrepareMutation(mutation, isInReplay(), now);
-
- // store the family map reference to allow for mutations
- familyCellMaps[i] = mutation.getFamilyCellMap();
- // store durability for the batch (highest durability of all operations in the batch)
- Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
- if (tmpDur.ordinal() > durability.ordinal()) {
- durability = tmpDur;
- }
- } catch (NoSuchColumnFamilyException nscf) {
- final String msg = "No such column family in batch mutation. ";
- if (observedExceptions.hasSeenNoSuchFamily()) {
- LOG.warn(msg + nscf.getMessage());
- } else {
- LOG.warn(msg, nscf);
- observedExceptions.sawNoSuchFamily();
- }
- retCodeDetails[i] = new OperationStatus(
- OperationStatusCode.BAD_FAMILY, nscf.getMessage());
- } catch (FailedSanityCheckException fsce) {
- final String msg = "Batch Mutation did not pass sanity check. ";
- if (observedExceptions.hasSeenFailedSanityCheck()) {
- LOG.warn(msg + fsce.getMessage());
- } else {
- LOG.warn(msg, fsce);
- observedExceptions.sawFailedSanityCheck();
- }
- retCodeDetails[i] = new OperationStatus(
- OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
- } catch (WrongRegionException we) {
- final String msg = "Batch mutation had a row that does not belong to this region. ";
- if (observedExceptions.hasSeenWrongRegion()) {
- LOG.warn(msg + we.getMessage());
- } else {
- LOG.warn(msg, we);
- observedExceptions.sawWrongRegion();
+ protected void checkAndPrepareMutation(int index, long timestamp) throws IOException {
+ Mutation mutation = getMutation(index);
+ try {
+ this.checkAndPrepareMutation(mutation, timestamp);
+
+ // store the family map reference to allow for mutations
+ familyCellMaps[index] = mutation.getFamilyCellMap();
+ // store durability for the batch (highest durability of all operations in the batch)
+ Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
+ if (tmpDur.ordinal() > durability.ordinal()) {
+ durability = tmpDur;
+ }
+ } catch (NoSuchColumnFamilyException nscf) {
+ final String msg = "No such column family in batch mutation. ";
+ if (observedExceptions.hasSeenNoSuchFamily()) {
+ LOG.warn(msg + nscf.getMessage());
+ } else {
+ LOG.warn(msg, nscf);
+ observedExceptions.sawNoSuchFamily();
+ }
+ retCodeDetails[index] = new OperationStatus(
+ OperationStatusCode.BAD_FAMILY, nscf.getMessage());
+ } catch (FailedSanityCheckException fsce) {
+ final String msg = "Batch Mutation did not pass sanity check. ";
+ if (observedExceptions.hasSeenFailedSanityCheck()) {
+ LOG.warn(msg + fsce.getMessage());
+ } else {
+ LOG.warn(msg, fsce);
+ observedExceptions.sawFailedSanityCheck();
+ }
+ retCodeDetails[index] = new OperationStatus(
+ OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+ } catch (WrongRegionException we) {
+ final String msg = "Batch mutation had a row that does not belong to this region. ";
+ if (observedExceptions.hasSeenWrongRegion()) {
+ LOG.warn(msg + we.getMessage());
+ } else {
+ LOG.warn(msg, we);
+ observedExceptions.sawWrongRegion();
+ }
+ retCodeDetails[index] = new OperationStatus(
+ OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+ }
+ }
+
+ /**
+ * Creates Mini-batch of all operations [nextIndexToProcess, lastIndexExclusive) for which
+ * a row lock can be acquired. All mutations with locked rows are considered to be
+ * In-progress operations and hence the name {@link MiniBatchOperationInProgress}. Mini batch
+ * is window over {@link BatchOperation} and contains contiguous pending operations.
+ *
+ * @param acquiredRowLocks keeps track of rowLocks acquired.
+ */
+ public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
+ List<RowLock> acquiredRowLocks) throws IOException {
+ int readyToWriteCount = 0;
+ int lastIndexExclusive = 0;
+ for (; lastIndexExclusive < size(); lastIndexExclusive++) {
+ if (!isOperationPending(lastIndexExclusive)) {
+ continue;
+ }
+ Mutation mutation = getMutation(lastIndexExclusive);
+ // If we haven't got any rows in our batch, we should block to get the next one.
+ RowLock rowLock = null;
+ try {
+ rowLock = region.getRowLockInternal(mutation.getRow(), true);
+ } catch (TimeoutIOException e) {
+ // We will retry when other exceptions, but we should stop if we timeout .
+ throw e;
+ } catch (IOException ioe) {
+ LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
+ }
+ if (rowLock == null) {
+ // We failed to grab another lock
+ break; // Stop acquiring more rows for this batch
+ } else {
+ acquiredRowLocks.add(rowLock);
+ }
+ readyToWriteCount++;
+ }
+ return createMiniBatch(lastIndexExclusive, readyToWriteCount);
+ }
+
+ protected MiniBatchOperationInProgress<Mutation> createMiniBatch(final int lastIndexExclusive,
+ final int readyToWriteCount) {
+ return new MiniBatchOperationInProgress<>(getMutationsForCoprocs(), retCodeDetails,
+ walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);
+ }
+
+ /**
+ * Builds separate WALEdit per nonce by applying input mutations. If WALEdits from CP are
+ * present, they are merged to result WALEdit.
+ */
+ public List<Pair<NonceKey, WALEdit>> buildWALEdits(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+ List<Pair<NonceKey, WALEdit>> walEdits = new ArrayList<>();
+
+ visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {
+ private Pair<NonceKey, WALEdit> curWALEditForNonce;
+ @Override
+ public boolean visit(int index) throws IOException {
+ Mutation m = getMutation(index);
+ // we use durability of the original mutation for the mutation passed by CP.
+ if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
+ region.recordMutationWithoutWal(m.getFamilyCellMap());
+ return true;
+ }
+
+ // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each.
+ // Given how nonce keys are originally written, these should be contiguous.
+ // They don't have to be, it will still work, just write more WALEdits than needed.
+ long nonceGroup = getNonceGroup(index);
+ long nonce = getNonce(index);
+ if (curWALEditForNonce == null ||
+ curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup ||
+ curWALEditForNonce.getFirst().getNonce() != nonce) {
+ curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),
+ new WALEdit(miniBatchOp.getCellCount(), isInReplay()));
+ walEdits.add(curWALEditForNonce);
+ }
+ WALEdit walEdit = curWALEditForNonce.getSecond();
+
+ // Add WAL edits by CP
+ WALEdit fromCP = walEditsFromCoprocessors[index];
+ if (fromCP != null) {
+ for (Cell cell : fromCP.getCells()) {
+ walEdit.add(cell);
}
- retCodeDetails[i] = new OperationStatus(
- OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
}
+ addFamilyMapToWALEdit(familyCellMaps[index], walEdit);
+
+ return true;
+ }
+ });
+ return walEdits;
+ }
+
+ /**
+ * This method completes mini-batch operations by calling postBatchMutate() CP hook (if
+ * required) and completing mvcc.
+ */
+ public void completeMiniBatchOperations(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+ throws IOException {
+ if (writeEntry != null) {
+ region.mvcc.completeAndWait(writeEntry);
+ }
+ }
+
+ public void doPostOpCleanupForMiniBatch(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
+ boolean success) throws IOException {}
+
+ /**
+ * Atomically apply the given map of family->edits to the memstore.
+ * This handles the consistency control on its own, but the caller
+ * should already have locked updatesLock.readLock(). This also does
+ * <b>not</b> check the families for validity.
+ *
+ * @param familyMap Map of Cells by family
+ */
+ protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
+ MemStoreSizing memstoreAccounting) throws IOException {
+ for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
+ byte[] family = e.getKey();
+ List<Cell> cells = e.getValue();
+ assert cells instanceof RandomAccess;
+ region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);
+ }
+ }
+
+ /**
+ * Append the given map of family->edits to a WALEdit data structure.
+ * This does not write to the WAL itself.
+ * @param familyMap map of family->edits
+ * @param walEdit the destination entry to append into
+ */
+ private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
+ WALEdit walEdit) {
+ for (List<Cell> edits : familyMap.values()) {
+ assert edits instanceof RandomAccess;
+ int listSize = edits.size();
+ for (int i=0; i < listSize; i++) {
+ Cell cell = edits.get(i);
+ walEdit.add(cell);
}
}
}
}
+ /**
+ * Batch of mutation operations. Base class is shared with {@link ReplayBatchOperation} as most
+ * of the logic is same.
+ */
private static class MutationBatchOperation extends BatchOperation<Mutation> {
private long nonceGroup;
private long nonce;
- public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) {
- super(operations);
+ public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup,
+ long nonce) {
+ super(region, operations);
this.nonceGroup = nonceGroup;
this.nonce = nonce;
}
@@ -3154,16 +3397,279 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public long getReplaySequenceId() {
- return 0;
+ public long getOrigLogSeqNum() {
+ return WALKey.NO_SEQUENCE_ID;
+ }
+
+ @Override
+ public void startRegionOperation() throws IOException {
+ region.startRegionOperation(Operation.BATCH_MUTATE);
+ }
+
+ @Override
+ public void closeRegionOperation() throws IOException {
+ region.closeRegionOperation(Operation.BATCH_MUTATE);
+ }
+
+ @Override
+ public void checkAndPreparePut(Put p) throws IOException {
+ region.checkFamilies(p.getFamilyCellMap().keySet());
+ }
+
+ @Override
+ public void checkAndPrepare() throws IOException {
+ final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes
+ visitBatchOperations(true, this.size(), new Visitor() {
+ private long now = EnvironmentEdgeManager.currentTime();
+ private WALEdit walEdit;
+ @Override
+ public boolean visit(int index) throws IOException {
+ // Run coprocessor pre hook outside of locks to avoid deadlock
+ if (region.coprocessorHost != null) {
+ if (walEdit == null) {
+ walEdit = new WALEdit();
+ }
+ callPreMutateCPHook(index, walEdit, metrics);
+ if (!walEdit.isEmpty()) {
+ walEditsFromCoprocessors[index] = walEdit;
+ walEdit = null;
+ }
+ }
+ if (isOperationPending(index)) {
+ // TODO: Currently validation is done with current time before acquiring locks and
+ // updates are done with different timestamps after acquiring locks. This behavior is
+ // inherited from the code prior to this change. Can this be changed?
+ checkAndPrepareMutation(index, now);
+ }
+ return true;
+ }
+ });
+
+ // FIXME: we may update metrics twice! here for all operations bypassed by CP and later in
+ // normal processing.
+ // Update metrics in same way as it is done when we go the normal processing route (we now
+ // update general metrics though a Coprocessor did the work).
+ if (region.metricsRegion != null) {
+ if (metrics[0] > 0) {
+ // There were some Puts in the batch.
+ region.metricsRegion.updatePut();
+ }
+ if (metrics[1] > 0) {
+ // There were some Deletes in the batch.
+ region.metricsRegion.updateDelete();
+ }
+ }
+ }
+
+ @Override
+ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
+ byte[] byteTS = Bytes.toBytes(timestamp);
+ visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
+ Mutation mutation = getMutation(index);
+ if (mutation instanceof Put) {
+ region.updateCellTimestamps(familyCellMaps[index].values(), byteTS);
+ miniBatchOp.incrementNumOfPuts();
+ } else {
+ region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);
+ miniBatchOp.incrementNumOfDeletes();
+ }
+ region.rewriteCellTags(familyCellMaps[index], mutation);
+
+ // update cell count
+ if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
+ for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+ miniBatchOp.addCellCount(cells.size());
+ }
+ }
+
+ WALEdit fromCP = walEditsFromCoprocessors[index];
+ if (fromCP != null) {
+ miniBatchOp.addCellCount(fromCP.size());
+ }
+ return true;
+ });
+
+ if (region.coprocessorHost != null) {
+ // calling the pre CP hook for batch mutation
+ region.coprocessorHost.preBatchMutate(miniBatchOp);
+ checkAndMergeCPMutations(miniBatchOp, acquiredRowLocks, timestamp);
+ }
+ }
+
+ @Override
+ public List<Pair<NonceKey, WALEdit>> buildWALEdits(final MiniBatchOperationInProgress<Mutation>
+ miniBatchOp) throws IOException {
+ List<Pair<NonceKey, WALEdit>> walEdits = super.buildWALEdits(miniBatchOp);
+ // for MutationBatchOperation, more than one nonce is not allowed
+ if (walEdits.size() > 1) {
+ throw new IOException("Found multiple nonce keys per batch!");
+ }
+ return walEdits;
+ }
+
+ @Override
+ public WriteEntry writeMiniBatchOperationsToMemStore(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry)
+ throws IOException {
+ if (writeEntry == null) {
+ writeEntry = region.mvcc.begin();
+ }
+ super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());
+ return writeEntry;
+ }
+
+ @Override
+ public void completeMiniBatchOperations(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+ throws IOException {
+ // TODO: can it be done after completing mvcc?
+ // calling the post CP hook for batch mutation
+ if (region.coprocessorHost != null) {
+ region.coprocessorHost.postBatchMutate(miniBatchOp);
+ }
+ super.completeMiniBatchOperations(miniBatchOp, writeEntry);
+ }
+
+ @Override
+ public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ final WALEdit walEdit, boolean success) throws IOException {
+ if (miniBatchOp != null) {
+ // synced so that the coprocessor contract is adhered to.
+ if (region.coprocessorHost != null) {
+ visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
+ // only for successful puts
+ if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) {
+ Mutation m = getMutation(i);
+ if (m instanceof Put) {
+ region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
+ } else {
+ region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
+ }
+ }
+ return true;
+ });
+ }
+
+ // See if the column families were consistent through the whole thing.
+ // if they were then keep them. If they were not then pass a null.
+ // null will be treated as unknown.
+ // Total time taken might be involving Puts and Deletes.
+ // Split the time for puts and deletes based on the total number of Puts and Deletes.
+ if (region.metricsRegion != null) {
+ if (miniBatchOp.getNumOfPuts() > 0) {
+ // There were some Puts in the batch.
+ region.metricsRegion.updatePut();
+ }
+ if (miniBatchOp.getNumOfDeletes() > 0) {
+ // There were some Deletes in the batch.
+ region.metricsRegion.updateDelete();
+ }
+ }
+ }
+
+ if (region.coprocessorHost != null) {
+ // call the coprocessor hook to do any finalization steps after the put is done
+ region.coprocessorHost.postBatchMutateIndispensably(
+ miniBatchOp != null ? miniBatchOp : createMiniBatch(size(), 0), success);
+ }
+ }
+
+ /**
+ * Runs prePut/ preDelete coprocessor hook for input mutation in a batch
+ * @param metrics Array of 2 ints. index 0: count of puts and index 1: count of deletes
+ */
+ private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] metrics)
+ throws IOException {
+ Mutation m = getMutation(index);
+ if (m instanceof Put) {
+ if (region.coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
+ // pre hook says skip this Put
+ // mark as success and skip in doMiniBatchMutation
+ metrics[0]++;
+ retCodeDetails[index] = OperationStatus.SUCCESS;
+ }
+ } else if (m instanceof Delete) {
+ Delete curDel = (Delete) m;
+ if (curDel.getFamilyCellMap().isEmpty()) {
+ // handle deleting a row case
+ // TODO: prepareDelete() has been called twice, before and after preDelete() CP hook.
+ // Can this be avoided?
+ region.prepareDelete(curDel);
+ }
+ if (region.coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
+ // pre hook says skip this Delete
+ // mark as success and skip in doMiniBatchMutation
+ metrics[1]++;
+ retCodeDetails[index] = OperationStatus.SUCCESS;
+ }
+ } else {
+ // In case of passing Append mutations along with the Puts and Deletes in batchMutate
+ // mark the operation return code as failure so that it will not be considered in
+ // the doMiniBatchMutation
+ retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE,
+ "Put/Delete mutations only supported in batchMutate() now");
+ }
+ }
+
+ private void checkAndMergeCPMutations(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ final List<RowLock> acquiredRowLocks, final long timestamp) throws IOException {
+ visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int i) -> {
+ // we pass (i - firstIndex) below since the call expects a relative index
+ Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - nextIndexToProcess);
+ if (cpMutations == null) {
+ return true;
+ }
+ // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
+ Mutation mutation = getMutation(i);
+ for (Mutation cpMutation : cpMutations) {
+ this.checkAndPrepareMutation(cpMutation, timestamp);
+
+ // Acquire row locks. If not, the whole batch will fail.
+ acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true));
+
+ // Returned mutations from coprocessor correspond to the Mutation at index i. We can
+ // directly add the cells from those mutations to the familyMaps of this mutation.
+ Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
+ // will get added to the memStore later
+ mergeFamilyMaps(familyCellMaps[i], cpFamilyMap);
+
+ // The durability of returned mutation is replaced by the corresponding mutation.
+ // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
+ // cells of returned mutation.
+ if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
+ for (List<Cell> cells : cpFamilyMap.values()) {
+ miniBatchOp.addCellCount(cells.size());
+ }
+ }
+ }
+ return true;
+ });
+ }
+
+ private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap,
+ Map<byte[], List<Cell>> toBeMerged) {
+ for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) {
+ List<Cell> cells = familyMap.get(entry.getKey());
+ if (cells == null) {
+ familyMap.put(entry.getKey(), entry.getValue());
+ } else {
+ cells.addAll(entry.getValue());
+ }
+ }
}
}
+ /**
+ * Batch of mutations for replay. Base class is shared with {@link MutationBatchOperation} as most
+ * of the logic is same.
+ */
private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
- private long replaySeqId = 0;
- public ReplayBatchOperation(MutationReplay[] operations, long seqId) {
- super(operations);
- this.replaySeqId = seqId;
+ private long origLogSeqNum = 0;
+ public ReplayBatchOperation(final HRegion region, MutationReplay[] operations,
+ long origLogSeqNum) {
+ super(region, operations);
+ this.origLogSeqNum = origLogSeqNum;
}
@Override
@@ -3183,8 +3689,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public Mutation[] getMutationsForCoprocs() {
- assert false;
- throw new RuntimeException("Should not be called for replay batch");
+ return null;
}
@Override
@@ -3193,8 +3698,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
@Override
- public long getReplaySequenceId() {
- return this.replaySeqId;
+ public long getOrigLogSeqNum() {
+ return this.origLogSeqNum;
+ }
+
+ @Override
+ public void startRegionOperation() throws IOException {
+ region.startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+ }
+
+ @Override
+ public void closeRegionOperation() throws IOException {
+ region.closeRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+ }
+
+ /**
+ * During replay, there could exist column families which are removed between region server
+ * failure and replay
+ */
+ @Override
+ protected void checkAndPreparePut(Put p) throws IOException {
+ Map<byte[], List<Cell>> familyCellMap = p.getFamilyCellMap();
+ List<byte[]> nonExistentList = null;
+ for (byte[] family : familyCellMap.keySet()) {
+ if (!region.htableDescriptor.hasColumnFamily(family)) {
+ if (nonExistentList == null) {
+ nonExistentList = new ArrayList<>();
+ }
+ nonExistentList.add(family);
+ }
+ }
+ if (nonExistentList != null) {
+ for (byte[] family : nonExistentList) {
+ // Perhaps schema was changed between crash and replay
+ LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
+ familyCellMap.remove(family);
+ }
+ }
+ }
+
+ @Override
+ public void checkAndPrepare() throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ visitBatchOperations(true, this.size(), (int index) -> {
+ checkAndPrepareMutation(index, now);
+ return true;
+ });
+ }
+
+ @Override
+ public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+ long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
+ visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
+ // update cell count
+ for (List<Cell> cells : getMutation(index).getFamilyCellMap().values()) {
+ miniBatchOp.addCellCount(cells.size());
+ }
+ return true;
+ });
+ }
+
+ @Override
+ public WriteEntry writeMiniBatchOperationsToMemStore(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+ throws IOException {
+ super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum());
+ return writeEntry;
+ }
+
+ @Override
+ public void completeMiniBatchOperations(
+ final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+ throws IOException {
+ super.completeMiniBatchOperations(miniBatchOp, writeEntry);
+ region.mvcc.advanceTo(getOrigLogSeqNum());
}
}
@@ -3204,7 +3781,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
// * coprocessor calls (see ex. BulkDeleteEndpoint).
// So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
- return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce));
+ return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce));
}
public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
@@ -3232,14 +3809,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
return statuses;
}
- return batchMutate(new ReplayBatchOperation(mutations, replaySeqId));
+ return batchMutate(new ReplayBatchOperation(this, mutations, replaySeqId));
}
/**
* Perform a batch of mutations.
+ *
* It supports only Put and Delete mutations and will ignore other types passed. Operations in
* a batch are stored with highest durability specified of for all operations in a batch,
* except for {@link Durability#SKIP_WAL}.
+ *
+ * <p>This function is called from {@link #batchReplay(MutationReplay[], long)} with
+ * {@link ReplayBatchOperation} instance and {@link #batchMutate(Mutation[], long, long)} with
+ * {@link MutationBatchOperation} instance as an argument. As the processing of replay batch
+ * and mutation batch is very similar, a lot of code is shared by providing generic methods in
+ * base class {@link BatchOperation}. The logic for this method and
+ * {@link #doMiniBatchMutate(BatchOperation)} is implemented using methods in base class which
+ * are overridden by derived classes to implement special behavior.
+ *
* @param batchOp contains the list of mutations
* @return an array of OperationStatus which internally contains the
* OperationStatusCode and the exceptionMessage if any.
@@ -3247,8 +3834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
boolean initialized = false;
- Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
- startRegionOperation(op);
+ batchOp.startRegionOperation();
try {
while (!batchOp.isDone()) {
if (!batchOp.isInReplay()) {
@@ -3257,12 +3843,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
checkResources();
if (!initialized) {
- this.writeRequestsCount.add(batchOp.operations.length);
- if (!batchOp.isInReplay()) {
- callPreMutateCPHooks(batchOp);
- }
- // validate and prepare batch for write, after CP pre-hooks
- batchOp.checkAndPrepare(this);
+ this.writeRequestsCount.add(batchOp.size());
+ // validate and prepare batch for write, for MutationBatchOperation it also calls CP
+ // prePut()/ preDelete() hooks
+ batchOp.checkAndPrepare();
initialized = true;
}
doMiniBatchMutate(batchOp);
@@ -3270,296 +3854,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
requestFlushIfNeeded(newSize);
}
} finally {
- closeRegionOperation(op);
+ batchOp.closeRegionOperation();
}
return batchOp.retCodeDetails;
}
/**
- * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch.
- * @param batchOp
- */
- private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
- if (coprocessorHost == null) {
- return;
- }
- /* Run coprocessor pre hook outside of locks to avoid deadlock */
- WALEdit walEdit = new WALEdit();
- int noOfPuts = 0;
- int noOfDeletes = 0;
- for (int i = 0 ; i < batchOp.operations.length; i++) {
- Mutation m = batchOp.getMutation(i);
- if (m instanceof Put) {
- if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
- // pre hook says skip this Put
- // mark as success and skip in doMiniBatchMutation
- noOfPuts++;
- batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
- }
- } else if (m instanceof Delete) {
- Delete curDel = (Delete) m;
- if (curDel.getFamilyCellMap().isEmpty()) {
- // handle deleting a row case
- prepareDelete(curDel);
- }
- if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
- // pre hook says skip this Delete
- // mark as success and skip in doMiniBatchMutation
- noOfDeletes++;
- batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
- }
- } else {
- // In case of passing Append mutations along with the Puts and Deletes in batchMutate
- // mark the operation return code as failure so that it will not be considered in
- // the doMiniBatchMutation
- batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
- "Put/Delete mutations only supported in batchMutate() now");
- }
- if (!walEdit.isEmpty()) {
- batchOp.walEditsFromCoprocessors[i] = walEdit;
- walEdit = new WALEdit();
- }
- }
- // Update metrics in same way as it is done when we go the normal processing route (we now
- // update general metrics though a Coprocessor did the work).
- if (noOfPuts > 0) {
- // There were some Puts in the batch.
- if (this.metricsRegion != null) {
- this.metricsRegion.updatePut();
- }
- }
- if (noOfDeletes > 0) {
- // There were some Deletes in the batch.
- if (this.metricsRegion != null) {
- this.metricsRegion.updateDelete();
- }
- }
- }
-
- /**
* Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}
* In here we also handle replay of edits on region recover.
* @return Change in size brought about by applying <code>batchOp</code>
*/
- // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120
private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
- boolean replay = batchOp.isInReplay();
- long currentNonceGroup = HConstants.NO_NONCE;
- long currentNonce = HConstants.NO_NONCE;
- WALEdit walEdit = null;
- boolean locked = false;
- // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
- int firstIndex = batchOp.nextIndexToProcess;
- int lastIndexExclusive = firstIndex;
boolean success = false;
- int noOfPuts = 0;
- int noOfDeletes = 0;
+ WALEdit walEdit = null;
WriteEntry writeEntry = null;
- int cellCount = 0;
+ boolean locked = false;
+ // We try to set up a batch in the range [batchOp.nextIndexToProcess,lastIndexExclusive)
+ MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
/** Keep track of the locks we hold so we can release them in finally clause */
- List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
- MemStoreSizing memStoreAccounting = new MemStoreSizing();
+ List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
try {
- // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
- int numReadyToWrite = 0;
- for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) {
- if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
- != OperationStatusCode.NOT_RUN) {
- continue;
- }
- Mutation mutation = batchOp.getMutation(lastIndexExclusive);
- // If we haven't got any rows in our batch, we should block to get the next one.
- RowLock rowLock = null;
- try {
- rowLock = getRowLockInternal(mutation.getRow(), true);
- } catch (TimeoutIOException e) {
- // We will retry when other exceptions, but we should stop if we timeout .
- throw e;
- } catch (IOException ioe) {
- LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
- }
- if (rowLock == null) {
- // We failed to grab another lock
- break; // Stop acquiring more rows for this batch
- } else {
- acquiredRowLocks.add(rowLock);
- }
-
- numReadyToWrite++;
- if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
- for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
- cellCount += cells.size();
- }
- }
- }
+ // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
+ // locked rows
+ miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);
// We've now grabbed as many mutations off the list as we can
- // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
- if (numReadyToWrite <= 0) {
+ // Ensure we acquire at least one.
+ if (miniBatchOp.getReadyToWriteCount() <= 0) {
+ // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
return;
}
- // STEP 2. Update any LATEST_TIMESTAMP timestamps
+ lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
+ locked = true;
+
+ // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp
// We should record the timestamp only after we have acquired the rowLock,
// otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
long now = EnvironmentEdgeManager.currentTime();
- if (!replay) {
- byte[] byteNow = Bytes.toBytes(now);
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- // skip invalid
- if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
- // lastIndexExclusive was incremented above.
- continue;
- }
-
- Mutation mutation = batchOp.getMutation(i);
- if (mutation instanceof Put) {
- updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow);
- noOfPuts++;
- } else {
- prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow);
- noOfDeletes++;
- }
- rewriteCellTags(batchOp.familyCellMaps[i], mutation);
- WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
- if (fromCP != null) {
- cellCount += fromCP.size();
- }
- }
- }
- lock(this.updatesLock.readLock(), numReadyToWrite);
- locked = true;
-
- // calling the pre CP hook for batch mutation
- if (!replay && coprocessorHost != null) {
- MiniBatchOperationInProgress<Mutation> miniBatchOp =
- new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
- batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
- coprocessorHost.preBatchMutate(miniBatchOp);
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
- // lastIndexExclusive was incremented above.
- continue;
- }
- // we pass (i - firstIndex) below since the call expects a relative index
- Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
- if (cpMutations == null) {
- continue;
- }
- Mutation mutation = batchOp.getMutation(i);
- boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
- // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
- for (int j = 0; j < cpMutations.length; j++) {
- Mutation cpMutation = cpMutations[j];
- checkAndPrepareMutation(cpMutation, replay, now);
-
- // Acquire row locks. If not, the whole batch will fail.
- acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
-
- // Returned mutations from coprocessor correspond to the Mutation at index i. We can
- // directly add the cells from those mutations to the familyMaps of this mutation.
- Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
- // will get added to the memStore later
- mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
-
- // The durability of returned mutation is replaced by the corresponding mutation.
- // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
- // cells of returned mutation.
- if (!skipWal) {
- for (List<Cell> cells : cpFamilyMap.values()) {
- cellCount += cells.size();
- }
- }
- }
- }
- }
+ batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
// STEP 3. Build WAL edit
- walEdit = new WALEdit(cellCount, replay);
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- // Skip puts that were determined to be invalid during preprocessing
- if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
- continue;
- }
+ List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);
- Mutation m = batchOp.getMutation(i);
- // we use durability of the original mutation for the mutation passed by CP.
- if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
- recordMutationWithoutWal(m.getFamilyCellMap());
- continue;
- }
+ // STEP 4. Append the WALEdits to WAL and sync.
+ for(Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
+ Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
+ walEdit = nonceKeyWALEditPair.getSecond();
+ NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
- long nonceGroup = batchOp.getNonceGroup(i);
- long nonce = batchOp.getNonce(i);
- // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
- // Given how nonces are originally written, these should be contiguous.
- // They don't have to be, it will still work, just write more WALEdits than needed.
- if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
- // Write what we have so far for nonces out to WAL
- appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);
- walEdit = new WALEdit(cellCount, replay);
- currentNonceGroup = nonceGroup;
- currentNonce = nonce;
+ if (walEdit != null && !walEdit.isEmpty()) {
+ writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
+ nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
}
- // Add WAL edits by CP
- WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
- if (fromCP != null) {
- for (Cell cell : fromCP.getCells()) {
- walEdit.add(cell);
- }
+ // Part of STEP 6, done early: complete mvcc for all but last writeEntry (for replay case)
+ if (it.hasNext() && writeEntry != null) {
+ mvcc.complete(writeEntry);
+ writeEntry = null;
}
- addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit);
- }
-
- // STEP 4. Append the final edit to WAL and sync.
- Mutation mutation = batchOp.getMutation(firstIndex);
- writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now,
- currentNonceGroup, currentNonce,
- replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID);
- if (!replay && writeEntry == null) {
- // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC
- // transaction to get sequence id.
- writeEntry = mvcc.begin();
}
// STEP 5. Write back to memStore
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
- continue;
- }
- // We need to update the sequence id for following reasons.
- // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
- // 2) If no WAL, FSWALEntry won't be used
- // we use durability of the original mutation for the mutation passed by CP.
- boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
- if (updateSeqId) {
- this.updateSequenceId(batchOp.familyCellMaps[i].values(),
- replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
- }
- applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting);
- }
-
- // update memstore size
- this.addAndGetMemStoreSize(memStoreAccounting);
-
- // calling the post CP hook for batch mutation
- if (!replay && coprocessorHost != null) {
- MiniBatchOperationInProgress<Mutation> miniBatchOp =
- new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
- batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
- coprocessorHost.postBatchMutate(miniBatchOp);
- }
-
- // STEP 6. Complete mvcc.
- if (writeEntry != null) {
- mvcc.completeAndWait(writeEntry);
- writeEntry = null;
- }
- if (replay) {
- this.mvcc.advanceTo(batchOp.getReplaySequenceId());
- }
+ // NOTE: writeEntry can be null here
+ writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
+ // STEP 6. Complete MiniBatchOperations: If required calls postBatchMutate() CP hook and
+ // complete mvcc for last writeEntry
+ batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
+ writeEntry = null;
success = true;
} finally {
// Call complete rather than completeAndWait because we probably had error if walKey != null
@@ -3570,122 +3933,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
releaseRowLocks(acquiredRowLocks);
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
- batchOp.retCodeDetails[i] = success? OperationStatus.SUCCESS : OperationStatus.FAILURE;
- }
- }
-
- // synced so that the coprocessor contract is adhered to.
- if (!replay && coprocessorHost != null) {
- for (int i = firstIndex; i < lastIndexExclusive; i++) {
- // only for successful puts
- if (batchOp.retCodeDetails[i].getOperationStatusCode()
- != OperationStatusCode.SUCCESS) {
- continue;
- }
- Mutation m = batchOp.getMutation(i);
- if (m instanceof Put) {
- coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
- } else {
- coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
- }
- }
- }
-
- // See if the column families were consistent through the whole thing.
- // if they were then keep them. If they were not then pass a null.
- // null will be treated as unknown.
- // Total time taken might be involving Puts and Deletes.
- // Split the time for puts and deletes based on the total number of Puts and Deletes.
-
- if (noOfPuts > 0) {
- // There were some Puts in the batch.
- if (this.metricsRegion != null) {
- this.metricsRegion.updatePut();
- }
- }
- if (noOfDeletes > 0) {
- // There were some Deletes in the batch.
- if (this.metricsRegion != null) {
- this.metricsRegion.updateDelete();
- }
- }
-
- if (coprocessorHost != null && !batchOp.isInReplay()) {
- // call the coprocessor hook to do any finalization steps
- // after the put is done
- MiniBatchOperationInProgress<Mutation> miniBatchOp =
- new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
- batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
- coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
- }
-
- batchOp.nextIndexToProcess = lastIndexExclusive;
- }
- }
-
- private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap,
- Map<byte[], List<Cell>> toBeMerged) {
- for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) {
- List<Cell> cells = familyMap.get(entry.getKey());
- if (cells == null) {
- familyMap.put(entry.getKey(), entry.getValue());
- } else {
- cells.addAll(entry.getValue());
- }
- }
- }
-
- private void appendCurrentNonces(final Mutation mutation, final boolean replay,
- final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)
- throws IOException {
- if (walEdit.isEmpty()) return;
- if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
- WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
- currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
- this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
- // Complete the mvcc transaction started down in append else it will block others
- this.mvcc.complete(walKey.getWriteEntry());
- }
+ final int finalLastIndexExclusive =
+ miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
+ final boolean finalSuccess = success;
+ batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
+ batchOp.retCodeDetails[i] =
+ finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
+ return true;
+ });
- private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now)
- throws IOException {
- checkRow(mutation.getRow(), "doMiniBatchMutation");
- if (mutation instanceof Put) {
- // Check the families in the put. If bad, skip this one.
- if (replay) {
- removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap());
- } else {
- checkFamilies(mutation.getFamilyCellMap().keySet());
- }
- checkTimestamps(mutation.getFamilyCellMap(), now);
- } else {
- prepareDelete((Delete)mutation);
- }
- }
+ batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);
- /**
- * During replay, there could exist column families which are removed between region server
- * failure and replay
- */
- private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) {
- List<byte[]> nonExistentList = null;
- for (byte[] family : familyMap.keySet()) {
- if (!this.htableDescriptor.hasColumnFamily(family)) {
- if (nonExistentList == null) {
- nonExistentList = new ArrayList<>();
- }
- nonExistentList.add(family);
- }
- }
- if (nonExistentList != null) {
- for (byte[] family : nonExistentList) {
- // Perhaps schema was changed between crash and replay
- LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
- familyMap.remove(family);
- }
+ batchOp.nextIndexToProcess = finalLastIndexExclusive;
}
}
@@ -4003,25 +4262,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
doBatchMutate(p);
}
- /*
- * Atomically apply the given map of family->edits to the memstore.
- * This handles the consistency control on its own, but the caller
- * should already have locked updatesLock.readLock(). This also does
- * <b>not</b> check the families for validity.
- *
- * @param familyMap Map of Cells by family
- * @param memstoreSize
- */
- private void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
- MemStoreSizing memstoreAccounting) throws IOException {
- for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
- byte[] family = e.getKey();
- List<Cell> cells = e.getValue();
- assert cells instanceof RandomAccess;
- applyToMemStore(getStore(family), cells, false, memstoreAccounting);
- }
- }
-
/**
* @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
* set; when set we will run operations that make sense in the increment/append scenario
@@ -4090,24 +4330,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
- /**
- * Append the given map of family->edits to a WALEdit data structure.
- * This does not write to the WAL itself.
- * @param familyMap map of family->edits
- * @param walEdit the destination entry to append into
- */
- private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
- WALEdit walEdit) {
- for (List<Cell> edits : familyMap.values()) {
- assert edits instanceof RandomAccess;
- int listSize = edits.size();
- for (int i=0; i < listSize; i++) {
- Cell cell = edits.get(i);
- walEdit.add(cell);
- }
- }
- }
-
/*
* @param size
* @return True if size is over the flush threshold
@@ -5471,8 +5693,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private void releaseRowLocks(List<RowLock> rowLocks) {
if (rowLocks != null) {
- for (int i = 0; i < rowLocks.size(); i++) {
- rowLocks.get(i).release();
+ for (RowLock rowLock : rowLocks) {
+ rowLock.release();
}
rowLocks.clear();
}
@@ -5626,7 +5848,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* bulkLoadHFile() to perform any necessary
* pre/post processing of a given bulkload call
*/
- public static interface BulkLoadListener {
+ public interface BulkLoadListener {
/**
* Called before an HFile is actually loaded
* @param family family being loaded to
@@ -6081,7 +6303,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// to handle scan or get operation.
moreValues = nextInternal(outResults, scannerContext);
} else {
- List<Cell> tmpList = new ArrayList<Cell>();
+ List<Cell> tmpList = new ArrayList<>();
moreValues = nextInternal(tmpList, scannerContext);
outResults.addAll(tmpList);
}
@@ -6861,46 +7083,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
/**
- * Create a daughter region from given a temp directory with the region data.
- * @param hri Spec. for daughter region to open.
- * @throws IOException
- */
- public HRegion createDaughterRegionFromSplits(final RegionInfo hri) throws IOException {
- // Move the files from the temporary .splits to the final /table/region directory
- fs.commitDaughterRegion(hri);
-
- // Create the daughter HRegion instance
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
- this.getBaseConf(), hri, this.getTableDescriptor(), rsServices);
- r.readRequestsCount.add(this.getReadRequestsCount() / 2);
- r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount() / 2);
- r.writeRequestsCount.add(this.getWriteRequestsCount() / 2);
- return r;
- }
-
- /**
- * Create a merged region given a temp directory with the region data.
- * @param region_b another merging region
- * @return merged HRegion
- * @throws IOException
- */
- HRegion createMergedRegionFromMerges(final RegionInfo mergedRegionInfo,
- final HRegion region_b) throws IOException {
- HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
- fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
- this.getTableDescriptor(), this.rsServices);
- r.readRequestsCount.add(this.getReadRequestsCount()
- + region_b.getReadRequestsCount());
- r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount()
- + region_b.getFilteredReadRequestsCount());
- r.writeRequestsCount.add(this.getWriteRequestsCount()
-
- + region_b.getWriteRequestsCount());
- this.fs.commitMergedRegion(mergedRegionInfo);
- return r;
- }
-
- /**
* Computes the Path of the HRegion
*
* @param tabledir qualified path for table
@@ -6960,7 +7142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
}
- void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
+ void prepareGet(final Get get) throws IOException {
checkRow(get.getRow(), "Get");
// Verify families are all valid
if (get.hasFamilies()) {
@@ -7396,32 +7578,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return writeEntry associated with this append
*/
private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
- long now, long nonceGroup, long nonce, long replaySeqId) throws IOException {
- Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID,
+ long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
+ Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(),
+ "WALEdit is null or empty!");
+ Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != WALKey.NO_SEQUENCE_ID,
"Invalid replay sequence Id for replay WALEdit!");
+ // Using default cluster id, as this can only happen in the originating cluster.
+ // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
+ // here instead of WALKey directly to support legacy coprocessors.
+ WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
+ nonce, mvcc) :
+ new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+ this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
+ nonceGroup, nonce, mvcc, this.getReplicationScope());
+ if (walEdit.isReplay()) {
+ walKey.setOrigLogSeqNum(origLogSeqNum);
+ }
WriteEntry writeEntry = null;
- if (!walEdit.isEmpty()) {
- // Using default cluster id, as this can only happen in the originating cluster.
- // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
- // here instead of WALKey directly to support legacy coprocessors.
- WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
- nonce, mvcc) :
- new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
- this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
- nonceGroup, nonce, mvcc, this.getReplicationScope());
- if (walEdit.isReplay()) {
- walKey.setOrigLogSeqNum(replaySeqId);
+ try {
+ long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
+ // Call sync on our edit.
+ if (txid != 0) {
+ sync(txid, durability);
}
- try {
- long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
- // Call sync on our edit.
- if (txid != 0) sync(txid, durability);
- writeEntry = walKey.getWriteEntry();
- } catch (IOException ioe) {
- if (walKey != null) mvcc.complete(walKey.getWriteEntry());
- throw ioe;
+ writeEntry = walKey.getWriteEntry();
+ } catch (IOException ioe) {
+ if (walKey != null) {
+ mvcc.complete(walKey.getWriteEntry());
}
+ throw ioe;
}
return writeEntry;
}
@@ -7637,7 +7823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @return Sorted list of <code>cells</code> using <code>comparator</code>
*/
private static List<Cell> sort(List<Cell> cells, final CellComparator comparator) {
- Collections.sort(cells, comparator);
+ cells.sort(comparator);
return cells;
}
@@ -7658,7 +7844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ClassSize.OBJECT +
ClassSize.ARRAY +
51 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
- (15 * Bytes.SIZEOF_LONG) +
+ (14 * Bytes.SIZEOF_LONG) +
6 * Bytes.SIZEOF_BOOLEAN);
// woefully out of date - currently missing:
http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 56a97e0..ba847a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -40,13 +41,22 @@ public class MiniBatchOperationInProgress<T> {
private final int firstIndex;
private final int lastIndexExclusive;
+ private int readyToWriteCount = 0;
+ private int cellCount = 0;
+ private int numOfPuts = 0;
+ private int numOfDeletes = 0;
+
+
public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
- WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
+ WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
+ int readyToWriteCount) {
+ Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex));
this.operations = operations;
this.retCodeDetails = retCodeDetails;
this.walEditsFromCoprocessors = walEditsFromCoprocessors;
this.firstIndex = firstIndex;
this.lastIndexExclusive = lastIndexExclusive;
+ this.readyToWriteCount = readyToWriteCount;
}
/**
@@ -127,4 +137,36 @@ public class MiniBatchOperationInProgress<T> {
return operationsFromCoprocessors == null ? null :
operationsFromCoprocessors[getAbsoluteIndex(index)];
}
+
+ public int getReadyToWriteCount() {
+ return readyToWriteCount;
+ }
+
+ public int getLastIndexExclusive() {
+ return lastIndexExclusive;
+ }
+
+ public int getCellCount() {
+ return cellCount;
+ }
+
+ public void addCellCount(int cellCount) {
+ this.cellCount += cellCount;
+ }
+
+ public int getNumOfPuts() {
+ return numOfPuts;
+ }
+
+ public void incrementNumOfPuts() {
+ this.numOfPuts += 1;
+ }
+
+ public int getNumOfDeletes() {
+ return numOfDeletes;
+ }
+
+ public void incrementNumOfDeletes() {
+ this.numOfDeletes += 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
index c8e9940..0d9d149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
@@ -137,7 +137,7 @@ MultiRowMutationProcessorResponse> {
if (coprocessorHost != null) {
miniBatch = new MiniBatchOperationInProgress<>(
mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
- mutations.size());
+ mutations.size(), mutations.size());
coprocessorHost.preBatchMutate(miniBatch);
}
// Apply edits to a single WALEdit
http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
index 4a59379..c3472b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
@@ -44,7 +44,7 @@ public class TestMiniBatchOperationInProgress {
}
MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch =
new MiniBatchOperationInProgress<>(operations, retCodeDetails,
- walEditsFromCoprocessors, 0, 5);
+ walEditsFromCoprocessors, 0, 5, 5);
assertEquals(5, miniBatch.size());
assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
@@ -69,7 +69,7 @@ public class TestMiniBatchOperationInProgress {
}
miniBatch = new MiniBatchOperationInProgress<>(operations,
- retCodeDetails, walEditsFromCoprocessors, 7, 10);
+ retCodeDetails, walEditsFromCoprocessors, 7, 10, 3);
try {
miniBatch.setWalEdit(-1, new WALEdit());
fail("Should throw Exception while accessing out of range");
http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
index eb336fe..2fd3909 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
@@ -867,7 +867,7 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preBatchMutate(ObserverContextImpl.createAndPrepare(RCP_ENV),
- new MiniBatchOperationInProgress<>(null, null, null, 0, 0));
+ new MiniBatchOperationInProgress<>(null, null, null, 0, 0, 0));
return null;
}
}, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);