You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ka...@apache.org on 2020/10/13 03:30:50 UTC

[phoenix] branch master updated: PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b2ff49  PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
4b2ff49 is described below

commit 4b2ff49fde9b52741e6ec4b7cb1e90724f4ed63a
Author: Kadir Ozdemir <ko...@salesforce.com>
AuthorDate: Sun Sep 27 15:59:20 2020 -0700

    PHOENIX-6160 Simplifying concurrent mutation handling for global Indexes
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 329 +++++++++++----------
 1 file changed, 169 insertions(+), 160 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 0457481..4d804e0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -76,7 +76,6 @@ import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException
 import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
 import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
-import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
@@ -98,6 +97,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.applyNew;
 import static org.apache.phoenix.coprocessor.IndexRebuildRegionScanner.prepareIndexMutationsForRebuild;
@@ -125,24 +126,29 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
    * Class to represent pending data table rows
    */
   private static class PendingRow {
-      private boolean concurrent = false;
-      private long count = 1;
+      private int count;
+      private BatchMutateContext lastContext;
 
-      public void add() {
+      PendingRow(BatchMutateContext context) {
+          count = 1;
+          lastContext = context;
+      }
+
+      public void add(BatchMutateContext context) {
           count++;
-          concurrent = true;
+          lastContext = context;
       }
 
       public void remove() {
           count--;
       }
 
-      public long getCount() {
+      public int getCount() {
           return count;
       }
 
-      public boolean isConcurrent() {
-          return concurrent;
+      public BatchMutateContext getLastContext() {
+          return lastContext;
       }
   }
 
@@ -161,9 +167,25 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
       failDataTableUpdatesForTesting = fail;
   }
 
+  public enum BatchMutatePhase {
+      PRE, POST, FAILED
+  }
+
   // Hack to get around not being able to save any state between
   // coprocessor calls. TODO: remove after HBASE-18127 when available
+
+  /**
+   * The concurrent batch of mutations is a set such that every pair of batches in this set has at least one common row.
+   * Since a BatchMutateContext object of a batch is modified only after the row locks for all the rows that are mutated
+   * by this batch are acquired, there can be only one thread can acquire the locks for its batch and safely access
+   * all the batch contexts in the set of concurrent batches. Because of this, we do not read atomic variables or
+   * additional locks to serialize the access to the BatchMutateContext objects.
+   */
+
   private static class BatchMutateContext {
+      private BatchMutatePhase currentPhase = BatchMutatePhase.PRE;
+      // The max of reference counts on the pending rows of this batch at the time this batch arrives
+      private int maxPendingRowCount = 0;
       private final int clientVersion;
       // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
       // the verified column) will have the value false ("unverified") on these mutations
@@ -175,15 +197,42 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
       private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
-      private boolean rebuild;
       // The current and next states of the data rows corresponding to the pending mutations
       private HashMap<ImmutableBytesPtr, Pair<Put, Put>> dataRowStates;
-      // Data table pending mutations
+      // The previous concurrent batch contexts
+      private HashMap<ImmutableBytesPtr, BatchMutateContext> lastConcurrentBatchContext = null;
+      // The latches of the threads waiting for this batch to complete
+      private List<CountDownLatch> waitList = null;
       private Map<ImmutableBytesPtr, MultiMutation> multiMutationMap;
 
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
+
+      public BatchMutatePhase getCurrentPhase() {
+          return currentPhase;
+      }
+
+      public Put getNextDataRowState(ImmutableBytesPtr rowKeyPtr) {
+          Pair<Put, Put> rowState = dataRowStates.get(rowKeyPtr);
+          if (rowState != null) {
+              return rowState.getSecond();
+          }
+          return null;
+      }
+
+      public CountDownLatch getCountDownLatch() {
+          if (waitList == null) {
+              waitList = new ArrayList<>();
+          }
+          CountDownLatch countDownLatch = new CountDownLatch(1);
+          waitList.add(countDownLatch);
+          return countDownLatch;
+      }
+
+      public int getMaxPendingRowCount() {
+          return maxPendingRowCount;
+      }
   }
 
   private ThreadLocal<BatchMutateContext> batchMutateContext =
@@ -223,9 +272,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
   private long slowIndexPrepareThreshold;
   private long slowPreIncrementThreshold;
   private int rowLockWaitDuration;
+  private int concurrentMutationWaitDuration;
   private String dataTableName;
 
   private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+  private static final int DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS = 100;
 
   @Override
   public Optional<RegionObserver> getRegionObserver() {
@@ -259,8 +310,9 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
 
         this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
                 DEFAULT_ROWLOCK_WAIT_DURATION);
-        this.lockManager = new LockManager();
-
+          this.lockManager = new LockManager();
+          this.concurrentMutationWaitDuration = env.getConfiguration().getInt("phoenix.index.concurrent.wait.duration.ms",
+                  DEFAULT_CONCURRENT_MUTATION_WAIT_DURATION_IN_MS);
           // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
           this.metricSource = MetricsIndexerSourceFactory.getInstance().getIndexerSource();
           setSlowThresholds(e.getConfiguration());
@@ -409,15 +461,22 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
       }
   }
 
+    private void unlockRows(BatchMutateContext context) throws IOException {
+        for (RowLock rowLock : context.rowLocks) {
+            rowLock.release();
+        }
+        context.rowLocks.clear();
+    }
+
   private void populatePendingRows(BatchMutateContext context) {
       for (RowLock rowLock : context.rowLocks) {
           ImmutableBytesPtr rowKey = rowLock.getRowKey();
           PendingRow pendingRow = pendingRows.get(rowKey);
           if (pendingRow == null) {
-              pendingRows.put(rowKey, new PendingRow());
+              pendingRows.put(rowKey, new PendingRow(context));
           } else {
               // m is a mutation on a row that has already a pending mutation in progress from another batch
-              pendingRow.add();
+              pendingRow.add(context);
           }
       }
   }
@@ -608,15 +667,34 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
     private void getCurrentRowStates(ObserverContext<RegionCoprocessorEnvironment> c,
                                      BatchMutateContext context) throws IOException {
         Set<KeyRange> keys = new HashSet<KeyRange>(context.rowsToLock.size());
+        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
         for (ImmutableBytesPtr rowKeyPtr : context.rowsToLock) {
-            keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+            PendingRow pendingRow = pendingRows.get(rowKeyPtr);
+            if (pendingRow != null && pendingRow.getLastContext().getCurrentPhase() == BatchMutatePhase.PRE) {
+                if (context.lastConcurrentBatchContext == null) {
+                    context.lastConcurrentBatchContext = new HashMap<>();
+                }
+                context.lastConcurrentBatchContext.put(rowKeyPtr, pendingRow.getLastContext());
+                if (context.maxPendingRowCount < pendingRow.getCount()) {
+                    context.maxPendingRowCount = pendingRow.getCount();
+                }
+                Put put = pendingRow.getLastContext().getNextDataRowState(rowKeyPtr);
+                if (put != null) {
+                    context.dataRowStates.put(rowKeyPtr, new Pair<Put, Put>(put, new Put(put)));
+                }
+            }
+            else {
+                keys.add(PVarbinary.INSTANCE.getKeyRange(rowKeyPtr.get()));
+            }
+        }
+        if (keys.isEmpty()) {
+            return;
         }
         Scan scan = new Scan();
         ScanRanges scanRanges = ScanRanges.createPointLookup(new ArrayList<KeyRange>(keys));
         scanRanges.initializeScan(scan);
         SkipScanFilter skipScanFilter = scanRanges.getSkipScanFilter();
         scan.setFilter(skipScanFilter);
-        context.dataRowStates = new HashMap<ImmutableBytesPtr, Pair<Put, Put>>(context.rowsToLock.size());
         try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
             boolean more = true;
             while(more) {
@@ -765,43 +843,11 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         return (PhoenixIndexMetaData)indexMetaData;
     }
 
-    /**
-     * IndexMaintainer.getIndexedColumns() returns the data column references for indexed columns. The data columns are
-     * grouped into three classes, pk columns (data table pk columns), the indexed columns (the columns for which
-     * we want to have indexing; they form the prefix for the primary key for the index table (after salt and tenant id))
-     * and covered columns. The purpose of this method is to find out if all the indexed columns are included in the
-     * pending data table mutation pointed by multiMutation.
-     */
-    private boolean hasAllIndexedColumns(IndexMaintainer indexMaintainer, MultiMutation multiMutation) {
-        Map<byte[], List<Cell>> familyMap = multiMutation.getFamilyCellMap();
-        for (ColumnReference columnReference : indexMaintainer.getIndexedColumns()) {
-            byte[] family = columnReference.getFamily();
-            List<Cell> cellList = familyMap.get(family);
-            if (cellList == null) {
-                return false;
-            }
-            boolean has = false;
-            for (Cell cell : cellList) {
-                if (CellUtil.matchingColumn(cell, family, columnReference.getQualifier())) {
-                    has = true;
-                    break;
-                }
-            }
-            if (!has) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private void preparePostIndexMutations(TableName table,
-                                           BatchMutateContext context,
+    private void preparePostIndexMutations(BatchMutateContext context,
                                            long now,
-                                           PhoenixIndexMetaData indexMetaData)
-            throws Throwable {
+                                           PhoenixIndexMetaData indexMetaData) {
         context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
         List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-        // Check if we need to skip post index update for any of the rows
         for (IndexMaintainer indexMaintainer : maintainers) {
             byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
             byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
@@ -809,90 +855,21 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
                     new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
             List<Pair<Mutation, byte[]>> updates = context.indexUpdates.get(hTableInterfaceReference);
             for (Pair<Mutation, byte[]> update : updates) {
-                // Are there concurrent updates on the data table row? if so, skip post index updates
-                // and let read repair resolve conflicts
-                ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-                PendingRow pendingRow = pendingRows.get(rowKey);
-                if (!pendingRow.isConcurrent()) {
-                    Mutation m = update.getFirst();
-                    if (m instanceof Put) {
-                        Put verifiedPut = new Put(m.getRow());
-                        // Set the status of the index row to "verified"
-                        verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
-                        context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
-                    } else {
-                        context.postIndexUpdates.put(hTableInterfaceReference, m);
-                    }
+                Mutation m = update.getFirst();
+                if (m instanceof Put) {
+                    Put verifiedPut = new Put(m.getRow());
+                    // Set the status of the index row to "verified"
+                    verifiedPut.addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
+                    context.postIndexUpdates.put(hTableInterfaceReference, verifiedPut);
                 } else {
-                    if (!hasAllIndexedColumns(indexMaintainer, context.multiMutationMap.get(rowKey))) {
-                        // This batch needs to be retried since one of the concurrent mutations does not have the value
-                        // for an indexed column. Not including an index column may lead to incorrect index row key
-                        // generation for concurrent mutations since concurrent mutations are not serialized entirely
-                        // and do not see each other's effect on data table. Throwing an IOException will result in
-                        // retries of this batch. Before throwing exception, we need to remove reference counts and
-                        // locks for the rows of this batch
-                        removePendingRows(context);
-                        context.indexUpdates.clear();
-                        for (RowLock rowLock : context.rowLocks) {
-                            rowLock.release();
-                        }
-                        context.rowLocks.clear();
-                        throw new IOException("One of the concurrent mutations does not have all indexed columns. " +
-                                "The batch needs to be retried " + table.getNameAsString());
-                    }
+                    context.postIndexUpdates.put(hTableInterfaceReference, m);
                 }
             }
         }
-
-        // We are done with handling concurrent mutations. So we can remove the rows of this batch from
-        // the collection of pending rows
         removePendingRows(context);
         context.indexUpdates.clear();
     }
 
-    /**
-     * There are at most two rebuild mutation for every row, one put and one delete. They are listed in indexMutations
-     * next to each other such that put comes before delete by {@link IndexRebuildRegionScanner}. This method is called
-     * only for global indexes.
-     */
-    private void preBatchMutateWithExceptionsForRebuild(ObserverContext<RegionCoprocessorEnvironment> c,
-                                                        MiniBatchOperationInProgress<Mutation> miniBatchOp,
-                                                        BatchMutateContext context,
-                                                        IndexMaintainer indexMaintainer) throws Throwable {
-        Put put = null;
-        List <Mutation> indexMutations = new ArrayList<>();
-        for (int i = 0; i < miniBatchOp.size(); i++) {
-            if (miniBatchOp.getOperationStatus(i) == IGNORE) {
-                continue;
-            }
-            Mutation m = miniBatchOp.getOperation(i);
-            if (!this.builder.isEnabled(m)) {
-                continue;
-            }
-            if (m instanceof Put) {
-                if (put != null) {
-                    indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
-                }
-                put = (Put)m;
-            } else {
-                indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, (Delete)m));
-                put = null;
-            }
-            miniBatchOp.setOperationStatus(i, NOWRITE);
-        }
-        if (put != null) {
-            indexMutations.addAll(prepareIndexMutationsForRebuild(indexMaintainer, put, null));
-        }
-        HTableInterfaceReference hTableInterfaceReference =
-                new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
-        context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-        for (Mutation m : indexMutations) {
-            context.preIndexUpdates.put(hTableInterfaceReference, m);
-        }
-        doPre(c, context, miniBatchOp);
-        // For rebuild updates, no post index update is prepared. Just create an empty list.
-        context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
-    }
 
     private static boolean hasGlobalIndex(PhoenixIndexMetaData indexMetaData) {
         for (IndexMaintainer indexMaintainer : indexMetaData.getIndexMaintainers()) {
@@ -912,46 +889,75 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
         return false;
     }
 
+    private void waitForPreviousConcurrentBatch(TableName table, BatchMutateContext context)
+            throws Throwable {
+        boolean done;
+        BatchMutatePhase phase;
+        done = true;
+        for (BatchMutateContext lastContext : context.lastConcurrentBatchContext.values()) {
+            phase = lastContext.getCurrentPhase();
+            if (phase == BatchMutatePhase.FAILED) {
+                done = false;
+                break;
+            }
+            if (phase == BatchMutatePhase.PRE) {
+                CountDownLatch countDownLatch = lastContext.getCountDownLatch();
+                // Release the locks so that the previous concurrent mutation can go into the post phase
+                unlockRows(context);
+                // Wait for at most one concurrentMutationWaitDuration for each level in the dependency tree of batches.
+                // lastContext.getMaxPendingRowCount() is the depth of the subtree rooted at the batch pointed by lastContext
+                if (!countDownLatch.await((lastContext.getMaxPendingRowCount() + 1) * concurrentMutationWaitDuration,
+                        TimeUnit.MILLISECONDS)) {
+                    done = false;
+                    break;
+                }
+                // Acquire the locks again before letting the region proceed with data table updates
+                lockRows(context);
+            }
+        }
+        if (!done) {
+            // This batch needs to be retried since one of the previous concurrent batches has not completed yet.
+            // Throwing an IOException will result in retries of this batch. Before throwing exception,
+            // we need to remove reference counts and locks for the rows of this batch
+            removePendingRows(context);
+            context.indexUpdates.clear();
+            for (RowLock rowLock : context.rowLocks) {
+                rowLock.release();
+            }
+            context.rowLocks.clear();
+            throw new IOException("One of the previous concurrent mutations has not completed. " +
+                    "The batch needs to be retried " + table.getNameAsString());
+        }
+    }
+
     public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
                                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
         ignoreAtomicOperations(miniBatchOp);
         PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
         BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
         setBatchMutateContext(c, context);
-        Mutation firstMutation = miniBatchOp.getOperation(0);
-        ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
-        context.rebuild = replayWrite != null;
-        if (context.rebuild) {
-            preBatchMutateWithExceptionsForRebuild(c, miniBatchOp, context, indexMetaData.getIndexMaintainers().get(0));
-            return;
-        }
         /*
          * Exclusively lock all rows so we get a consistent read
          * while determining the index updates
          */
         populateRowsToLock(miniBatchOp, context);
+        // early exit if it turns out we don't have any update for indexes
+        if (context.rowsToLock.isEmpty()) {
+            return;
+        }
         lockRows(context);
-
         long now = EnvironmentEdgeManager.currentTimeMillis();
-        // Unless we're replaying edits to rebuild the index, we update the time stamp
-        // of the data table to prevent overlapping time stamps (which prevents index
+        // Update the timestamps of the data table mutations to prevent overlapping timestamps (which prevents index
         // inconsistencies as this case isn't handled correctly currently).
         setTimestamps(miniBatchOp, builder, now);
 
-        // Group all the updates for a single row into a single update to be processed (for local indexes, and global index retries)
-        Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
-        // early exit if it turns out we don't have any edits
-        if (mutations == null || mutations.isEmpty()) {
-            return;
-        }
-
         TableName table = c.getEnvironment().getRegion().getRegionInfo().getTable();
         if (hasGlobalIndex(indexMetaData)) {
+            // Prepare current and next data rows states for pending mutations (for global indexes)
+            prepareDataRowStates(c, miniBatchOp, context, now);
             // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
             // concurrent updates
             populatePendingRows(context);
-            // Prepare current and next data rows states for pending mutations (for global indexes)
-            prepareDataRowStates(c, miniBatchOp, context, now);
             // early exit if it turns out we don't have any edits
             long start = EnvironmentEdgeManager.currentTimeMillis();
             preparePreIndexMutations(context, now, indexMetaData);
@@ -965,17 +971,19 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
                 LOG.debug("slept 1ms for " + table.getNameAsString());
             }
             // Release the locks before making RPC calls for index updates
-            for (RowLock rowLock : context.rowLocks) {
-                rowLock.release();
-            }
+            unlockRows(context);
             // Do the first phase index updates
             doPre(c, context, miniBatchOp);
             // Acquire the locks again before letting the region proceed with data table updates
-            context.rowLocks.clear();
             lockRows(context);
-            preparePostIndexMutations(table, context, now, indexMetaData);
+            if (context.lastConcurrentBatchContext != null) {
+                waitForPreviousConcurrentBatch(table, context);
+            }
+            preparePostIndexMutations(context, now, indexMetaData);
         }
         if (hasLocalIndex(indexMetaData)) {
+            // Group all the updates for a single row into a single update to be processed (for local indexes)
+            Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, context);
             handleLocalIndexUpdates(table, miniBatchOp, mutations, indexMetaData);
         }
         if (failDataTableUpdatesForTesting) {
@@ -1006,9 +1014,17 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
           return;
       }
       try {
-          for (RowLock rowLock : context.rowLocks) {
-              rowLock.release();
+          if (success) {
+              context.currentPhase = BatchMutatePhase.POST;
+          } else {
+              context.currentPhase = BatchMutatePhase.FAILED;
           }
+          if (context.waitList != null) {
+              for (CountDownLatch countDownLatch : context.waitList) {
+                  countDownLatch.countDown();
+              }
+          }
+          unlockRows(context);
           this.builder.batchCompleted(miniBatchOp);
 
           if (success) { // The pre-index and data table updates are successful, and now, do post index updates
@@ -1077,9 +1093,6 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
 
   private void doPre(ObserverContext<RegionCoprocessorEnvironment> c, BatchMutateContext context,
                      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-      if (ignoreIndexRebuildForTesting && context.rebuild) {
-          return;
-      }
       long start = EnvironmentEdgeManager.currentTimeMillis();
       try {
           if (failPreIndexUpdatesForTesting) {
@@ -1097,11 +1110,7 @@ public class IndexRegionObserver implements RegionObserver, RegionCoprocessor {
           // postBatchMutateIndispensably() is called
           removePendingRows(context);
           context.rowLocks.clear();
-          if (context.rebuild) {
-              throw new IOException(String.format("%s for rebuild", e.getMessage()), e);
-          } else {
-              rethrowIndexingException(e);
-          }
+          rethrowIndexingException(e);
       }
       throw new RuntimeException(
               "Somehow didn't complete the index update, but didn't return succesfully either!");