Posted to issues@phoenix.apache.org by GitBox <gi...@apache.org> on 2019/05/23 21:51:42 UTC

[GitHub] [phoenix] vincentpoon commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables

vincentpoon commented on a change in pull request #469: PHOENIX-5156 Consistent Global Indexes for Non-Transactional Tables
URL: https://github.com/apache/phoenix/pull/469#discussion_r287105359
 
 

 ##########
 File path: phoenix-core/src/main/java/org/apache/phoenix/hbase/index/GlobalIndexer.java
 ##########
 @@ -0,0 +1,949 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import static org.apache.phoenix.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+import static org.apache.phoenix.index.IndexMaintainer.getIndexMaintainer;
+import static org.apache.phoenix.schema.types.PDataType.FALSE_BYTES;
+import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.OperationStatus;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.htrace.Span;
+import org.apache.htrace.Trace;
+import org.apache.htrace.TraceScope;
+import org.apache.phoenix.coprocessor.BaseScannerRegionObserver.ReplayWrite;
+import org.apache.phoenix.coprocessor.DelegateRegionCoprocessorEnvironment;
+import org.apache.phoenix.hbase.index.LockManager.RowLock;
+import org.apache.phoenix.hbase.index.builder.FatalIndexBuildingFailureException;
+import org.apache.phoenix.hbase.index.builder.IndexBuildManager;
+import org.apache.phoenix.hbase.index.builder.IndexBuilder;
+import org.apache.phoenix.hbase.index.covered.IndexMetaData;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSource;
+import org.apache.phoenix.hbase.index.metrics.MetricsIndexerSourceFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.util.VersionUtil;
+import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
+import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
+import org.apache.phoenix.hbase.index.write.IndexWriter;
+import org.apache.phoenix.hbase.index.write.LazyParallelWriterIndexCommitter;
+import org.apache.phoenix.hbase.index.write.RecoveryIndexWriter;
+import org.apache.phoenix.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.phoenix.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.phoenix.index.IndexMaintainer;
+import org.apache.phoenix.index.PhoenixIndexMetaData;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.trace.TracingUtils;
+import org.apache.phoenix.trace.util.NullSpan;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.ScanUtil;
+import org.apache.phoenix.util.ServerUtil;
+import org.apache.phoenix.util.ServerUtil.ConnectionType;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Multimap;
+
+/**
+ * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
+ * to an {@link IndexBuilder} to determine the actual updates to make.
+ * We don't need to implement {@link #postPut(ObserverContext, Put, WALEdit, Durability)} and
+ * {@link #postDelete(ObserverContext, Delete, WALEdit, Durability)} hooks because
+ * Phoenix always does batch mutations.
+ * <p>
+ */
+public class GlobalIndexer implements RegionObserver, RegionCoprocessor {
+
+  private static final Log LOG = LogFactory.getLog(GlobalIndexer.class);
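+  // Both statuses report SUCCESS: IGNORE marks atomic (ON DUPLICATE KEY) operations that the
+  // batch hooks skip, while NOWRITE marks mutations whose data table write is suppressed
+  // (e.g., when replaying writes for an index rebuild).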
+  private static final OperationStatus IGNORE = new OperationStatus(OperationStatusCode.SUCCESS);
+  private static final OperationStatus NOWRITE = new OperationStatus(OperationStatusCode.SUCCESS);
+
+  /**
+   * Class to represent pending data table rows
+   */
+  private static class PendingRow {
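+      // Latest mutation timestamp seen for this row and the number of in-flight batches currently mutating it.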
+      private long latestTimestamp;
+      private long count;
+      PendingRow(long latestTimestamp) {
+          count = 1;
+          this.latestTimestamp = latestTimestamp;
+      }
+
+      public void add(long timestamp) {
+          count++;
+          if (latestTimestamp < timestamp) {
+              latestTimestamp = timestamp;
+          }
+      }
+
+      public void remove() {
+          count--;
+      }
+
+      public long getCount() {
+          return count;
+      }
+
+      public long getLatestTimestamp() {
+          return latestTimestamp;
+      }
+  }
+
+  // Index writers get invoked before and after data table updates
+  protected IndexWriter preWriter;
+  protected IndexWriter postWriter;
+
+  protected IndexBuildManager builder;
+  private LockManager lockManager;
+  // The collection of pending data table rows
+  private Map<ImmutableBytesPtr, PendingRow> pendingRows = new HashMap<>();
+
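+  // Test-only hooks that let tests skip the post-index updates or the data table updates
+  // (e.g., to simulate partial failures between the write phases).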
+  private static boolean skipPostIndexUpdatesForTesting = false;
+  private static boolean skipDataTableUpdatesForTesting = false;
+
+  public static void setSkipPostIndexUpdatesForTesting(boolean skip) {
+      skipPostIndexUpdatesForTesting = skip;
+  }
+
+  public static void setSkipDataTableUpdatesForTesting(boolean skip) {
+      skipDataTableUpdatesForTesting = skip;
+  }
+
+  // Hack to get around not being able to save any state between
+  // coprocessor calls. TODO: remove after HBASE-18127 when available
+  private static class BatchMutateContext {
+      public final int clientVersion;
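+      // indexUpdates are written before the data table mutations and postIndexUpdates after them;
+      // intermediatePostIndexUpdates additionally carries the data table row key for each index update.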
+      public Collection<Pair<Mutation, byte[]>> indexUpdates = Collections.emptyList();
+      public Collection<Pair<Mutation, byte[]>> postIndexUpdates = Collections.emptyList();
+      public Collection<Pair<Pair<Mutation, byte[]>, byte[]>> intermediatePostIndexUpdates;
+      public List<RowLock> rowLocks = Lists.newArrayListWithExpectedSize(QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE);
+      // The row keys of the data table rows in this batch that are also being mutated by another
+      // in-flight batch with an earlier timestamp (i.e., concurrent updates).
+      HashSet<ImmutableBytesPtr> pendingRows = new HashSet<>();
+
+      public BatchMutateContext(int clientVersion) {
+          this.clientVersion = clientVersion;
+      }
+  }
+  
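+  // Per-handler-thread context used to pass state between the batch mutate coprocessor hooks
+  // (see the HBASE-18127 note on BatchMutateContext above).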
+  private ThreadLocal<BatchMutateContext> batchMutateContext =
+          new ThreadLocal<BatchMutateContext>();
+  
+  /** Configuration key for the {@link IndexBuilder} to use */
+  public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
+
+  /**
+   * Configuration key for whether the indexer should check the version of HBase it is running on.
+   * Generally, you only want to disable this for testing or for custom versions of HBase.
+   */
+  public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
+
+  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hadoop.hbase.index.recovery.failurepolicy";
+
+  public static final String INDEX_LAZY_POST_BATCH_WRITE = "org.apache.hadoop.hbase.index.lazy.post_batch.write";
+  private static final boolean INDEX_LAZY_POST_BATCH_WRITE_DEFAULT = false;
+
+  private static final String INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.post.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.batch.mutate.threshold";
+  private static final long INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.wal.restore.threshold";
+  private static final long INDEXER_PRE_WAL_RESTORE_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.open.threshold";
+  private static final long INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT = 3_000;
+  private static final String INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY = "phoenix.indexer.slow.pre.increment";
+  private static final long INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT = 3_000;
+
+  /**
+   * cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
+   * more robust in the face of recovering index regions that were on the same server as the
+   * primary table region
+   */
+  private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
+
+  /**
+   * IndexWriter for writing the recovered index edits. Separate from the main indexer since we need
+   * different write/failure policies
+   */
+  private IndexWriter recoveryWriter;
+
+  private MetricsIndexerSource metricSource;
+
+  private boolean stopped;
+  private boolean disabled;
+  private long slowIndexWriteThreshold;
+  private long slowIndexPrepareThreshold;
+  private long slowPostOpenThreshold;
+  private long slowPreIncrementThreshold;
+  private int rowLockWaitDuration;
+
+  public static final int INDEXING_SUPPORTED_MAJOR_VERSION = VersionUtil
+            .encodeMaxPatchVersion(0, 94);
+  public static final int INDEXING_SUPPORTED__MIN_MAJOR_VERSION = VersionUtil
+            .encodeVersion("0.94.0");
+  private static final int INDEX_WAL_COMPRESSION_MINIMUM_SUPPORTED_VERSION = VersionUtil
+            .encodeVersion("0.94.9");
+
+  private static final int DEFAULT_ROWLOCK_WAIT_DURATION = 30000;
+
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+      try {
+        final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+        String serverName = env.getServerName().getServerName();
+        if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
+          // make sure the right version <-> combinations are allowed.
+          String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
+          if (errormsg != null) {
+              throw new FatalIndexBuildingFailureException(errormsg);
+          }
+        }
+
+        this.builder = new IndexBuildManager(env);
+        // Clone the config since it is shared
+        DelegateRegionCoprocessorEnvironment indexWriterEnv = new DelegateRegionCoprocessorEnvironment(env, ConnectionType.INDEX_WRITER_CONNECTION);
+        // setup the actual index preWriter
+        this.preWriter = new IndexWriter(indexWriterEnv, serverName + "-index-preWriter");
+        if (env.getConfiguration().getBoolean(INDEX_LAZY_POST_BATCH_WRITE, INDEX_LAZY_POST_BATCH_WRITE_DEFAULT)) {
+            this.postWriter = new IndexWriter(indexWriterEnv, new LazyParallelWriterIndexCommitter(), serverName + "-index-postWriter");
+        }
+        else {
+            this.postWriter = this.preWriter;
+        }
+        
+        this.rowLockWaitDuration = env.getConfiguration().getInt("hbase.rowlock.wait.duration",
+                DEFAULT_ROWLOCK_WAIT_DURATION);
+        this.lockManager = new LockManager();
+
+        // Metrics impl for the Indexer -- avoiding unnecessary indirection for hadoop-1/2 compat
+        this.metricSource = MetricsIndexerSourceFactory.getInstance().create();
+        setSlowThresholds(e.getConfiguration());
+
+        try {
+          // get the specified failure policy. We only ever override it in tests, but we need to do it
+          // here
+          Class<? extends IndexFailurePolicy> policyClass =
+              env.getConfiguration().getClass(INDEX_RECOVERY_FAILURE_POLICY_KEY,
+                StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
+          IndexFailurePolicy policy =
+              policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits);
+          LOG.debug("Setting up recovery writter with failure policy: " + policy.getClass());
+          recoveryWriter =
+              new RecoveryIndexWriter(policy, indexWriterEnv, serverName + "-recovery-writer");
+        } catch (Exception ex) {
+          throw new IOException("Could not instantiate recovery failure policy!", ex);
+        }
+      } catch (NoSuchMethodError ex) {
+          disabled = true;
+          LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
+      }
+  }
+
+  /**
+   * Extracts the slow call threshold values from the configuration.
+   */
+  private void setSlowThresholds(Configuration c) {
+      slowIndexWriteThreshold = c.getLong(INDEXER_INDEX_WRITE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_WRITE_SLOW_THRESHOLD_DEFAULT);
+      slowIndexPrepareThreshold = c.getLong(INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_KEY,
+          INDEXER_INDEX_PREPARE_SLOW_THRESHOLD_DEFAULT);
+      slowPostOpenThreshold = c.getLong(INDEXER_POST_OPEN_SLOW_THRESHOLD_KEY,
+          INDEXER_POST_OPEN_SLOW_THRESHOLD_DEFAULT);
+      slowPreIncrementThreshold = c.getLong(INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_KEY,
+          INDEXER_PRE_INCREMENT_SLOW_THRESHOLD_DEFAULT);
+  }
+
+  private String getCallTooSlowMessage(String callName, long duration, long threshold) {
+      StringBuilder sb = new StringBuilder(64);
+      sb.append("(callTooSlow) ").append(callName).append(" duration=").append(duration);
+      sb.append("ms, threshold=").append(threshold).append("ms");
+      return sb.toString();
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    if (this.stopped) {
+      return;
+    }
+    if (this.disabled) {
+      return;
+    }
+    this.stopped = true;
+    String msg = "Indexer is being stopped";
+    this.builder.stop(msg);
+    this.preWriter.stop(msg);
+    this.recoveryWriter.stop(msg);
+    this.postWriter.stop(msg);
+  }
+
+  /**
+   * We use an Increment to serialize the ON DUPLICATE KEY clause so that the HBase plumbing
+   * sets up the necessary locks and mvcc to allow an atomic update. The Increment is not a
+ * real increment, though; it's really more of a Put. We translate the Increment into a
+   * list of mutations, at most a single Put and Delete that are the changes upon executing
+   * the list of ON DUPLICATE KEY clauses for this row.
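+ * For example, an atomic upsert such as
+ * UPSERT INTO T VALUES ('a', 1) ON DUPLICATE KEY UPDATE V = V + 1
+ * reaches this hook as such an Increment, carrying the serialized ON DUPLICATE KEY
+ * expressions as a mutation attribute.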
+   */
+  @Override
+  public Result preIncrementAfterRowLock(final ObserverContext<RegionCoprocessorEnvironment> e,
+          final Increment inc) throws IOException {
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          List<Mutation> mutations = this.builder.executeAtomicOp(inc);
+          if (mutations == null) {
+              return null;
+          }
+
+          // Causes the Increment to be ignored as we're committing the mutations
+          // ourselves below.
+          e.bypass();
+          // ON DUPLICATE KEY IGNORE will return empty list if row already exists
+          // as no action is required in that case.
+          if (!mutations.isEmpty()) {
+              Region region = e.getEnvironment().getRegion();
+              // Otherwise, submit the mutations directly here
+              region.batchMutate(mutations.toArray(new Mutation[0]));
+          }
+          return Result.EMPTY_RESULT;
+      } catch (Throwable t) {
+          throw ServerUtil.createIOException(
+                  "Unable to process ON DUPLICATE IGNORE for " + 
+                  e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString() + 
+                  "(" + Bytes.toStringBinary(inc.getRow()) + ")", t);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowPreIncrementThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preIncrementAfterRowLock", duration, slowPreIncrementThreshold));
+              }
+              metricSource.incrementSlowDuplicateKeyCheckCalls();
+          }
+          metricSource.updateDuplicateKeyCheckTime(duration);
+      }
+  }
+
+  @Override
+  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+      if (this.disabled) {
+          return;
+      }
+      long start = EnvironmentEdgeManager.currentTimeMillis();
+      try {
+          preBatchMutateWithExceptions(c, miniBatchOp);
+          return;
+      } catch (Throwable t) {
+          rethrowIndexingException(t);
+      } finally {
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("preBatchMutate", duration, slowIndexPrepareThreshold));
+              }
+              metricSource.incrementNumSlowIndexPrepareCalls();
+          }
+          metricSource.updateIndexPrepareTime(duration);
+      }
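+      // rethrowIndexingException always rethrows, so in practice the line below is unreachable;
+      // it exists to satisfy the compiler and to flag the impossible case loudly.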
+      throw new RuntimeException(
+        "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+  }
+
+  private static void setTimeStamp(KeyValue kv, byte[] tsBytes) {
+      int tsOffset = kv.getTimestampOffset();
+      System.arraycopy(tsBytes, 0, kv.getBuffer(), tsOffset, Bytes.SIZEOF_LONG);
+  }
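+  // Returns the greatest cell timestamp across all column families of the given mutation.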
+  private long getMaxTimestamp(Mutation m) {
+      long maxTs = 0;
+      for (List<Cell> cells : m.getFamilyCellMap().values()) {
+          for (Cell cell : cells) {
+              long ts = cell.getTimestamp();
+              if (ts > maxTs) {
+                  maxTs = ts;
+              }
+          }
+      }
+      return maxTs;
+  }
+
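+  // Mark atomic (ON DUPLICATE KEY) operations so that the index preparation loops below skip them.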
+  private void ignoreAtomicOperations(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isAtomicOp(m)) {
+              miniBatchOp.setOperationStatus(i, IGNORE);
+          }
+      }
+  }
+
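+  // Acquire a row lock for every indexed mutation in the batch so that concurrent batches
+  // touching the same rows are serialized.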
+  private void lockRows(MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context) throws IOException {
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+              continue;
+          }
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isEnabled(m)) {
+              context.rowLocks.add(lockManager.lockRow(m.getRow(), rowLockWaitDuration));
+          }
+      }
+  }
+
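+  // pendingRows is shared across handler threads, so it is updated under synchronization; a row
+  // that already has an entry is concurrently being mutated by another in-flight batch.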
+  private void populatePendingRows(BatchMutateContext context, long now) {
+      synchronized (this) {
+          for (RowLock rowLock : context.rowLocks) {
+              ImmutableBytesPtr rowKey = rowLock.getRowKey();
+              PendingRow pendingRow = pendingRows.get(rowKey);
+              if (pendingRow == null) {
+                  pendingRows.put(rowKey, new PendingRow(now));
+              } else {
+                  // This row already has a pending mutation in progress from another batch
+                  pendingRow.add(now);
+                  context.pendingRows.add(rowKey);
+              }
+          }
+      }
+  }
+
+  private Collection<? extends Mutation> groupMutations(MiniBatchOperationInProgress<Mutation> miniBatchOp,
+                                                        long now, ReplayWrite replayWrite) throws IOException {
+      Map<ImmutableBytesPtr, MultiMutation> mutationsMap = new HashMap<>();
+      boolean copyMutations = false;
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          if (miniBatchOp.getOperationStatus(i) == IGNORE) {
+              continue;
+          }
+          Mutation m = miniBatchOp.getOperation(i);
+          if (this.builder.isEnabled(m)) {
+              // Track whether or not we need to copy the mutations (i.e., the same row appears more than once in this batch)
+              ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+              if (mutationsMap.containsKey(row)) {
+                  copyMutations = true;
+              } else {
+                  mutationsMap.put(row, null);
+              }
+          }
+      }
+      // early exit if it turns out we don't have any edits
+      if (mutationsMap.isEmpty()) {
+          return null;
+      }
+      // If we're copying the mutations, the grouped MultiMutations become the working set; otherwise we collect the originals directly
+      Collection<Mutation> originalMutations;
+      Collection<? extends Mutation> mutations;
+      if (copyMutations) {
+          originalMutations = null;
+          mutations = mutationsMap.values();
+      } else {
+          originalMutations = Lists.newArrayListWithExpectedSize(mutationsMap.size());
+          mutations = originalMutations;
+      }
+
+      boolean resetTimeStamp = replayWrite == null;
+
+      for (int i = 0; i < miniBatchOp.size(); i++) {
+          Mutation m = miniBatchOp.getOperation(i);
+          // skip this mutation if we aren't enabling indexing
+          // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+          // should be indexed, which means we need to expose another method on the builder. Such is the
+          // way optimizations go, though.
+          if (miniBatchOp.getOperationStatus(i) != IGNORE && this.builder.isEnabled(m)) {
+              if (resetTimeStamp) {
+                  // Unless we're replaying edits to rebuild the index, we update the time stamp
+                  // of the data table to prevent overlapping time stamps (which prevents index
+                  // inconsistencies as this case isn't handled correctly currently).
+                  for (List<Cell> cells : m.getFamilyCellMap().values()) {
+                      for (Cell cell : cells) {
+                          CellUtil.setTimestamp(cell, now);
+                      }
+                  }
+              }
+              // No need to write the table mutations when we're rebuilding
+              // the index as they're already written and just being replayed.
+              if (replayWrite == ReplayWrite.INDEX_ONLY
+                      || replayWrite == ReplayWrite.REBUILD_INDEX_ONLY || skipDataTableUpdatesForTesting) {
+                  miniBatchOp.setOperationStatus(i, NOWRITE);
+              }
+
+              // Only copy mutations if we found duplicate rows
+              // which only occurs when we're partially rebuilding
+              // the index (since we'll potentially have both a
+              // Put and a Delete mutation for the same row).
+              if (copyMutations) {
+                  // Add the mutation to the batch set
+
+                  ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+                  MultiMutation stored = mutationsMap.get(row);
+                  // we haven't seen this row before, so add it
+                  if (stored == null) {
+                      stored = new MultiMutation(row);
+                      mutationsMap.put(row, stored);
+                  }
+                  stored.addAll(m);
+              } else {
+                  originalMutations.add(m);
+              }
+          }
+      }
+
+      if (copyMutations || replayWrite != null) {
+          mutations = IndexManagementUtil.flattenMutationsByTimestamp(mutations);
+      }
+      return mutations;
+  }
+
+  private void prepareIndexMutations(ObserverContext<RegionCoprocessorEnvironment> c,
+                                     MiniBatchOperationInProgress<Mutation> miniBatchOp, BatchMutateContext context,
+                                     Collection<? extends Mutation> mutations) throws Throwable {
+      IndexMetaData indexMetaData = this.builder.getIndexMetaData(miniBatchOp);
+      if (!(indexMetaData instanceof PhoenixIndexMetaData)) {
+          throw new DoNotRetryIOException(
+                  "preBatchMutateWithExceptions: indexMetaData is not an instance of PhoenixIndexMetaData " +
+                          c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+      }
+      List<IndexMaintainer> maintainers = ((PhoenixIndexMetaData)indexMetaData).getIndexMaintainers();
+
+      List<Pair<Mutation, byte[]>> indexUpdatesForDeletes;
+      // get the current span, or just use a null-span to avoid a bunch of if statements
+      try (TraceScope scope = Trace.startSpan("Starting to build index updates")) {
+          Span current = scope.getSpan();
+          if (current == null) {
+              current = NullSpan.INSTANCE;
+          }
+          long start = EnvironmentEdgeManager.currentTimeMillis();
+
+          // get the index updates for all elements in this batch
+          Collection<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdates =
+                  this.builder.getGlobalIndexUpdate(miniBatchOp, mutations);
+
+          long duration = EnvironmentEdgeManager.currentTimeMillis() - start;
+          if (duration >= slowIndexPrepareThreshold) {
+              if (LOG.isDebugEnabled()) {
+                  LOG.debug(getCallTooSlowMessage("indexPrepare", duration, slowIndexPrepareThreshold));
+              }
+              metricSource.incrementNumSlowIndexPrepareCalls();
+          }
+          metricSource.updateIndexPrepareTime(duration);
+          current.addTimelineAnnotation("Built index updates, doing preStep");
+          TracingUtils.addAnnotation(current, "index update count", indexUpdates.size());
+          byte[] tableName = c.getEnvironment().getRegion().getTableDescriptor().getTableName().getName();
+          Iterator<Pair<Pair<Mutation, byte[]>, byte[]>> indexUpdatesItr = indexUpdates.iterator();
+          List<Mutation> localUpdates = new ArrayList<Mutation>(indexUpdates.size());
+          indexUpdatesForDeletes = new ArrayList<>(indexUpdates.size());
+          context.intermediatePostIndexUpdates = new ArrayList<>(indexUpdates.size());
+          while(indexUpdatesItr.hasNext()) {
+              Pair<Pair<Mutation, byte[]>, byte[]> next = indexUpdatesItr.next();
+              if (Bytes.compareTo(next.getFirst().getSecond(), tableName) == 0) {
+                  localUpdates.add(next.getFirst().getFirst());
+                  indexUpdatesItr.remove();
+              }
+              else {
+                  // get index maintainer for this index table
+                  IndexMaintainer indexMaintainer = getIndexMaintainer(maintainers, next.getFirst().getSecond());
+                  if (indexMaintainer == null) {
+                      throw new DoNotRetryIOException(
+                              "preBatchMutateWithExceptions: indexMaintainer is null " +
+                                      c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString());
+                  }
+                  byte[] emptyCF = indexMaintainer.getEmptyKeyValueFamily().copyBytesIfNecessary();
+                  byte[] emptyCQ = indexMaintainer.getEmptyKeyValueQualifier();
+                  // add the VERIFIED cell, which is the empty cell
+                  Mutation m = next.getFirst().getFirst();
+                  boolean rebuild = PhoenixIndexMetaData.isIndexRebuild(m.getAttributesMap());
+                  long ts = getMaxTimestamp(m);
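+                  // Rebuild writes are marked verified immediately; normal writes are first written
+                  // unverified (FALSE_BYTES) and a follow-up update is queued to flip the empty column
+                  // to verified (TRUE_BYTES) after the data table write.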
+                  if (rebuild) {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                      }
+                  } else {
+                      if (m instanceof Put) {
+                          ((Put)m).addColumn(emptyCF, emptyCQ, ts, FALSE_BYTES);
+                          // Ignore post index updates (i.e., the third write phase updates) for this row if it is
+                          // going through concurrent updates
+                          ImmutableBytesPtr rowKey = new ImmutableBytesPtr(next.getSecond());
+                          if (!context.pendingRows.contains(rowKey)) {
+                              Put put = new Put(m.getRow());
+                              put.addColumn(emptyCF, emptyCQ, ts, TRUE_BYTES);
+                              context.intermediatePostIndexUpdates.add(new Pair<>(new Pair<>(put, next.getFirst().getSecond()), next.getSecond()));
+                          }
+                      } else {
+                          // For a delete mutation, first unverify the exiting row in the index table and then delete
 
 Review comment:
   nit: typo "existing"

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services