Posted to commits@hbase.apache.org by bu...@apache.org on 2017/11/07 22:57:26 UTC

[14/15] hbase git commit: HBASE-18961 doMiniBatchMutate() is split into smaller member methods of BatchOperation and its sub-classes

HBASE-18961 doMiniBatchMutate() is split into smaller member methods of BatchOperation and its sub-classes

There are no functional changes except for the following:
* The variable lastIndexExclusive was incremented while locking rows corresponding to input
  operations. As a result, when getRowLockInternal() threw a TimeoutIOException, only operations
  in the range [nextIndexToProcess, lastIndexExclusive) were marked as FAILED before the exception
  was raised up the call stack. With these changes, all pending operations are marked as FAILED
  (see the sketch below).
* The cluster ids of the first mutation are now used consistently for the entire batch. The
  previous behavior was to use the cluster ids of the first mutation in each mini-batch.
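
To make the first change concrete, here is a minimal, self-contained sketch (plain Java with
hypothetical types, not the actual HRegion internals) of the new failure marking: when row
locking times out, every operation that has not yet run is marked FAILED, not just the ones
scanned so far.

    import java.util.Arrays;

    class LockTimeoutSketch {
      enum Status { NOT_RUN, FAILED }

      // Stands in for the row-locking loop; RuntimeException stands in for
      // TimeoutIOException.
      static void lockRows(boolean[] lockTimesOut, Status[] status) {
        try {
          for (int i = 0; i < status.length; i++) {
            if (lockTimesOut[i]) {
              throw new RuntimeException("row lock timeout");
            }
          }
        } catch (RuntimeException e) {
          // New behavior: mark ALL not-yet-run operations FAILED before rethrowing.
          for (int i = 0; i < status.length; i++) {
            if (status[i] == Status.NOT_RUN) {
              status[i] = Status.FAILED;
            }
          }
          throw e;
        }
      }

      public static void main(String[] args) {
        Status[] status = new Status[4];
        Arrays.fill(status, Status.NOT_RUN);
        try {
          lockRows(new boolean[] {false, false, true, false}, status);
        } catch (RuntimeException expected) {
          // Prints [FAILED, FAILED, FAILED, FAILED]; previously index 3 would
          // have stayed NOT_RUN because only [nextIndexToProcess,
          // lastIndexExclusive) was marked.
          System.out.println(Arrays.toString(status));
        }
      }
    }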

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4eae5a29
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4eae5a29
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4eae5a29

Branch: refs/heads/HBASE-19189
Commit: 4eae5a29749da1c34f1a2dd0b1f6aa6f7a9bbffd
Parents: 29fd1de
Author: Umesh Agashe <ua...@cloudera.com>
Authored: Sun Oct 8 00:31:12 2017 -0700
Committer: Michael Stack <st...@apache.org>
Committed: Tue Nov 7 10:00:49 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/NonceKey.java  |    4 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 1328 ++++++++++--------
 .../MiniBatchOperationInProgress.java           |   44 +-
 .../regionserver/MultiRowMutationProcessor.java |    2 +-
 .../TestMiniBatchOperationInProgress.java       |    4 +-
 .../access/TestWithDisabledAuthorization.java   |    2 +-
 6 files changed, 805 insertions(+), 579 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
index 6da808e..b658331 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java
@@ -1,4 +1,4 @@
-/**
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.yetus.audience.InterfaceAudience;
 
  /**
@@ -31,7 +30,6 @@ public class NonceKey {
   private long nonce;
 
   public NonceKey(long group, long nonce) {
-    assert nonce != HConstants.NO_NONCE;
     this.group = group;
     this.nonce = nonce;
   }
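
The removed assert ties into the refactor below: buildWALEdits() now constructs a NonceKey for
every WALEdit, including mutations that carry no nonce, so a "no nonce" key must be
constructible. A minimal sketch of what the change permits (assuming hbase-common on the
classpath and assertions enabled via -ea):

    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.NonceKey;

    class NonceKeySketch {
      public static void main(String[] args) {
        // Before this change, the constructor asserted nonce != HConstants.NO_NONCE,
        // so this line would fail under -ea; after the change it is a valid key.
        NonceKey key = new NonceKey(HConstants.NO_NONCE, HConstants.NO_NONCE);
        System.out.println(key);
      }
    }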

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5cd27b8..82d4bd2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -161,6 +161,7 @@ import org.apache.hadoop.hbase.util.EncryptionTest;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
+import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
 import org.apache.hadoop.hbase.util.Threads;
@@ -199,6 +200,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDe
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
 
+import edu.umd.cs.findbugs.annotations.Nullable;
+
 /**
  * Regions store data for a certain region of a table.  It stores all columns
  * for each row. A given table consists of one or more Regions.
@@ -642,7 +645,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   // flushPerChanges is to prevent too many changes in memstore
   private long flushPerChanges;
   private long blockingMemStoreSize;
-  final long threadWakeFrequency;
   // Used to guard closes
   final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
@@ -757,7 +759,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
     }
     this.rsServices = rsServices;
-    this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
     setHTableSpecificConf();
     this.scannerReadPoints = new ConcurrentHashMap<>();
 
@@ -1271,14 +1272,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return writeRequestsCount.sum();
   }
 
-  /**
-   * Update the write request count for this region
-   * @param i increment
-   */
-  public void updateWriteRequestsCount(long i) {
-    writeRequestsCount.add(i);
-  }
-
   @Override
   public long getMemStoreSize() {
     return memstoreDataSize.get();
@@ -2218,7 +2211,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
   }
 
-  public static interface FlushResult {
+  public interface FlushResult {
     enum Result {
       FLUSHED_NO_COMPACTION_NEEDED,
       FLUSHED_COMPACTION_NEEDED,
@@ -3025,105 +3018,355 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Struct-like class that tracks the progress of a batch operation, accumulating status codes
-   * and tracking the index at which processing is proceeding. These batch operations may get
-   * split into mini-batches for processing.
+   * Class that tracks the progress of batch operations, accumulating status codes and tracking
+   * the index at which processing is proceeding. These batch operations may get split into
+   * mini-batches for processing.
    */
   private abstract static class BatchOperation<T> {
-    T[] operations;
-    int nextIndexToProcess = 0;
-    OperationStatus[] retCodeDetails;
-    WALEdit[] walEditsFromCoprocessors;
+    protected final T[] operations;
+    protected final OperationStatus[] retCodeDetails;
+    protected final WALEdit[] walEditsFromCoprocessors;
     // reference family cell maps directly so coprocessors can mutate them if desired
-    Map<byte[], List<Cell>>[] familyCellMaps;
-    ObservedExceptionsInBatch observedExceptions;
-    Durability durability;  //Durability of the batch (highest durability of all operations)
+    protected final Map<byte[], List<Cell>>[] familyCellMaps;
+
+    protected final HRegion region;
+    protected int nextIndexToProcess = 0;
+    protected final ObservedExceptionsInBatch observedExceptions;
+    //Durability of the batch (highest durability of all operations)
+    protected Durability durability;
 
-    public BatchOperation(T[] operations) {
+    public BatchOperation(final HRegion region, T[] operations) {
       this.operations = operations;
       this.retCodeDetails = new OperationStatus[operations.length];
-      this.walEditsFromCoprocessors = new WALEdit[operations.length];
       Arrays.fill(this.retCodeDetails, OperationStatus.NOT_RUN);
+      this.walEditsFromCoprocessors = new WALEdit[operations.length];
       familyCellMaps = new Map[operations.length];
+
+      this.region = region;
       observedExceptions = new ObservedExceptionsInBatch();
       durability = Durability.USE_DEFAULT;
     }
 
+    /**
+     * Visitor interface for batch operations
+     */
+    @FunctionalInterface
+    public interface Visitor {
+      /**
+       * @param index operation index
+       * @return true to continue visiting remaining entries, false to stop visiting
+       */
+      boolean visit(int index) throws IOException;
+    }
+
+    /**
+     * Helper method for visiting pending or all batch operations
+     */
+    public void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive, Visitor visitor)
+        throws IOException {
+      assert lastIndexExclusive <= this.size();
+      for (int i = nextIndexToProcess; i < lastIndexExclusive; i++) {
+        if (!pendingOnly || isOperationPending(i)) {
+          if (!visitor.visit(i)) {
+            break;
+          }
+        }
+      }
+    }
+
     public abstract Mutation getMutation(int index);
     public abstract long getNonceGroup(int index);
     public abstract long getNonce(int index);
-    /** This method is potentially expensive and should only be used for non-replay CP path. */
+    /** This method is potentially expensive and is useful mostly for the non-replay CP path. */
     public abstract Mutation[] getMutationsForCoprocs();
     public abstract boolean isInReplay();
-    public abstract long getReplaySequenceId();
+    public abstract long getOrigLogSeqNum();
+    public abstract void startRegionOperation() throws IOException;
+    public abstract void closeRegionOperation() throws IOException;
+
+    /**
+     * Validates each mutation and prepares a batch for write. If necessary (non-replay case), runs
+     * CP prePut()/ preDelete() hooks for all mutations in a batch. This is intended to operate on
+     * the entire batch and will be called from outside the class to check and prepare the batch.
+     * It can be implemented by calling the helper method {@link #checkAndPrepareMutation(int, long)}
+     * in a 'for' loop over mutations.
+     */
+    public abstract void checkAndPrepare() throws IOException;
+
+    /**
+     * Implement any Put request specific check and prepare logic here. Please refer to
+     * {@link #checkAndPrepareMutation(Mutation, long)} for how it is used.
+     */
+    protected abstract void checkAndPreparePut(final Put p) throws IOException;
+
+    /**
+     * If necessary, calls the preBatchMutate() CP hook for a mini-batch and updates metrics, cell
+     * count, tags and timestamps for all cells of all operations in a mini-batch.
+     */
+    public abstract void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation>
+        miniBatchOp, long timestamp, final List<RowLock> acquiredRowLocks) throws IOException;
+
+    /**
+     * Write mini-batch operations to MemStore
+     */
+    public abstract WriteEntry writeMiniBatchOperationsToMemStore(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+        throws IOException;
+
+    protected void writeMiniBatchOperationsToMemStore(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final long writeNumber)
+        throws IOException {
+      MemStoreSizing memStoreAccounting = new MemStoreSizing();
+      visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
+        // We need to update the sequence id for the following reasons.
+        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
+        // 2) If no WAL, FSWALEntry won't be used
+        // we use durability of the original mutation for the mutation passed by CP.
+        if (isInReplay() || getMutation(index).getDurability() == Durability.SKIP_WAL) {
+          region.updateSequenceId(familyCellMaps[index].values(), writeNumber);
+        }
+        applyFamilyMapToMemStore(familyCellMaps[index], memStoreAccounting);
+        return true;
+      });
+      // update memStore size
+      region.addAndGetMemStoreSize(memStoreAccounting);
+    }
 
     public boolean isDone() {
       return nextIndexToProcess == operations.length;
     }
 
+    public int size() {
+      return operations.length;
+    }
+
+    public boolean isOperationPending(int index) {
+      return retCodeDetails[index].getOperationStatusCode() == OperationStatusCode.NOT_RUN;
+    }
+
+    public List<UUID> getClusterIds() {
+      assert size() != 0;
+      return getMutation(0).getClusterIds();
+    }
+
     /**
-     * Validates each mutation and prepares a batch for write.
+     * Helper method that checks and prepares only one mutation. This can be used to implement
+     * {@link #checkAndPrepare()} for the entire batch.
      * NOTE: As CP prePut()/ preDelete() hooks may modify mutations, this method should be called
-     * after prePut()/ preDelete() CP hooks are run for all mutations in a batch.
+     * after prePut()/ preDelete() CP hooks are run for the mutation.
      */
-    public void checkAndPrepare(final HRegion region) throws IOException {
-      long now = EnvironmentEdgeManager.currentTime();
-      for (int i = 0 ; i < operations.length; i++) {
-        // Skip anything that "ran" already
-        if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.NOT_RUN) {
-          Mutation mutation = getMutation(i);
+    protected void checkAndPrepareMutation(Mutation mutation, final long timestamp)
+        throws IOException {
+      region.checkRow(mutation.getRow(), "batchMutate");
+      if (mutation instanceof Put) {
+        // Check the families in the put. If bad, skip this one.
+        checkAndPreparePut((Put) mutation);
+        region.checkTimestamps(mutation.getFamilyCellMap(), timestamp);
+      } else {
+        region.prepareDelete((Delete) mutation);
+      }
+    }
 
-          try {
-            region.checkAndPrepareMutation(mutation, isInReplay(), now);
-
-            // store the family map reference to allow for mutations
-            familyCellMaps[i] = mutation.getFamilyCellMap();
-            // store durability for the batch (highest durability of all operations in the batch)
-            Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
-            if (tmpDur.ordinal() > durability.ordinal()) {
-              durability = tmpDur;
-            }
-          } catch (NoSuchColumnFamilyException nscf) {
-            final String msg = "No such column family in batch mutation. ";
-            if (observedExceptions.hasSeenNoSuchFamily()) {
-              LOG.warn(msg + nscf.getMessage());
-            } else {
-              LOG.warn(msg, nscf);
-              observedExceptions.sawNoSuchFamily();
-            }
-            retCodeDetails[i] = new OperationStatus(
-                OperationStatusCode.BAD_FAMILY, nscf.getMessage());
-          } catch (FailedSanityCheckException fsce) {
-            final String msg = "Batch Mutation did not pass sanity check. ";
-            if (observedExceptions.hasSeenFailedSanityCheck()) {
-              LOG.warn(msg + fsce.getMessage());
-            } else {
-              LOG.warn(msg, fsce);
-              observedExceptions.sawFailedSanityCheck();
-            }
-            retCodeDetails[i] = new OperationStatus(
-                OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
-          } catch (WrongRegionException we) {
-            final String msg = "Batch mutation had a row that does not belong to this region. ";
-            if (observedExceptions.hasSeenWrongRegion()) {
-              LOG.warn(msg + we.getMessage());
-            } else {
-              LOG.warn(msg, we);
-              observedExceptions.sawWrongRegion();
+    protected void checkAndPrepareMutation(int index, long timestamp) throws IOException {
+      Mutation mutation = getMutation(index);
+      try {
+        this.checkAndPrepareMutation(mutation, timestamp);
+
+        // store the family map reference to allow for mutations
+        familyCellMaps[index] = mutation.getFamilyCellMap();
+        // store durability for the batch (highest durability of all operations in the batch)
+        Durability tmpDur = region.getEffectiveDurability(mutation.getDurability());
+        if (tmpDur.ordinal() > durability.ordinal()) {
+          durability = tmpDur;
+        }
+      } catch (NoSuchColumnFamilyException nscf) {
+        final String msg = "No such column family in batch mutation. ";
+        if (observedExceptions.hasSeenNoSuchFamily()) {
+          LOG.warn(msg + nscf.getMessage());
+        } else {
+          LOG.warn(msg, nscf);
+          observedExceptions.sawNoSuchFamily();
+        }
+        retCodeDetails[index] = new OperationStatus(
+            OperationStatusCode.BAD_FAMILY, nscf.getMessage());
+      } catch (FailedSanityCheckException fsce) {
+        final String msg = "Batch Mutation did not pass sanity check. ";
+        if (observedExceptions.hasSeenFailedSanityCheck()) {
+          LOG.warn(msg + fsce.getMessage());
+        } else {
+          LOG.warn(msg, fsce);
+          observedExceptions.sawFailedSanityCheck();
+        }
+        retCodeDetails[index] = new OperationStatus(
+            OperationStatusCode.SANITY_CHECK_FAILURE, fsce.getMessage());
+      } catch (WrongRegionException we) {
+        final String msg = "Batch mutation had a row that does not belong to this region. ";
+        if (observedExceptions.hasSeenWrongRegion()) {
+          LOG.warn(msg + we.getMessage());
+        } else {
+          LOG.warn(msg, we);
+          observedExceptions.sawWrongRegion();
+        }
+        retCodeDetails[index] = new OperationStatus(
+            OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
+      }
+    }
+
+    /**
+     * Creates a mini-batch of all operations in the range [nextIndexToProcess, lastIndexExclusive)
+     * for which a row lock can be acquired. All mutations with locked rows are considered to be
+     * in-progress operations, hence the name {@link MiniBatchOperationInProgress}. A mini-batch
+     * is a window over {@link BatchOperation} and contains contiguous pending operations.
+     *
+     * @param acquiredRowLocks keeps track of rowLocks acquired.
+     */
+    public MiniBatchOperationInProgress<Mutation> lockRowsAndBuildMiniBatch(
+        List<RowLock> acquiredRowLocks) throws IOException {
+      int readyToWriteCount = 0;
+      int lastIndexExclusive = 0;
+      for (; lastIndexExclusive < size(); lastIndexExclusive++) {
+        if (!isOperationPending(lastIndexExclusive)) {
+          continue;
+        }
+        Mutation mutation = getMutation(lastIndexExclusive);
+        // If we haven't got any rows in our batch, we should block to get the next one.
+        RowLock rowLock = null;
+        try {
+          rowLock = region.getRowLockInternal(mutation.getRow(), true);
+        } catch (TimeoutIOException e) {
+          // We will retry on other exceptions, but we should stop if we time out.
+          throw e;
+        } catch (IOException ioe) {
+          LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
+        }
+        if (rowLock == null) {
+          // We failed to grab another lock
+          break; // Stop acquiring more rows for this batch
+        } else {
+          acquiredRowLocks.add(rowLock);
+        }
+        readyToWriteCount++;
+      }
+      return createMiniBatch(lastIndexExclusive, readyToWriteCount);
+    }
+
+    protected MiniBatchOperationInProgress<Mutation> createMiniBatch(final int lastIndexExclusive,
+        final int readyToWriteCount) {
+      return new MiniBatchOperationInProgress<>(getMutationsForCoprocs(), retCodeDetails,
+          walEditsFromCoprocessors, nextIndexToProcess, lastIndexExclusive, readyToWriteCount);
+    }
+
+    /**
+     * Builds a separate WALEdit per nonce by applying input mutations. If WALEdits from CPs are
+     * present, they are merged into the resulting WALEdit.
+     */
+    public List<Pair<NonceKey, WALEdit>> buildWALEdits(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      List<Pair<NonceKey, WALEdit>> walEdits = new ArrayList<>();
+
+      visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), new Visitor() {
+        private Pair<NonceKey, WALEdit> curWALEditForNonce;
+        @Override
+        public boolean visit(int index) throws IOException {
+          Mutation m = getMutation(index);
+          // we use durability of the original mutation for the mutation passed by CP.
+          if (region.getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
+            region.recordMutationWithoutWal(m.getFamilyCellMap());
+            return true;
+          }
+
+          // the batch may contain multiple nonce keys (replay case). If so, write WALEdit for each.
+          // Given how nonce keys are originally written, these should be contiguous.
+          // They don't have to be; it will still work, just writing more WALEdits than needed.
+          long nonceGroup = getNonceGroup(index);
+          long nonce = getNonce(index);
+          if (curWALEditForNonce == null ||
+              curWALEditForNonce.getFirst().getNonceGroup() != nonceGroup ||
+              curWALEditForNonce.getFirst().getNonce() != nonce) {
+            curWALEditForNonce = new Pair<>(new NonceKey(nonceGroup, nonce),
+                new WALEdit(miniBatchOp.getCellCount(), isInReplay()));
+            walEdits.add(curWALEditForNonce);
+          }
+          WALEdit walEdit = curWALEditForNonce.getSecond();
+
+          // Add WAL edits by CP
+          WALEdit fromCP = walEditsFromCoprocessors[index];
+          if (fromCP != null) {
+            for (Cell cell : fromCP.getCells()) {
+              walEdit.add(cell);
             }
-            retCodeDetails[i] = new OperationStatus(
-                OperationStatusCode.SANITY_CHECK_FAILURE, we.getMessage());
           }
+          addFamilyMapToWALEdit(familyCellMaps[index], walEdit);
+
+          return true;
+        }
+      });
+      return walEdits;
+    }
+
+    /**
+     * This method completes mini-batch operations by calling the postBatchMutate() CP hook (if
+     * required) and completing mvcc.
+     */
+    public void completeMiniBatchOperations(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+        throws IOException {
+      if (writeEntry != null) {
+        region.mvcc.completeAndWait(writeEntry);
+      }
+    }
+
+    public void doPostOpCleanupForMiniBatch(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WALEdit walEdit,
+        boolean success) throws IOException {}
+
+    /**
+     * Atomically apply the given map of family->edits to the memstore.
+     * This handles the consistency control on its own, but the caller
+     * should already have locked updatesLock.readLock(). This also does
+     * <b>not</b> check the families for validity.
+     *
+     * @param familyMap Map of Cells by family
+     */
+    protected void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
+        MemStoreSizing memstoreAccounting) throws IOException {
+      for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
+        byte[] family = e.getKey();
+        List<Cell> cells = e.getValue();
+        assert cells instanceof RandomAccess;
+        region.applyToMemStore(region.getStore(family), cells, false, memstoreAccounting);
+      }
+    }
+
+    /**
+     * Append the given map of family->edits to a WALEdit data structure.
+     * This does not write to the WAL itself.
+     * @param familyMap map of family->edits
+     * @param walEdit the destination entry to append into
+     */
+    private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
+        WALEdit walEdit) {
+      for (List<Cell> edits : familyMap.values()) {
+        assert edits instanceof RandomAccess;
+        int listSize = edits.size();
+        for (int i=0; i < listSize; i++) {
+          Cell cell = edits.get(i);
+          walEdit.add(cell);
         }
       }
     }
   }
 
+  /**
+   * Batch of mutation operations. The base class is shared with {@link ReplayBatchOperation} as
+   * most of the logic is the same.
+   */
   private static class MutationBatchOperation extends BatchOperation<Mutation> {
     private long nonceGroup;
     private long nonce;
-    public MutationBatchOperation(Mutation[] operations, long nonceGroup, long nonce) {
-      super(operations);
+    public MutationBatchOperation(final HRegion region, Mutation[] operations, long nonceGroup,
+        long nonce) {
+      super(region, operations);
       this.nonceGroup = nonceGroup;
       this.nonce = nonce;
     }
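
The Visitor abstraction introduced in the hunk above replaces the repeated "skip anything that
already ran" loops of the old doMiniBatchMutate(). A self-contained miniature of the same
pattern (hypothetical types, not the HBase classes):

    import java.io.IOException;

    class VisitorSketch {
      @FunctionalInterface
      interface Visitor {
        // Returns true to keep visiting remaining entries, false to stop early.
        boolean visit(int index) throws IOException;
      }

      // Stands in for the retCodeDetails[i] == NOT_RUN pending checks.
      static final boolean[] PENDING = {true, false, true, true};

      static void visitBatchOperations(boolean pendingOnly, int lastIndexExclusive,
          Visitor visitor) throws IOException {
        for (int i = 0; i < lastIndexExclusive; i++) {
          if (!pendingOnly || PENDING[i]) {
            if (!visitor.visit(i)) {
              break;
            }
          }
        }
      }

      public static void main(String[] args) throws IOException {
        // Visits 0 and 2: index 1 is skipped (not pending) and the visitor
        // returns false at index 2, stopping before index 3.
        visitBatchOperations(true, PENDING.length, (int index) -> {
          System.out.println("visiting " + index);
          return index < 2;
        });
      }
    }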
@@ -3154,16 +3397,279 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     @Override
-    public long getReplaySequenceId() {
-      return 0;
+    public long getOrigLogSeqNum() {
+      return WALKey.NO_SEQUENCE_ID;
+    }
+
+    @Override
+    public void startRegionOperation() throws IOException {
+      region.startRegionOperation(Operation.BATCH_MUTATE);
+    }
+
+    @Override
+    public void closeRegionOperation() throws IOException {
+      region.closeRegionOperation(Operation.BATCH_MUTATE);
+    }
+
+    @Override
+    public void checkAndPreparePut(Put p) throws IOException {
+      region.checkFamilies(p.getFamilyCellMap().keySet());
+    }
+
+    @Override
+    public void checkAndPrepare() throws IOException {
+      final int[] metrics = {0, 0}; // index 0: puts, index 1: deletes
+      visitBatchOperations(true, this.size(), new Visitor() {
+        private long now = EnvironmentEdgeManager.currentTime();
+        private WALEdit walEdit;
+        @Override
+        public boolean visit(int index) throws IOException {
+          // Run coprocessor pre hook outside of locks to avoid deadlock
+          if (region.coprocessorHost != null) {
+            if (walEdit == null) {
+              walEdit = new WALEdit();
+            }
+            callPreMutateCPHook(index, walEdit, metrics);
+            if (!walEdit.isEmpty()) {
+              walEditsFromCoprocessors[index] = walEdit;
+              walEdit = null;
+            }
+          }
+          if (isOperationPending(index)) {
+            // TODO: Currently validation is done with current time before acquiring locks and
+            // updates are done with different timestamps after acquiring locks. This behavior is
+            // inherited from the code prior to this change. Can this be changed?
+            checkAndPrepareMutation(index, now);
+          }
+          return true;
+        }
+      });
+
+      // FIXME: we may update metrics twice: here for all operations bypassed by CP, and later in
+      // normal processing.
+      // Update metrics in same way as it is done when we go the normal processing route (we now
+      // update general metrics though a Coprocessor did the work).
+      if (region.metricsRegion != null) {
+        if (metrics[0] > 0) {
+          // There were some Puts in the batch.
+          region.metricsRegion.updatePut();
+        }
+        if (metrics[1] > 0) {
+          // There were some Deletes in the batch.
+          region.metricsRegion.updateDelete();
+        }
+      }
+    }
+
+    @Override
+    public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+        long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
+      byte[] byteTS = Bytes.toBytes(timestamp);
+      visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
+        Mutation mutation = getMutation(index);
+        if (mutation instanceof Put) {
+          region.updateCellTimestamps(familyCellMaps[index].values(), byteTS);
+          miniBatchOp.incrementNumOfPuts();
+        } else {
+          region.prepareDeleteTimestamps(mutation, familyCellMaps[index], byteTS);
+          miniBatchOp.incrementNumOfDeletes();
+        }
+        region.rewriteCellTags(familyCellMaps[index], mutation);
+
+        // update cell count
+        if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
+          for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
+            miniBatchOp.addCellCount(cells.size());
+          }
+        }
+
+        WALEdit fromCP = walEditsFromCoprocessors[index];
+        if (fromCP != null) {
+          miniBatchOp.addCellCount(fromCP.size());
+        }
+        return true;
+      });
+
+      if (region.coprocessorHost != null) {
+        // calling the pre CP hook for batch mutation
+        region.coprocessorHost.preBatchMutate(miniBatchOp);
+        checkAndMergeCPMutations(miniBatchOp, acquiredRowLocks, timestamp);
+      }
+    }
+
+    @Override
+    public List<Pair<NonceKey, WALEdit>> buildWALEdits(final MiniBatchOperationInProgress<Mutation>
+        miniBatchOp) throws IOException {
+      List<Pair<NonceKey, WALEdit>> walEdits = super.buildWALEdits(miniBatchOp);
+      // for MutationBatchOperation, more than one nonce is not allowed
+      if (walEdits.size() > 1) {
+        throw new IOException("Found multiple nonce keys per batch!");
+      }
+      return walEdits;
+    }
+
+    @Override
+    public WriteEntry writeMiniBatchOperationsToMemStore(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, @Nullable WriteEntry writeEntry)
+        throws IOException {
+      if (writeEntry == null) {
+        writeEntry = region.mvcc.begin();
+      }
+      super.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry.getWriteNumber());
+      return writeEntry;
+    }
+
+    @Override
+    public void completeMiniBatchOperations(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+        throws IOException {
+      // TODO: can it be done after completing mvcc?
+      // calling the post CP hook for batch mutation
+      if (region.coprocessorHost != null) {
+        region.coprocessorHost.postBatchMutate(miniBatchOp);
+      }
+      super.completeMiniBatchOperations(miniBatchOp, writeEntry);
+    }
+
+    @Override
+    public void doPostOpCleanupForMiniBatch(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+        final WALEdit walEdit, boolean success) throws IOException {
+      if (miniBatchOp != null) {
+        // synced so that the coprocessor contract is adhered to.
+        if (region.coprocessorHost != null) {
+          visitBatchOperations(false, miniBatchOp.getLastIndexExclusive(), (int i) -> {
+            // only for successful puts
+            if (retCodeDetails[i].getOperationStatusCode() == OperationStatusCode.SUCCESS) {
+              Mutation m = getMutation(i);
+              if (m instanceof Put) {
+                region.coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
+              } else {
+                region.coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
+              }
+            }
+            return true;
+          });
+        }
+
+        // See if the column families were consistent through the whole thing.
+        // if they were then keep them. If they were not then pass a null.
+        // null will be treated as unknown.
+        // Total time taken might be involving Puts and Deletes.
+        // Split the time for puts and deletes based on the total number of Puts and Deletes.
+        if (region.metricsRegion != null) {
+          if (miniBatchOp.getNumOfPuts() > 0) {
+            // There were some Puts in the batch.
+            region.metricsRegion.updatePut();
+          }
+          if (miniBatchOp.getNumOfDeletes() > 0) {
+            // There were some Deletes in the batch.
+            region.metricsRegion.updateDelete();
+          }
+        }
+      }
+
+      if (region.coprocessorHost != null) {
+        // call the coprocessor hook to do any finalization steps after the put is done
+        region.coprocessorHost.postBatchMutateIndispensably(
+            miniBatchOp != null ? miniBatchOp : createMiniBatch(size(), 0), success);
+      }
+    }
+
+    /**
+     * Runs the prePut()/ preDelete() coprocessor hooks for an input mutation in a batch.
+     * @param metrics array of 2 ints: index 0 is the count of puts, index 1 the count of deletes
+     */
+    private void callPreMutateCPHook(int index, final WALEdit walEdit, final int[] metrics)
+        throws IOException {
+      Mutation m = getMutation(index);
+      if (m instanceof Put) {
+        if (region.coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
+          // pre hook says skip this Put
+          // mark as success and skip in doMiniBatchMutation
+          metrics[0]++;
+          retCodeDetails[index] = OperationStatus.SUCCESS;
+        }
+      } else if (m instanceof Delete) {
+        Delete curDel = (Delete) m;
+        if (curDel.getFamilyCellMap().isEmpty()) {
+          // handle deleting a row case
+          // TODO: prepareDelete() has been called twice, before and after preDelete() CP hook.
+          // Can this be avoided?
+          region.prepareDelete(curDel);
+        }
+        if (region.coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
+          // pre hook says skip this Delete
+          // mark as success and skip in doMiniBatchMutation
+          metrics[1]++;
+          retCodeDetails[index] = OperationStatus.SUCCESS;
+        }
+      } else {
+        // In case of passing Append mutations along with the Puts and Deletes in batchMutate
+        // mark the operation return code as failure so that it will not be considered in
+        // the doMiniBatchMutation
+        retCodeDetails[index] = new OperationStatus(OperationStatusCode.FAILURE,
+            "Put/Delete mutations only supported in batchMutate() now");
+      }
+    }
+
+    private void checkAndMergeCPMutations(final MiniBatchOperationInProgress<Mutation> miniBatchOp,
+        final List<RowLock> acquiredRowLocks, final long timestamp) throws IOException {
+      visitBatchOperations(true, nextIndexToProcess + miniBatchOp.size(), (int i) -> {
+        // we pass (i - nextIndexToProcess) below since the call expects a relative index
+        Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - nextIndexToProcess);
+        if (cpMutations == null) {
+          return true;
+        }
+        // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
+        Mutation mutation = getMutation(i);
+        for (Mutation cpMutation : cpMutations) {
+          this.checkAndPrepareMutation(cpMutation, timestamp);
+
+          // Acquire row locks. If not, the whole batch will fail.
+          acquiredRowLocks.add(region.getRowLockInternal(cpMutation.getRow(), true));
+
+          // Returned mutations from coprocessor correspond to the Mutation at index i. We can
+          // directly add the cells from those mutations to the familyMaps of this mutation.
+          Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
+          // will get added to the memStore later
+          mergeFamilyMaps(familyCellMaps[i], cpFamilyMap);
+
+          // The durability of returned mutation is replaced by the corresponding mutation.
+          // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
+          // cells of returned mutation.
+          if (region.getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
+            for (List<Cell> cells : cpFamilyMap.values()) {
+              miniBatchOp.addCellCount(cells.size());
+            }
+          }
+        }
+        return true;
+      });
+    }
+
+    private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap,
+        Map<byte[], List<Cell>> toBeMerged) {
+      for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) {
+        List<Cell> cells = familyMap.get(entry.getKey());
+        if (cells == null) {
+          familyMap.put(entry.getKey(), entry.getValue());
+        } else {
+          cells.addAll(entry.getValue());
+        }
+      }
     }
   }
 
+  /**
+   * Batch of mutations for replay. The base class is shared with {@link MutationBatchOperation} as
+   * most of the logic is the same.
+   */
   private static class ReplayBatchOperation extends BatchOperation<MutationReplay> {
-    private long replaySeqId = 0;
-    public ReplayBatchOperation(MutationReplay[] operations, long seqId) {
-      super(operations);
-      this.replaySeqId = seqId;
+    private long origLogSeqNum = 0;
+    public ReplayBatchOperation(final HRegion region, MutationReplay[] operations,
+        long origLogSeqNum) {
+      super(region, operations);
+      this.origLogSeqNum = origLogSeqNum;
     }
 
     @Override
@@ -3183,8 +3689,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     @Override
     public Mutation[] getMutationsForCoprocs() {
-      assert false;
-      throw new RuntimeException("Should not be called for replay batch");
+      return null;
     }
 
     @Override
@@ -3193,8 +3698,80 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
 
     @Override
-    public long getReplaySequenceId() {
-      return this.replaySeqId;
+    public long getOrigLogSeqNum() {
+      return this.origLogSeqNum;
+    }
+
+    @Override
+    public void startRegionOperation() throws IOException {
+      region.startRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+    }
+
+    @Override
+    public void closeRegionOperation() throws IOException {
+      region.closeRegionOperation(Operation.REPLAY_BATCH_MUTATE);
+    }
+
+    /**
+     * During replay, there may be column families that were removed between the region server
+     * failure and the replay.
+     */
+    @Override
+    protected void checkAndPreparePut(Put p) throws IOException {
+      Map<byte[], List<Cell>> familyCellMap = p.getFamilyCellMap();
+      List<byte[]> nonExistentList = null;
+      for (byte[] family : familyCellMap.keySet()) {
+        if (!region.htableDescriptor.hasColumnFamily(family)) {
+          if (nonExistentList == null) {
+            nonExistentList = new ArrayList<>();
+          }
+          nonExistentList.add(family);
+        }
+      }
+      if (nonExistentList != null) {
+        for (byte[] family : nonExistentList) {
+          // Perhaps schema was changed between crash and replay
+          LOG.info("No family for " + Bytes.toString(family) + ", omitting it from replay.");
+          familyCellMap.remove(family);
+        }
+      }
+    }
+
+    @Override
+    public void checkAndPrepare() throws IOException {
+      long now = EnvironmentEdgeManager.currentTime();
+      visitBatchOperations(true, this.size(), (int index) -> {
+        checkAndPrepareMutation(index, now);
+        return true;
+      });
+    }
+
+    @Override
+    public void prepareMiniBatchOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+        long timestamp, final List<RowLock> acquiredRowLocks) throws IOException {
+      visitBatchOperations(true, miniBatchOp.getLastIndexExclusive(), (int index) -> {
+        // update cell count
+        for (List<Cell> cells : getMutation(index).getFamilyCellMap().values()) {
+          miniBatchOp.addCellCount(cells.size());
+        }
+        return true;
+      });
+    }
+
+    @Override
+    public WriteEntry writeMiniBatchOperationsToMemStore(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+        throws IOException {
+      super.writeMiniBatchOperationsToMemStore(miniBatchOp, getOrigLogSeqNum());
+      return writeEntry;
+    }
+
+    @Override
+    public void completeMiniBatchOperations(
+        final MiniBatchOperationInProgress<Mutation> miniBatchOp, final WriteEntry writeEntry)
+        throws IOException {
+      super.completeMiniBatchOperations(miniBatchOp, writeEntry);
+      region.mvcc.advanceTo(getOrigLogSeqNum());
     }
   }
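
Taken together, the two subclasses above specialize the template laid out by BatchOperation:
the shared driver calls the same sequence of methods, and replay vs. normal mutation behavior
differs only in the overrides. A condensed sketch of that template-method structure
(hypothetical, heavily simplified; the real method names follow the diff above):

    abstract class BatchSketch {
      // Mirrors the shared driver in HRegion.batchMutate(BatchOperation).
      final void run() {
        startRegionOperation();
        try {
          checkAndPrepare();
          writeToMemStore();
        } finally {
          closeRegionOperation();
        }
      }

      abstract void startRegionOperation();
      abstract void checkAndPrepare();
      abstract void writeToMemStore();
      abstract void closeRegionOperation();
    }

    class MutationBatchSketch extends BatchSketch {
      void startRegionOperation() { System.out.println("start BATCH_MUTATE"); }
      void checkAndPrepare()      { System.out.println("run prePut()/preDelete() CP hooks, validate"); }
      void writeToMemStore()      { System.out.println("stamp new mvcc write number"); }
      void closeRegionOperation() { System.out.println("close BATCH_MUTATE"); }
    }

    class ReplayBatchSketch extends BatchSketch {
      void startRegionOperation() { System.out.println("start REPLAY_BATCH_MUTATE"); }
      void checkAndPrepare()      { System.out.println("drop families removed since the crash"); }
      void writeToMemStore()      { System.out.println("stamp original WAL sequence id"); }
      void closeRegionOperation() { System.out.println("close REPLAY_BATCH_MUTATE"); }
    }

    class TemplateSketchMain {
      public static void main(String[] args) {
        new MutationBatchSketch().run();
        new ReplayBatchSketch().run();
      }
    }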
 
@@ -3204,7 +3781,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     //  * batchMutate with single mutation - put/delete, separate or from checkAndMutate.
     //  * coprocessor calls (see ex. BulkDeleteEndpoint).
     // So nonces are not really ever used by HBase. They could be by coprocs, and checkAnd...
-    return batchMutate(new MutationBatchOperation(mutations, nonceGroup, nonce));
+    return batchMutate(new MutationBatchOperation(this, mutations, nonceGroup, nonce));
   }
 
   public OperationStatus[] batchMutate(Mutation[] mutations) throws IOException {
@@ -3232,14 +3809,24 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       return statuses;
     }
-    return batchMutate(new ReplayBatchOperation(mutations, replaySeqId));
+    return batchMutate(new ReplayBatchOperation(this, mutations, replaySeqId));
   }
 
   /**
    * Perform a batch of mutations.
+   *
    * It supports only Put and Delete mutations and will ignore other types passed. Operations in
   * a batch are stored with the highest durability specified for all operations in a batch,
    * except for {@link Durability#SKIP_WAL}.
+   *
+   * <p>This method is called from {@link #batchReplay(MutationReplay[], long)} with a
+   * {@link ReplayBatchOperation} instance and from {@link #batchMutate(Mutation[], long, long)}
+   * with a {@link MutationBatchOperation} instance as its argument. As the processing of replay
+   * batches and mutation batches is very similar, a lot of code is shared by providing generic
+   * methods in the base class {@link BatchOperation}. The logic for this method and
+   * {@link #doMiniBatchMutate(BatchOperation)} is implemented using base-class methods that are
+   * overridden by the derived classes to implement their special behavior.
+   *
    * @param batchOp contains the list of mutations
    * @return an array of OperationStatus which internally contains the
    *         OperationStatusCode and the exceptionMessage if any.
@@ -3247,8 +3834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   OperationStatus[] batchMutate(BatchOperation<?> batchOp) throws IOException {
     boolean initialized = false;
-    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
-    startRegionOperation(op);
+    batchOp.startRegionOperation();
     try {
       while (!batchOp.isDone()) {
         if (!batchOp.isInReplay()) {
@@ -3257,12 +3843,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         checkResources();
 
         if (!initialized) {
-          this.writeRequestsCount.add(batchOp.operations.length);
-          if (!batchOp.isInReplay()) {
-            callPreMutateCPHooks(batchOp);
-          }
-          // validate and prepare batch for write, after CP pre-hooks
-          batchOp.checkAndPrepare(this);
+          this.writeRequestsCount.add(batchOp.size());
+          // validate and prepare the batch for write; for MutationBatchOperation this also
+          // calls the CP prePut()/ preDelete() hooks
+          batchOp.checkAndPrepare();
           initialized = true;
         }
         doMiniBatchMutate(batchOp);
@@ -3270,296 +3854,75 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         requestFlushIfNeeded(newSize);
       }
     } finally {
-      closeRegionOperation(op);
+      batchOp.closeRegionOperation();
     }
     return batchOp.retCodeDetails;
   }
 
   /**
-   * Runs prePut/ preDelete coprocessor hooks for each mutation in a batch.
-   * @param batchOp
-   */
-  private void callPreMutateCPHooks(BatchOperation<?> batchOp) throws IOException {
-    if (coprocessorHost == null) {
-      return;
-    }
-    /* Run coprocessor pre hook outside of locks to avoid deadlock */
-    WALEdit walEdit = new WALEdit();
-    int noOfPuts = 0;
-    int noOfDeletes = 0;
-    for (int i = 0 ; i < batchOp.operations.length; i++) {
-      Mutation m = batchOp.getMutation(i);
-      if (m instanceof Put) {
-        if (coprocessorHost.prePut((Put) m, walEdit, m.getDurability())) {
-          // pre hook says skip this Put
-          // mark as success and skip in doMiniBatchMutation
-          noOfPuts++;
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
-        }
-      } else if (m instanceof Delete) {
-        Delete curDel = (Delete) m;
-        if (curDel.getFamilyCellMap().isEmpty()) {
-          // handle deleting a row case
-          prepareDelete(curDel);
-        }
-        if (coprocessorHost.preDelete(curDel, walEdit, m.getDurability())) {
-          // pre hook says skip this Delete
-          // mark as success and skip in doMiniBatchMutation
-          noOfDeletes++;
-          batchOp.retCodeDetails[i] = OperationStatus.SUCCESS;
-        }
-      } else {
-        // In case of passing Append mutations along with the Puts and Deletes in batchMutate
-        // mark the operation return code as failure so that it will not be considered in
-        // the doMiniBatchMutation
-        batchOp.retCodeDetails[i] = new OperationStatus(OperationStatusCode.FAILURE,
-            "Put/Delete mutations only supported in batchMutate() now");
-      }
-      if (!walEdit.isEmpty()) {
-        batchOp.walEditsFromCoprocessors[i] = walEdit;
-        walEdit = new WALEdit();
-      }
-    }
-    // Update metrics in same way as it is done when we go the normal processing route (we now
-    // update general metrics though a Coprocessor did the work).
-    if (noOfPuts > 0) {
-      // There were some Puts in the batch.
-      if (this.metricsRegion != null) {
-        this.metricsRegion.updatePut();
-      }
-    }
-    if (noOfDeletes > 0) {
-      // There were some Deletes in the batch.
-      if (this.metricsRegion != null) {
-        this.metricsRegion.updateDelete();
-      }
-    }
-  }
-
-  /**
   * Called to do a piece of the batch that came in to {@link #batchMutate(Mutation[], long, long)}.
   * In here we also handle replay of edits on region recovery.
    * @return Change in size brought about by applying <code>batchOp</code>
    */
-  // TODO: This needs a rewrite. Doesn't have to be this long. St.Ack 20160120
   private void doMiniBatchMutate(BatchOperation<?> batchOp) throws IOException {
-    boolean replay = batchOp.isInReplay();
-    long currentNonceGroup = HConstants.NO_NONCE;
-    long currentNonce = HConstants.NO_NONCE;
-    WALEdit walEdit = null;
-    boolean locked = false;
-    // We try to set up a batch in the range [firstIndex,lastIndexExclusive)
-    int firstIndex = batchOp.nextIndexToProcess;
-    int lastIndexExclusive = firstIndex;
     boolean success = false;
-    int noOfPuts = 0;
-    int noOfDeletes = 0;
+    WALEdit walEdit = null;
     WriteEntry writeEntry = null;
-    int cellCount = 0;
+    boolean locked = false;
+    // We try to set up a batch in the range [batchOp.nextIndexToProcess, lastIndexExclusive)
+    MiniBatchOperationInProgress<Mutation> miniBatchOp = null;
     /** Keep track of the locks we hold so we can release them in finally clause */
-    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.operations.length);
-    MemStoreSizing memStoreAccounting = new MemStoreSizing();
+    List<RowLock> acquiredRowLocks = Lists.newArrayListWithCapacity(batchOp.size());
     try {
-      // STEP 1. Try to acquire as many locks as we can, and ensure we acquire at least one.
-      int numReadyToWrite = 0;
-      for (; lastIndexExclusive < batchOp.operations.length; lastIndexExclusive++) {
-        if (batchOp.retCodeDetails[lastIndexExclusive].getOperationStatusCode()
-            != OperationStatusCode.NOT_RUN) {
-          continue;
-        }
-        Mutation mutation = batchOp.getMutation(lastIndexExclusive);
-        // If we haven't got any rows in our batch, we should block to get the next one.
-        RowLock rowLock = null;
-        try {
-          rowLock = getRowLockInternal(mutation.getRow(), true);
-        } catch (TimeoutIOException e) {
-          // We will retry when other exceptions, but we should stop if we timeout .
-          throw e;
-        } catch (IOException ioe) {
-          LOG.warn("Failed getting lock, row=" + Bytes.toStringBinary(mutation.getRow()), ioe);
-        }
-        if (rowLock == null) {
-          // We failed to grab another lock
-          break; // Stop acquiring more rows for this batch
-        } else {
-          acquiredRowLocks.add(rowLock);
-        }
-
-        numReadyToWrite++;
-        if (replay || getEffectiveDurability(mutation.getDurability()) != Durability.SKIP_WAL) {
-          for (List<Cell> cells : mutation.getFamilyCellMap().values()) {
-            cellCount += cells.size();
-          }
-        }
-      }
+      // STEP 1. Try to acquire as many locks as we can and build mini-batch of operations with
+      // locked rows
+      miniBatchOp = batchOp.lockRowsAndBuildMiniBatch(acquiredRowLocks);
 
       // We've now grabbed as many mutations off the list as we can
-      // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
-      if (numReadyToWrite <= 0) {
+      // Ensure we acquire at least one.
+      if (miniBatchOp.getReadyToWriteCount() <= 0) {
+        // Nothing to put/delete -- an exception in the above such as NoSuchColumnFamily?
         return;
       }
 
-      // STEP 2. Update any LATEST_TIMESTAMP timestamps
+      lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount());
+      locked = true;
+
+      // STEP 2. Update the mini-batch of all operations in progress with LATEST_TIMESTAMP timestamp
       // We should record the timestamp only after we have acquired the rowLock,
       // otherwise, newer puts/deletes are not guaranteed to have a newer timestamp
       long now = EnvironmentEdgeManager.currentTime();
-      if (!replay) {
-        byte[] byteNow = Bytes.toBytes(now);
-        for (int i = firstIndex; i < lastIndexExclusive; i++) {
-          // skip invalid
-          if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
-            // lastIndexExclusive was incremented above.
-            continue;
-          }
-
-          Mutation mutation = batchOp.getMutation(i);
-          if (mutation instanceof Put) {
-            updateCellTimestamps(batchOp.familyCellMaps[i].values(), byteNow);
-            noOfPuts++;
-          } else {
-            prepareDeleteTimestamps(mutation, batchOp.familyCellMaps[i], byteNow);
-            noOfDeletes++;
-          }
-          rewriteCellTags(batchOp.familyCellMaps[i], mutation);
-          WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
-          if (fromCP != null) {
-            cellCount += fromCP.size();
-          }
-        }
-      }
-      lock(this.updatesLock.readLock(), numReadyToWrite);
-      locked = true;
-
-      // calling the pre CP hook for batch mutation
-      if (!replay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Mutation> miniBatchOp =
-          new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
-          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
-        coprocessorHost.preBatchMutate(miniBatchOp);
-        for (int i = firstIndex; i < lastIndexExclusive; i++) {
-          if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
-            // lastIndexExclusive was incremented above.
-            continue;
-          }
-          // we pass (i - firstIndex) below since the call expects a relative index
-          Mutation[] cpMutations = miniBatchOp.getOperationsFromCoprocessors(i - firstIndex);
-          if (cpMutations == null) {
-            continue;
-          }
-          Mutation mutation = batchOp.getMutation(i);
-          boolean skipWal = getEffectiveDurability(mutation.getDurability()) == Durability.SKIP_WAL;
-          // Else Coprocessor added more Mutations corresponding to the Mutation at this index.
-          for (int j = 0; j < cpMutations.length; j++) {
-            Mutation cpMutation = cpMutations[j];
-            checkAndPrepareMutation(cpMutation, replay, now);
-
-            // Acquire row locks. If not, the whole batch will fail.
-            acquiredRowLocks.add(getRowLockInternal(cpMutation.getRow(), true));
-
-            // Returned mutations from coprocessor correspond to the Mutation at index i. We can
-            // directly add the cells from those mutations to the familyMaps of this mutation.
-            Map<byte[], List<Cell>> cpFamilyMap = cpMutation.getFamilyCellMap();
-            // will get added to the memStore later
-            mergeFamilyMaps(batchOp.familyCellMaps[i], cpFamilyMap);
-
-            // The durability of returned mutation is replaced by the corresponding mutation.
-            // If the corresponding mutation contains the SKIP_WAL, we shouldn't count the
-            // cells of returned mutation.
-            if (!skipWal) {
-              for (List<Cell> cells : cpFamilyMap.values()) {
-                cellCount += cells.size();
-              }
-            }
-          }
-        }
-      }
+      batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks);
 
       // STEP 3. Build WAL edit
-      walEdit = new WALEdit(cellCount, replay);
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        // Skip puts that were determined to be invalid during preprocessing
-        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
-          continue;
-        }
+      List<Pair<NonceKey, WALEdit>> walEdits = batchOp.buildWALEdits(miniBatchOp);
 
-        Mutation m = batchOp.getMutation(i);
-        // we use durability of the original mutation for the mutation passed by CP.
-        if (getEffectiveDurability(m.getDurability()) == Durability.SKIP_WAL) {
-          recordMutationWithoutWal(m.getFamilyCellMap());
-          continue;
-        }
+      // STEP 4. Append the WALEdits to WAL and sync.
+      for (Iterator<Pair<NonceKey, WALEdit>> it = walEdits.iterator(); it.hasNext();) {
+        Pair<NonceKey, WALEdit> nonceKeyWALEditPair = it.next();
+        walEdit = nonceKeyWALEditPair.getSecond();
+        NonceKey nonceKey = nonceKeyWALEditPair.getFirst();
 
-        long nonceGroup = batchOp.getNonceGroup(i);
-        long nonce = batchOp.getNonce(i);
-        // In replay, the batch may contain multiple nonces. If so, write WALEdit for each.
-        // Given how nonces are originally written, these should be contiguous.
-        // They don't have to be, it will still work, just write more WALEdits than needed.
-        if (nonceGroup != currentNonceGroup || nonce != currentNonce) {
-          // Write what we have so far for nonces out to WAL
-          appendCurrentNonces(m, replay, walEdit, now, currentNonceGroup, currentNonce);
-          walEdit = new WALEdit(cellCount, replay);
-          currentNonceGroup = nonceGroup;
-          currentNonce = nonce;
+        if (walEdit != null && !walEdit.isEmpty()) {
+          writeEntry = doWALAppend(walEdit, batchOp.durability, batchOp.getClusterIds(), now,
+              nonceKey.getNonceGroup(), nonceKey.getNonce(), batchOp.getOrigLogSeqNum());
         }
 
-        // Add WAL edits by CP
-        WALEdit fromCP = batchOp.walEditsFromCoprocessors[i];
-        if (fromCP != null) {
-          for (Cell cell : fromCP.getCells()) {
-            walEdit.add(cell);
-          }
+        // STEP 6. Complete mvcc for all but last writeEntry (for replay case)
+        if (it.hasNext() && writeEntry != null) {
+          mvcc.complete(writeEntry);
+          writeEntry = null;
         }
-        addFamilyMapToWALEdit(batchOp.familyCellMaps[i], walEdit);
-      }
-
-      // STEP 4. Append the final edit to WAL and sync.
-      Mutation mutation = batchOp.getMutation(firstIndex);
-      writeEntry = doWALAppend(walEdit, batchOp.durability, mutation.getClusterIds(), now,
-          currentNonceGroup, currentNonce,
-          replay ? batchOp.getReplaySequenceId() : WALKey.NO_SEQUENCE_ID);
-      if (!replay && writeEntry == null) {
-        // If no writeEntry, then not in replay and skipping WAL or some such. Begin an MVCC
-        // transaction to get sequence id.
-        writeEntry = mvcc.begin();
       }
 
       // STEP 5. Write back to memStore
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        if (batchOp.retCodeDetails[i].getOperationStatusCode() != OperationStatusCode.NOT_RUN) {
-          continue;
-        }
-        // We need to update the sequence id for following reasons.
-        // 1) If the op is in replay mode, FSWALEntry#stampRegionSequenceId won't stamp sequence id.
-        // 2) If no WAL, FSWALEntry won't be used
-        // we use durability of the original mutation for the mutation passed by CP.
-        boolean updateSeqId = replay || batchOp.getMutation(i).getDurability() == Durability.SKIP_WAL;
-        if (updateSeqId) {
-          this.updateSequenceId(batchOp.familyCellMaps[i].values(),
-            replay? batchOp.getReplaySequenceId(): writeEntry.getWriteNumber());
-        }
-        applyFamilyMapToMemStore(batchOp.familyCellMaps[i], memStoreAccounting);
-      }
-
-      // update memstore size
-      this.addAndGetMemStoreSize(memStoreAccounting);
-
-      // calling the post CP hook for batch mutation
-      if (!replay && coprocessorHost != null) {
-        MiniBatchOperationInProgress<Mutation> miniBatchOp =
-          new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
-          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
-        coprocessorHost.postBatchMutate(miniBatchOp);
-      }
-
-      // STEP 6. Complete mvcc.
-      if (writeEntry != null) {
-        mvcc.completeAndWait(writeEntry);
-        writeEntry = null;
-      }
-      if (replay) {
-        this.mvcc.advanceTo(batchOp.getReplaySequenceId());
-      }
+      // NOTE: writeEntry can be null here
+      writeEntry = batchOp.writeMiniBatchOperationsToMemStore(miniBatchOp, writeEntry);
 
+      // STEP 6. Complete MiniBatchOperations: calls the postBatchMutate() CP hook if required
+      // and completes mvcc for the last writeEntry
+      batchOp.completeMiniBatchOperations(miniBatchOp, writeEntry);
+      writeEntry = null;
       success = true;
     } finally {
       // Call complete rather than completeAndWait because we probably had error if walKey != null
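
For context on the loop above: in the replay case buildWALEdits() can return
several (NonceKey, WALEdit) pairs, and every MVCC write entry except the last
is completed eagerly so it does not block other transactions. Below is a
minimal, self-contained sketch of just that idiom; Entry is a placeholder for
the MVCC WriteEntry, and locking, WAL and memstore work are elided, so this is
an illustration of the pattern rather than HRegion code.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Sketch of the STEP 4 idiom above: complete every intermediate entry as
// soon as a further edit exists; hand the final entry back so the caller
// can finish it after the memstore write (STEP 5/6 in the real code).
final class AppendLoopSketch {
  interface Entry {
    void complete();
  }

  static Entry appendAll(List<Entry> entries) {
    Entry open = null;
    for (Iterator<Entry> it = entries.iterator(); it.hasNext();) {
      Entry entry = it.next();
      if (it.hasNext()) {
        entry.complete(); // mirrors mvcc.complete(writeEntry) for non-final edits
      } else {
        open = entry;     // left open, like the writeEntry handed on to STEP 5
      }
    }
    return open;
  }

  public static void main(String[] args) {
    Entry first = () -> System.out.println("completed intermediate entry");
    Entry last = () -> System.out.println("completed final entry");
    appendAll(Arrays.asList(first, last)).complete(); // caller finishes the last one
  }
}
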
@@ -3570,122 +3933,18 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       releaseRowLocks(acquiredRowLocks);
 
-      for (int i = firstIndex; i < lastIndexExclusive; i++) {
-        if (batchOp.retCodeDetails[i] == OperationStatus.NOT_RUN) {
-          batchOp.retCodeDetails[i] = success? OperationStatus.SUCCESS : OperationStatus.FAILURE;
-        }
-      }
-
-      // synced so that the coprocessor contract is adhered to.
-      if (!replay && coprocessorHost != null) {
-        for (int i = firstIndex; i < lastIndexExclusive; i++) {
-          // only for successful puts
-          if (batchOp.retCodeDetails[i].getOperationStatusCode()
-              != OperationStatusCode.SUCCESS) {
-            continue;
-          }
-          Mutation m = batchOp.getMutation(i);
-          if (m instanceof Put) {
-            coprocessorHost.postPut((Put) m, walEdit, m.getDurability());
-          } else {
-            coprocessorHost.postDelete((Delete) m, walEdit, m.getDurability());
-          }
-        }
-      }
-
-      // See if the column families were consistent through the whole thing.
-      // if they were then keep them. If they were not then pass a null.
-      // null will be treated as unknown.
-      // Total time taken might be involving Puts and Deletes.
-      // Split the time for puts and deletes based on the total number of Puts and Deletes.
-
-      if (noOfPuts > 0) {
-        // There were some Puts in the batch.
-        if (this.metricsRegion != null) {
-          this.metricsRegion.updatePut();
-        }
-      }
-      if (noOfDeletes > 0) {
-        // There were some Deletes in the batch.
-        if (this.metricsRegion != null) {
-          this.metricsRegion.updateDelete();
-        }
-      }
-
-      if (coprocessorHost != null && !batchOp.isInReplay()) {
-        // call the coprocessor hook to do any finalization steps
-        // after the put is done
-        MiniBatchOperationInProgress<Mutation> miniBatchOp =
-          new MiniBatchOperationInProgress<>(batchOp.getMutationsForCoprocs(),
-          batchOp.retCodeDetails, batchOp.walEditsFromCoprocessors, firstIndex, lastIndexExclusive);
-        coprocessorHost.postBatchMutateIndispensably(miniBatchOp, success);
-      }
-
-      batchOp.nextIndexToProcess = lastIndexExclusive;
-    }
-  }
-
-  private void mergeFamilyMaps(Map<byte[], List<Cell>> familyMap,
-      Map<byte[], List<Cell>> toBeMerged) {
-    for (Map.Entry<byte[], List<Cell>> entry : toBeMerged.entrySet()) {
-      List<Cell> cells = familyMap.get(entry.getKey());
-      if (cells == null) {
-        familyMap.put(entry.getKey(), entry.getValue());
-      } else {
-        cells.addAll(entry.getValue());
-      }
-    }
-  }
-
-  private void appendCurrentNonces(final Mutation mutation, final boolean replay,
-      final WALEdit walEdit, final long now, final long currentNonceGroup, final long currentNonce)
-  throws IOException {
-    if (walEdit.isEmpty()) return;
-    if (!replay) throw new IOException("Multiple nonces per batch and not in replay");
-    WALKey walKey = new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-        this.htableDescriptor.getTableName(), now, mutation.getClusterIds(),
-        currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
-    this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-    // Complete the mvcc transaction started down in append else it will block others
-    this.mvcc.complete(walKey.getWriteEntry());
-  }
+      final int finalLastIndexExclusive =
+          miniBatchOp != null ? miniBatchOp.getLastIndexExclusive() : batchOp.size();
+      final boolean finalSuccess = success;
+      batchOp.visitBatchOperations(true, finalLastIndexExclusive, (int i) -> {
+        batchOp.retCodeDetails[i] =
+            finalSuccess ? OperationStatus.SUCCESS : OperationStatus.FAILURE;
+        return true;
+      });
 
-  private void checkAndPrepareMutation(Mutation mutation, boolean replay, final long now)
-      throws IOException {
-    checkRow(mutation.getRow(), "doMiniBatchMutation");
-    if (mutation instanceof Put) {
-      // Check the families in the put. If bad, skip this one.
-      if (replay) {
-        removeNonExistentColumnFamilyForReplay(mutation.getFamilyCellMap());
-      } else {
-        checkFamilies(mutation.getFamilyCellMap().keySet());
-      }
-      checkTimestamps(mutation.getFamilyCellMap(), now);
-    } else {
-      prepareDelete((Delete)mutation);
-    }
-  }
+      batchOp.doPostOpCleanupForMiniBatch(miniBatchOp, walEdit, finalSuccess);
 
-  /**
-   * During replay, there could exist column families which are removed between region server
-   * failure and replay
-   */
-  private void removeNonExistentColumnFamilyForReplay(final Map<byte[], List<Cell>> familyMap) {
-    List<byte[]> nonExistentList = null;
-    for (byte[] family : familyMap.keySet()) {
-      if (!this.htableDescriptor.hasColumnFamily(family)) {
-        if (nonExistentList == null) {
-          nonExistentList = new ArrayList<>();
-        }
-        nonExistentList.add(family);
-      }
-    }
-    if (nonExistentList != null) {
-      for (byte[] family : nonExistentList) {
-        // Perhaps schema was changed between crash and replay
-        LOG.info("No family for " + Bytes.toString(family) + " omit from reply.");
-        familyMap.remove(family);
-      }
+      batchOp.nextIndexToProcess = finalLastIndexExclusive;
     }
   }
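
The status-marking lambda above relies on a visitBatchOperations(...) helper
that this patch adds to BatchOperation; its full signature is not visible in
this hunk. The stand-alone approximation below is inferred from the call site
alone, and the leading boolean argument (dropped here) is assumed to select
only not-yet-processed operations.

import java.util.function.IntPredicate;

// Approximation of the batch-visitor idiom seen at the call site above:
// walk indices [fromIndex, lastIndexExclusive) and stop early if the
// visitor returns false. The real helper lives on BatchOperation.
final class BatchVisitorSketch {
  static void visitBatchOperations(int fromIndex, int lastIndexExclusive, IntPredicate visitor) {
    for (int i = fromIndex; i < lastIndexExclusive; i++) {
      if (!visitor.test(i)) {
        break;
      }
    }
  }

  public static void main(String[] args) {
    final boolean finalSuccess = true;
    final String[] retCodeDetails = new String[5];
    visitBatchOperations(0, retCodeDetails.length, (int i) -> {
      retCodeDetails[i] = finalSuccess ? "SUCCESS" : "FAILURE"; // same shape as the diff's lambda
      return true; // keep visiting
    });
    System.out.println(String.join(",", retCodeDetails));
  }
}
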
 
@@ -4003,25 +4262,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     doBatchMutate(p);
   }
 
-  /*
-   * Atomically apply the given map of family->edits to the memstore.
-   * This handles the consistency control on its own, but the caller
-   * should already have locked updatesLock.readLock(). This also does
-   * <b>not</b> check the families for validity.
-   *
-   * @param familyMap Map of Cells by family
-   * @param memstoreSize
-   */
-  private void applyFamilyMapToMemStore(Map<byte[], List<Cell>> familyMap,
-      MemStoreSizing memstoreAccounting) throws IOException {
-    for (Map.Entry<byte[], List<Cell>> e : familyMap.entrySet()) {
-      byte[] family = e.getKey();
-      List<Cell> cells = e.getValue();
-      assert cells instanceof RandomAccess;
-      applyToMemStore(getStore(family), cells, false, memstoreAccounting);
-    }
-  }
-
   /**
    * @param delta If we are doing delta changes -- e.g. increment/append -- then this flag will be
    *          set; when set we will run operations that make sense in the increment/append scenario
@@ -4090,24 +4330,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     }
   }
 
-  /**
-   * Append the given map of family->edits to a WALEdit data structure.
-   * This does not write to the WAL itself.
-   * @param familyMap map of family->edits
-   * @param walEdit the destination entry to append into
-   */
-  private void addFamilyMapToWALEdit(Map<byte[], List<Cell>> familyMap,
-      WALEdit walEdit) {
-    for (List<Cell> edits : familyMap.values()) {
-      assert edits instanceof RandomAccess;
-      int listSize = edits.size();
-      for (int i=0; i < listSize; i++) {
-        Cell cell = edits.get(i);
-        walEdit.add(cell);
-      }
-    }
-  }
-
   /*
    * @param size
    * @return True if size is over the flush threshold
@@ -5471,8 +5693,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   private void releaseRowLocks(List<RowLock> rowLocks) {
     if (rowLocks != null) {
-      for (int i = 0; i < rowLocks.size(); i++) {
-        rowLocks.get(i).release();
+      for (RowLock rowLock : rowLocks) {
+        rowLock.release();
       }
       rowLocks.clear();
     }
@@ -5626,7 +5848,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * bulkLoadHFile() to perform any necessary
    * pre/post processing of a given bulkload call
    */
-  public static interface BulkLoadListener {
+  public interface BulkLoadListener {
     /**
      * Called before an HFile is actually loaded
      * @param family family being loaded to
@@ -6081,7 +6303,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // to handle scan or get operation.
         moreValues = nextInternal(outResults, scannerContext);
       } else {
-        List<Cell> tmpList = new ArrayList<Cell>();
+        List<Cell> tmpList = new ArrayList<>();
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
@@ -6861,46 +7083,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /**
-   * Create a daughter region from given a temp directory with the region data.
-   * @param hri Spec. for daughter region to open.
-   * @throws IOException
-   */
-  public HRegion createDaughterRegionFromSplits(final RegionInfo hri) throws IOException {
-    // Move the files from the temporary .splits to the final /table/region directory
-    fs.commitDaughterRegion(hri);
-
-    // Create the daughter HRegion instance
-    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(), fs.getFileSystem(),
-        this.getBaseConf(), hri, this.getTableDescriptor(), rsServices);
-    r.readRequestsCount.add(this.getReadRequestsCount() / 2);
-    r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount() / 2);
-    r.writeRequestsCount.add(this.getWriteRequestsCount() / 2);
-    return r;
-  }
-
-  /**
-   * Create a merged region given a temp directory with the region data.
-   * @param region_b another merging region
-   * @return merged HRegion
-   * @throws IOException
-   */
-  HRegion createMergedRegionFromMerges(final RegionInfo mergedRegionInfo,
-      final HRegion region_b) throws IOException {
-    HRegion r = HRegion.newHRegion(this.fs.getTableDir(), this.getWAL(),
-        fs.getFileSystem(), this.getBaseConf(), mergedRegionInfo,
-        this.getTableDescriptor(), this.rsServices);
-    r.readRequestsCount.add(this.getReadRequestsCount()
-        + region_b.getReadRequestsCount());
-    r.filteredReadRequestsCount.add(this.getFilteredReadRequestsCount()
-      + region_b.getFilteredReadRequestsCount());
-    r.writeRequestsCount.add(this.getWriteRequestsCount()
-
-        + region_b.getWriteRequestsCount());
-    this.fs.commitMergedRegion(mergedRegionInfo);
-    return r;
-  }
-
-  /**
    * Computes the Path of the HRegion
    *
    * @param tabledir qualified path for table
@@ -6960,7 +7142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return Result.create(results, get.isCheckExistenceOnly() ? !results.isEmpty() : null, stale);
   }
 
-  void prepareGet(final Get get) throws IOException, NoSuchColumnFamilyException {
+  void prepareGet(final Get get) throws IOException {
     checkRow(get.getRow(), "Get");
     // Verify families are all valid
     if (get.hasFamilies()) {
@@ -7396,32 +7578,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return writeEntry associated with this append
    */
   private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List<UUID> clusterIds,
-      long now, long nonceGroup, long nonce, long replaySeqId) throws IOException {
-    Preconditions.checkArgument(!walEdit.isReplay() || replaySeqId != WALKey.NO_SEQUENCE_ID,
+      long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException {
+    Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(),
+        "WALEdit is null or empty!");
+    Preconditions.checkArgument(!walEdit.isReplay() || origLogSeqNum != WALKey.NO_SEQUENCE_ID,
         "Invalid replay sequence Id for replay WALEdit!");
+    // Using default cluster id, as this can only happen in the originating cluster.
+    // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
+    // here instead of WALKey directly to support legacy coprocessors.
+    WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+        this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
+        nonce, mvcc) :
+        new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
+            this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
+            nonceGroup, nonce, mvcc, this.getReplicationScope());
+    if (walEdit.isReplay()) {
+      walKey.setOrigLogSeqNum(origLogSeqNum);
+    }
     WriteEntry writeEntry = null;
-    if (!walEdit.isEmpty()) {
-      // Using default cluster id, as this can only happen in the originating cluster.
-      // A slave cluster receives the final value (not the delta) as a Put. We use HLogKey
-      // here instead of WALKey directly to support legacy coprocessors.
-      WALKey walKey = walEdit.isReplay() ? new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds, nonceGroup,
-          nonce, mvcc) :
-          new WALKey(this.getRegionInfo().getEncodedNameAsBytes(),
-          this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now, clusterIds,
-          nonceGroup, nonce, mvcc, this.getReplicationScope());
-      if (walEdit.isReplay()) {
-        walKey.setOrigLogSeqNum(replaySeqId);
+    try {
+      long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
+      // Call sync on our edit.
+      if (txid != 0) {
+        sync(txid, durability);
       }
-      try {
-        long txid = this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
-        // Call sync on our edit.
-        if (txid != 0) sync(txid, durability);
-        writeEntry = walKey.getWriteEntry();
-      } catch (IOException ioe) {
-        if (walKey != null) mvcc.complete(walKey.getWriteEntry());
-        throw ioe;
+      writeEntry = walKey.getWriteEntry();
+    } catch (IOException ioe) {
+      if (walKey != null) {
+        mvcc.complete(walKey.getWriteEntry());
       }
+      throw ioe;
     }
     return writeEntry;
   }
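
One behavioral note on the hunk above: the empty-edit check moved from inside
doWALAppend() to its callers, so the method now fails fast on a null or empty
WALEdit instead of silently returning null. A minimal sketch of that fail-fast
precondition style in plain Java; no HBase types, and the method name and
message text here are illustrative only.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Fail-fast precondition style adopted by the refactored doWALAppend():
// reject an unusable argument up front rather than silently doing nothing.
final class PreconditionSketch {
  static void appendToLog(List<String> edit) {
    if (edit == null || edit.isEmpty()) {
      throw new IllegalArgumentException("WALEdit is null or empty!");
    }
    System.out.println("appending " + edit.size() + " cell(s)");
  }

  public static void main(String[] args) {
    appendToLog(Arrays.asList("cell-1")); // fine
    try {
      appendToLog(Collections.<String>emptyList()); // now rejected, not ignored
    } catch (IllegalArgumentException expected) {
      System.out.println("rejected: " + expected.getMessage());
    }
  }
}
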
@@ -7637,7 +7823,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return Sorted list of <code>cells</code> using <code>comparator</code>
    */
   private static List<Cell> sort(List<Cell> cells, final CellComparator comparator) {
-    Collections.sort(cells, comparator);
+    cells.sort(comparator);
     return cells;
   }
 
@@ -7658,7 +7844,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       ClassSize.OBJECT +
       ClassSize.ARRAY +
       51 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
-      (15 * Bytes.SIZEOF_LONG) +
+      (14 * Bytes.SIZEOF_LONG) +
       6 * Bytes.SIZEOF_BOOLEAN);
 
   // woefully out of date - currently missing:

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
index 56a97e0..ba847a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MiniBatchOperationInProgress.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -40,13 +41,22 @@ public class MiniBatchOperationInProgress<T> {
   private final int firstIndex;
   private final int lastIndexExclusive;
 
+  private int readyToWriteCount = 0;
+  private int cellCount = 0;
+  private int numOfPuts = 0;
+  private int numOfDeletes = 0;
+
+
   public MiniBatchOperationInProgress(T[] operations, OperationStatus[] retCodeDetails,
-      WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive) {
+      WALEdit[] walEditsFromCoprocessors, int firstIndex, int lastIndexExclusive,
+      int readyToWriteCount) {
+    Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex));
     this.operations = operations;
     this.retCodeDetails = retCodeDetails;
     this.walEditsFromCoprocessors = walEditsFromCoprocessors;
     this.firstIndex = firstIndex;
     this.lastIndexExclusive = lastIndexExclusive;
+    this.readyToWriteCount = readyToWriteCount;
   }
 
   /**
@@ -127,4 +137,36 @@ public class MiniBatchOperationInProgress<T> {
     return operationsFromCoprocessors == null ? null :
         operationsFromCoprocessors[getAbsoluteIndex(index)];
   }
+
+  public int getReadyToWriteCount() {
+    return readyToWriteCount;
+  }
+
+  public int getLastIndexExclusive() {
+    return lastIndexExclusive;
+  }
+
+  public int getCellCount() {
+    return cellCount;
+  }
+
+  public void addCellCount(int cellCount) {
+    this.cellCount += cellCount;
+  }
+
+  public int getNumOfPuts() {
+    return numOfPuts;
+  }
+
+  public void incrementNumOfPuts() {
+    this.numOfPuts += 1;
+  }
+
+  public int getNumOfDeletes() {
+    return numOfDeletes;
+  }
+
+  public void incrementNumOfDeletes() {
+    this.numOfDeletes += 1;
+  }
 }
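
The counters added above let BatchOperation and its subclasses accumulate
per-mini-batch statistics (cell counts for sizing the WALEdit, put/delete
counts for region metrics) instead of tracking them in doMiniBatchMutate()
locals. A short usage sketch against the new accessors; it needs hbase-server
on the classpath, the nulls mirror what the updated tests pass for arrays this
sketch does not exercise, and the index bounds are illustrative.

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;

final class MiniBatchCountersSketch {
  public static void main(String[] args) {
    // 0/10 bound the mini-batch window; the trailing 10 is the new
    // readyToWriteCount argument (must be <= lastIndexExclusive - firstIndex).
    MiniBatchOperationInProgress<Mutation> miniBatchOp =
        new MiniBatchOperationInProgress<>(null, null, null, 0, 10, 10);

    miniBatchOp.incrementNumOfPuts();    // a Put was prepared
    miniBatchOp.incrementNumOfDeletes(); // a Delete was prepared
    miniBatchOp.addCellCount(42);        // cells contributed by one operation

    // Read back later, e.g. when building the WALEdit or updating metrics.
    System.out.println(miniBatchOp.getNumOfPuts()
        + "/" + miniBatchOp.getNumOfDeletes()
        + "/" + miniBatchOp.getCellCount()); // prints 1/1/42
  }
}
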

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
index c8e9940..0d9d149 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiRowMutationProcessor.java
@@ -137,7 +137,7 @@ MultiRowMutationProcessorResponse> {
     if (coprocessorHost != null) {
       miniBatch = new MiniBatchOperationInProgress<>(
           mutations.toArray(new Mutation[mutations.size()]), opStatus, walEditsFromCP, 0,
-          mutations.size());
+          mutations.size(), mutations.size());
       coprocessorHost.preBatchMutate(miniBatch);
     }
     // Apply edits to a single WALEdit

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
index 4a59379..c3472b5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMiniBatchOperationInProgress.java
@@ -44,7 +44,7 @@ public class TestMiniBatchOperationInProgress {
     }
     MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatch = 
       new MiniBatchOperationInProgress<>(operations, retCodeDetails,
-      walEditsFromCoprocessors, 0, 5);
+      walEditsFromCoprocessors, 0, 5, 5);
 
     assertEquals(5, miniBatch.size());
     assertTrue(Bytes.equals(Bytes.toBytes(0), miniBatch.getOperation(0).getFirst().getRow()));
@@ -69,7 +69,7 @@ public class TestMiniBatchOperationInProgress {
     }
 
     miniBatch = new MiniBatchOperationInProgress<>(operations,
-        retCodeDetails, walEditsFromCoprocessors, 7, 10);
+        retCodeDetails, walEditsFromCoprocessors, 7, 10, 3);
     try {
       miniBatch.setWalEdit(-1, new WALEdit());
       fail("Should throw Exception while accessing out of range");

http://git-wip-us.apache.org/repos/asf/hbase/blob/4eae5a29/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
index eb336fe..2fd3909 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java
@@ -867,7 +867,7 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
       @Override
       public Object run() throws Exception {
         ACCESS_CONTROLLER.preBatchMutate(ObserverContextImpl.createAndPrepare(RCP_ENV),
-          new MiniBatchOperationInProgress<>(null, null, null, 0, 0));
+          new MiniBatchOperationInProgress<>(null, null, null, 0, 0, 0));
         return null;
       }
     }, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);
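
Across the three call-site updates above, the pattern behind the new trailing
argument is visible: a fully-ready batch passes lastIndexExclusive - firstIndex
(5 and mutations.size()), a windowed batch passes the ready count for that
window (3 for the [7, 10) window), and an empty batch passes 0. A tiny worked
check of the invariant the new Preconditions guard enforces; this is a plain
Java stand-in for the Guava call, not the production check itself.

// Stand-in for Preconditions.checkArgument(readyToWriteCount <= (lastIndexExclusive - firstIndex)).
final class ReadyCountCheck {
  static void check(int firstIndex, int lastIndexExclusive, int readyToWriteCount) {
    if (readyToWriteCount > (lastIndexExclusive - firstIndex)) {
      throw new IllegalArgumentException("readyToWriteCount exceeds mini-batch size");
    }
  }

  public static void main(String[] args) {
    check(0, 5, 5);  // TestMiniBatchOperationInProgress, first case: OK
    check(7, 10, 3); // second case: window of 3, all ready: OK
    check(0, 0, 0);  // TestWithDisabledAuthorization: empty batch: OK
    check(0, 5, 6);  // throws: more "ready" operations than the window holds
  }
}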