Posted to commits@phoenix.apache.org by ka...@apache.org on 2019/11/16 00:41:58 UTC

[phoenix] branch 4.14-HBase-1.3 updated (ce28cb0 -> 2e7ac99)

This is an automated email from the ASF dual-hosted git repository.

kadir pushed a change to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git.


    from ce28cb0  PHOENIX-5564 Restructure read repair to improve readability and correctness
     new 62382d7  PHOENIX-5556 Avoid repeatedly loading IndexMetaData For IndexRegionObserver
     new 2e7ac99  PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../phoenix/hbase/index/IndexRegionObserver.java   | 186 +++++++++++----------
 .../hbase/index/builder/IndexBuildManager.java     |  19 +--
 2 files changed, 108 insertions(+), 97 deletions(-)
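
Editor's note (not part of the original notification): the common thread of the two
commits below is that index-update bookkeeping in IndexRegionObserver moves from flat
collections of nested Pairs to Guava ListMultimaps keyed by HTableInterfaceReference,
so all pending mutations for one index table can be fetched or removed in a single call.
The following self-contained sketch is only an illustration of that grouping pattern; it
uses plain String stand-ins for HTableInterfaceReference and Mutation, not Phoenix classes.

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.ListMultimap;
    import java.util.List;

    public class MultimapGroupingSketch {
        public static void main(String[] args) {
            // Key = index table reference, value = pending index mutation (simplified to Strings).
            ListMultimap<String, String> indexUpdates = ArrayListMultimap.create();
            indexUpdates.put("IDX_A", "put-row1");
            indexUpdates.put("IDX_B", "put-row1");
            indexUpdates.put("IDX_A", "delete-row2");

            // Per-table retrieval replaces scanning a flat Collection<Pair<Pair<Mutation, byte[]>, byte[]>>.
            List<String> forIdxA = indexUpdates.get("IDX_A");      // [put-row1, delete-row2]
            // removeAll() is the call the new handleLocalIndexUpdates() uses to pull out
            // the updates destined for the data table (local index) itself.
            List<String> local = indexUpdates.removeAll("IDX_B");  // [put-row1]
            System.out.println(forIdxA + " / " + local);
        }
    }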


[phoenix] 02/02: PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 2e7ac9944df652d59f21fa26c11c4d88789cf1c5
Author: Kadir <ko...@salesforce.com>
AuthorDate: Thu Nov 7 15:50:40 2019 -0800

    PHOENIX-5565 Unify index update structures in IndexRegionObserver and IndexCommitter
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 155 +++++++++++----------
 .../hbase/index/builder/IndexBuildManager.java     |  15 +-
 2 files changed, 84 insertions(+), 86 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index b058b33..340832f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -32,6 +32,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -67,6 +69,7 @@ import org.apache.phoenix.hbase.index.builder.IndexBuilder;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
 import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
@@ -145,16 +148,16 @@ public class IndexRegionObserver extends BaseRegionObserver {
       private final int clientVersion;
       // The collection of index mutations that will be applied before the data table mutations. The empty column (i.e.,
       // the verified column) will have the value false ("unverified") on these mutations
-      private Collection<Pair<Mutation, byte[]>> preIndexUpdates = Collections.emptyList();
+      private ListMultimap<HTableInterfaceReference, Mutation> preIndexUpdates;
       // The collection of index mutations that will be applied after the data table mutations. The empty column (i.e.,
       // the verified column) will have the value true ("verified") on the put mutations
-      private Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+      private ListMultimap<HTableInterfaceReference, Mutation> postIndexUpdates;
       // The collection of candidate index mutations that will be applied after the data table mutations
-      private Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+      private ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> intermediatePostIndexUpdates;
       private List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
       private HashSet<ImmutableBytesPtr> rowsToLock = new HashSet<>();
-      long dataWriteStartTime;
-
+      private long dataWriteStartTime;
+      private boolean rebuild;
       private BatchMutateContext(int clientVersion) {
           this.clientVersion = clientVersion;
       }
@@ -506,6 +509,27 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
+  private void handleLocalIndexUpdates(ObserverContext<RegionCoprocessorEnvironment> c,
+                                       MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                       ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates) {
+      byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
+      HTableInterfaceReference hTableInterfaceReference =
+                          new HTableInterfaceReference(new ImmutableBytesPtr(tableName));
+      List<Pair<Mutation, byte[]>> localIndexUpdates = indexUpdates.removeAll(hTableInterfaceReference);
+      if (localIndexUpdates == null || localIndexUpdates.isEmpty()) {
+          return;
+      }
+      List<Mutation> localUpdates = new ArrayList<Mutation>();
+      Iterator<Pair<Mutation, byte[]>> indexUpdatesItr = localIndexUpdates.iterator();
+      while (indexUpdatesItr.hasNext()) {
+          Pair<Mutation, byte[]> next = indexUpdatesItr.next();
+          localUpdates.add(next.getFirst());
+      }
+      if (!localUpdates.isEmpty()) {
+          miniBatchOp.addOperationsFromCP(0, localUpdates.toArray(new Mutation[localUpdates.size()]));
+      }
+  }
+
   private void prepareIndexMutations(
           ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp,
@@ -513,79 +537,56 @@ public class IndexRegionObserver extends BaseRegionObserver {
           Collection<? extends Mutation> mutations,
           long now,
           PhoenixIndexMetaData indexMetaData) throws Throwable {
-
       List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
-
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
           Span current = scope.getSpan();
           if (current == null) {
               current = NullSpan.INSTANCE;
           }
-
           // get the index updates for all elements in this batch
-          Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
-                  this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);
-
+          context.intermediatePostIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Pair<Mutation, byte[]>>create();
+          this.builder.getIndexUpdates(context.intermediatePostIndexUpdates, miniBatchOp, mutations, indexMetaData);
           current.addTimelineAnnotation("Built index updates, doing preStep");
-          TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
-          byte[] tableName = c.getEnvironment().getRegion().getTableDesc().getTableName().getName();
-          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
-          List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
-          context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
-          context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
-          while(indexUpdatesItr.hasNext()) {
-              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
-              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
-                  localUpdates.add(next.getFirst().getFirst());
-                  indexUpdatesItr.remove();
-              }
-              else {
-                  // get index maintainer for this index table
-                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
-                  if (indexMaintainer == null) {
-                      throw new DoNotRetryIOException(
-                              "preBatchMutateWithExceptions: indexMaintainer is null " +
-                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-                  }
-                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
-                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+          handleLocalIndexUpdates(c, miniBatchOp, context.intermediatePostIndexUpdates);
+          context.preIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+          int updateCount = 0;
+          for (IndexMaintainer indexMaintainer : maintainers) {
+              updateCount++;
+              byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+              byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+              HTableInterfaceReference hTableInterfaceReference =
+                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+              Iterator<Pair<Mutation, byte[]>> indexUpdatesItr =
+                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+              while (indexUpdatesItr.hasNext()) {
+                  Pair<Mutation, byte[]> next = indexUpdatesItr.next();
                   // add the VERIFIED cell, which is the empty cell
-                  Mutation m = next.getFirst().getFirst();
-                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
-                  if (rebuild) {
+                  Mutation m = next.getFirst();
+                  if (context.rebuild) {
+                      indexUpdatesItr.remove();
                       if (m instanceof Put) {
                           long ts = getMaxTimestamp(m);
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
-                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
+                          ((Put) m).addColumn(emptyCF, emptyCQ, ts, VERIFIED_BYTES);
                       }
+                      context.preIndexUpdates.put(hTableInterfaceReference, m);
                   } else {
-                      indexUpdatesItr.remove();
                       // For this mutation whether it is put or delete, set the status of the index row "unverified"
                       // This will be done before the data table row is updated (i.e., in the first write phase)
                       Put unverifiedPut = new Put(m.getRow());
                       unverifiedPut.addColumn(emptyCF, emptyCQ, now - 1, UNVERIFIED_BYTES);
-                      context.preIndexUpdates.add(new Pair <Mutation, byte[]>(unverifiedPut, next.getFirst().getSecond()));
+                      context.preIndexUpdates.put(hTableInterfaceReference, unverifiedPut);
                       if (m instanceof Put) {
                           // Remove the empty column prepared by Index codec as we need to change its value
                           removeEmptyColumn(m, emptyCF, emptyCQ);
                           ((Put) m).addColumn(emptyCF, emptyCQ, now, VERIFIED_BYTES);
                       }
-                      context.intermediatePostIndexUpdates.add(next);
                   }
               }
           }
-          if (!localUpdates.isEmpty()) {
-              miniBatchOp.addOperationsFromCP(0,
-                      localUpdates.toArray(new Mutation[localUpdates.size()]));
-          }
-          if (!indexUpdates.isEmpty() && context.preIndexUpdates.isEmpty()) {
-              context.preIndexUpdates = new ArrayList<>(indexUpdates.size());
-          }
-          for (Pair<Pair<Mutation, byte[]>, byte[]> update : indexUpdates) {
-              context.preIndexUpdates.add(update.getFirst());
-          }
+          TracingUtils.addAnnotation(current, "index update count", updateCount);
       }
   }
 
@@ -610,20 +611,23 @@ public class IndexRegionObserver extends BaseRegionObserver {
       setBatchMutateContext(c, context);
       Mutation firstMutation = miniBatchOp.getOperation(0);
       ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
+      context.rebuild = replayWrite != null;
       /*
        * Exclusively lock all rows so we get a consistent read
        * while determining the index updates
        */
-      if (replayWrite == null) {
+      long now;
+      if (!context.rebuild) {
           populateRowsToLock(miniBatchOp, context);
           lockRows(context);
-      }
-      long now = EnvironmentEdgeManager.currentTimeMillis();
-      // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
-      // concurrent updates
-      if (replayWrite == null) {
+          now = EnvironmentEdgeManager.currentTimeMillis();
+          // Add the table rows in the mini batch to the collection of pending rows. This will be used to detect
+          // concurrent updates
           populatePendingRows(context);
       }
+      else {
+          now = EnvironmentEdgeManager.currentTimeMillis();
+      }
       // First group all the updates for a single row into a single update to be processed
       Collection<? extends Mutation> mutations = groupMutations(miniBatchOp, now, replayWrite);
       // early exit if it turns out we don't have any edits
@@ -646,9 +650,11 @@ public class IndexRegionObserver extends BaseRegionObserver {
       for (RowLock rowLock : context.rowLocks) {
           rowLock.release();
       }
-      // Do the index updates
+      // Do the first phase index updates
       doPre(c, context, miniBatchOp);
-      if (replayWrite == null) {
+      context.postIndexUpdates = ArrayListMultimap.<HTableInterfaceReference, Mutation>create();
+      if (!context.rebuild) {
+          List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
           // Acquire the locks again before letting the region proceed with data table updates
           List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(context.rowLocks.size());
           for (RowLock rowLock : context.rowLocks) {
@@ -658,29 +664,26 @@ public class IndexRegionObserver extends BaseRegionObserver {
           context.rowLocks.clear();
           context.rowLocks = rowLocks;
           // Check if we need to skip post index update for any of the row
-          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> iterator = context.intermediatePostIndexUpdates.iterator();
-          while (iterator.hasNext()) {
-              // Check if this row is going through another mutation which has a newer timestamp. If so,
-              // ignore the pending updates for this row
-              Pair<Pair<Mutation, byte[]>, byte[]> update = iterator.next();
-              ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
-              PendingRow pendingRow = pendingRows.get(rowKey);
-              // Are there concurrent updates on the data table row? if so, skip post index updates
-              // and let read repair resolve conflicts
-              if (pendingRow.isConcurrent()) {
-                  iterator.remove();
+          for (IndexMaintainer indexMaintainer : maintainers) {
+              HTableInterfaceReference hTableInterfaceReference =
+                      new HTableInterfaceReference(new ImmutableBytesPtr(indexMaintainer.getIndexTableName()));
+              Iterator<Pair<Mutation, byte[]>> iterator =
+                      context.intermediatePostIndexUpdates.get(hTableInterfaceReference).iterator();
+              while (iterator.hasNext()) {
+                  // Are there concurrent updates on the data table row? if so, skip post index updates
+                  // and let read repair resolve conflicts
+                  Pair<Mutation, byte[]> update = iterator.next();
+                  ImmutableBytesPtr rowKey = new ImmutableBytesPtr(update.getSecond());
+                  PendingRow pendingRow = pendingRows.get(rowKey);
+                  if (!pendingRow.isConcurrent()) {
+                      context.postIndexUpdates.put(hTableInterfaceReference, update.getFirst());
+                  }
               }
           }
           // We are done with handling concurrent mutations. So we can remove the rows of this batch from
           // the collection of pending rows
           removePendingRows(context);
       }
-      if (context.postIndexUpdates.isEmpty() && !context.intermediatePostIndexUpdates.isEmpty()) {
-          context.postIndexUpdates = new ArrayList<>(context.intermediatePostIndexUpdates.size());
-      }
-      for (Pair<Pair<Mutation, byte[]>, byte[]> update : context.intermediatePostIndexUpdates) {
-          context.postIndexUpdates.add(update.getFirst());
-      }
       if (failDataTableUpdatesForTesting) {
           throw new DoNotRetryIOException("Simulating the data table write failure");
       }
@@ -758,7 +761,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
   private void doIndexWritesWithExceptions(BatchMutateContext context, boolean post)
             throws IOException {
-      Collection<Pair<Mutation, byte[]>> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
+      ListMultimap<HTableInterfaceReference, Mutation> indexUpdates = post ? context.postIndexUpdates : context.preIndexUpdates;
       //short circuit, if we don't need to do any work
 
       if (context == null || indexUpdates.isEmpty()) {
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 7639a49..90d28b8 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import com.google.common.collect.ListMultimap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
@@ -34,6 +35,8 @@ import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
 import org.apache.phoenix.hbase.index.Indexer;
 import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.PhoenixIndexMetaData;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -79,7 +82,7 @@ public class IndexBuildManager implements Stoppable {
       return this.delegate.getIndexMetaData(miniBatchOp);
   }
 
-  public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
+  public void getIndexUpdates(ListMultimap<HTableInterfaceReference, Pair<Mutation, byte[]>> indexUpdates,
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations,
       IndexMetaData indexMetaData) throws Throwable {
@@ -87,20 +90,12 @@ public class IndexBuildManager implements Stoppable {
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
-    ArrayList<Pair<Pair<Mutation, byte[]>, byte[]>> results = new ArrayList<>(mutations.size());
     for (Mutation m : mutations) {
       Collection<Pair<Mutation, byte[]>> updates = delegate.getIndexUpdate(m, indexMetaData);
-      if (PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap())) {
-          for (Pair<Mutation, byte[]> update : updates) {
-            update.getFirst().setAttribute(BaseScannerRegionObserver.REPLAY_WRITES,
-                BaseScannerRegionObserver.REPLAY_INDEX_REBUILD_WRITES);
-          }
-      }
       for (Pair<Mutation, byte[]> update : updates) {
-        results.add(new Pair<>(update, m.getRow()));
+        indexUpdates.put(new HTableInterfaceReference(new ImmutableBytesPtr(update.getSecond())), new Pair<>(update.getFirst(), m.getRow()));
       }
     }
-    return results;
   }
 
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
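
Editor's note (not part of the original notification): the diff above keeps the two-phase
marking that prepareIndexMutations() applies to every index mutation, i.e. an "unverified"
Put written before the data table row and the original Put re-stamped "verified" for the
post phase. The following standalone sketch mirrors that pattern only; the column family,
qualifier, and byte constants are made-up stand-ins, not the actual Phoenix constants.

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TwoPhaseMarkingSketch {
        // Stand-ins for the empty-column constants used by IndexRegionObserver.
        static final byte[] EMPTY_CF = Bytes.toBytes("0");
        static final byte[] EMPTY_CQ = Bytes.toBytes("_0");
        static final byte[] VERIFIED = Bytes.toBytes(1);
        static final byte[] UNVERIFIED = Bytes.toBytes(2);

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            Put indexPut = new Put(Bytes.toBytes("index-row-key"));

            // Phase 1 (pre): mark the index row unverified just before the data table write,
            // using a slightly older timestamp so the verified cell below wins afterwards.
            Put unverifiedPut = new Put(indexPut.getRow());
            unverifiedPut.addColumn(EMPTY_CF, EMPTY_CQ, now - 1, UNVERIFIED);

            // Phase 2 (post): the original index put carries the verified marker at "now".
            indexPut.addColumn(EMPTY_CF, EMPTY_CQ, now, VERIFIED);

            System.out.println("pre:  " + unverifiedPut);
            System.out.println("post: " + indexPut);
        }
    }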


[phoenix] 01/02: PHOENIX-5556 Avoid repeatedly loading IndexMetaData For IndexRegionObserver

Posted by ka...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 62382d7d085ba976a44facd0a98da02172e04d5c
Author: chenglei <ch...@apache.org>
AuthorDate: Thu Nov 7 10:29:05 2019 +0800

    PHOENIX-5556 Avoid repeatedly loading IndexMetaData For IndexRegionObserver
---
 .../phoenix/hbase/index/IndexRegionObserver.java   | 39 ++++++++++++++--------
 .../hbase/index/builder/IndexBuildManager.java     |  4 +--
 2 files changed, 28 insertions(+), 15 deletions(-)

diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
index 27eb647..b058b33 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexRegionObserver.java
@@ -506,16 +506,15 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
-  private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
-                                     MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
-                                     Collection<? extends Mutation> mutations, long now) throws Throwable {
-      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
-      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
-          throw new DoNotRetryIOException(
-                  "preBatchMutateWithExceptions: indexMetaData is not an instance of PhoenixIndexMetaData " +
-                          c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
-      }
-      List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
+  private void prepareIndexMutations(
+          ObserverContext<RegionCoprocessorEnvironment> c,
+          MiniBatchOperationInProgress<Mutation> miniBatchOp,
+          BatchMutateContext context,
+          Collection<? extends Mutation> mutations,
+          long now,
+          PhoenixIndexMetaData indexMetaData) throws Throwable {
+
+      List<IndexMaintainer> maintainers = indexMetaData.getIndexMaintainers();
 
       // get the current span, or just use a null-span to avoid a bunch of if statements
       try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
@@ -526,7 +525,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
 
           // get the index updates for all elements in this batch
           Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
-                  this.builder.getIndexUpdates(miniBatchOp, mutations);
+                  this.builder.getIndexUpdates(miniBatchOp, mutations, indexMetaData);
 
           current.addTimelineAnnotation("Built index updates, doing preStep");
           TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
@@ -590,10 +589,24 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
   }
 
+  protected PhoenixIndexMetaData getPhoenixIndexMetaData(
+          ObserverContext<RegionCoprocessorEnvironment> observerContext,
+          MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+          throw new DoNotRetryIOException(
+                  "preBatchMutateWithExceptions: indexMetaData is not an instance of "+PhoenixIndexMetaData.class.getName() +
+                          ", current table is:" +
+                          observerContext.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+      }
+      return (PhoenixIndexMetaData)indexMetaData;
+  }
+
   public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
           MiniBatchOperationInProgress<Mutation> miniBatchOp) throws Throwable {
       ignoreAtomicOperations(miniBatchOp);
-      BatchMutateContext context = new BatchMutateContext(this.builder.getIndexMetaData(miniBatchOp).getClientVersion());
+      PhoenixIndexMetaData indexMetaData = getPhoenixIndexMetaData(c, miniBatchOp);
+      BatchMutateContext context = new BatchMutateContext(indexMetaData.getClientVersion());
       setBatchMutateContext(c, context);
       Mutation firstMutation = miniBatchOp.getOperation(0);
       ReplayWrite replayWrite = this.builder.getReplayWrite(firstMutation);
@@ -619,7 +632,7 @@ public class IndexRegionObserver extends BaseRegionObserver {
       }
 
       long start = EnvironmentEdgeManager.currentTimeMillis();
-      prepareIndexMutations(c, miniBatchOp, context, mutations, now);
+      prepareIndexMutations(c, miniBatchOp, context, mutations, now, indexMetaData);
       metricSource.updateIndexPrepareTime(EnvironmentEdgeManager.currentTimeMillis() - start);
 
       // Sleep for one millisecond if we have prepared the index updates in less than 1 ms. The sleep is necessary to
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 6b7e416..7639a49 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -81,9 +81,9 @@ public class IndexBuildManager implements Stoppable {
 
   public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> getIndexUpdates(
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
-      Collection<? extends Mutation> mutations) throws Throwable {
+      Collection<? extends Mutation> mutations,
+      IndexMetaData indexMetaData) throws Throwable {
     // notify the delegate that we have started processing a batch
-    final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
     // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
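
Editor's note (not part of the original notification): the essence of PHOENIX-5556 is a
resolve-once refactoring, i.e. the metadata is loaded and type-checked a single time in
preBatchMutateWithExceptions() (via the new getPhoenixIndexMetaData() helper) and then
passed down to prepareIndexMutations() and getIndexUpdates(). The generic sketch below
illustrates that pattern only; the nested types and method names are dummies, not the
Phoenix classes.

    import java.io.IOException;

    public class ResolveOnceSketch {
        interface IndexMetaData {}                                      // stand-in for the generic interface
        static class PhoenixIndexMetaData implements IndexMetaData {}   // stand-in for the concrete type

        // Counterpart of getPhoenixIndexMetaData(): validate the concrete type once, fail fast otherwise.
        static PhoenixIndexMetaData resolveOnce(IndexMetaData loaded) throws IOException {
            if (!(loaded instanceof PhoenixIndexMetaData)) {
                throw new IOException("indexMetaData is not an instance of PhoenixIndexMetaData");
            }
            return (PhoenixIndexMetaData) loaded;
        }

        static void prepare(PhoenixIndexMetaData metaData) { /* uses the already-typed object */ }
        static void buildUpdates(PhoenixIndexMetaData metaData) { /* no second lookup needed */ }

        public static void main(String[] args) throws IOException {
            PhoenixIndexMetaData metaData = resolveOnce(new PhoenixIndexMetaData());
            prepare(metaData);       // before the change, each step re-loaded the metadata itself
            buildUpdates(metaData);
        }
    }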