You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this copy.
Posted to commits@phoenix.apache.org by el...@apache.org on 2017/07/21 21:51:58 UTC

[3/4] phoenix git commit: PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path

PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce71efc9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce71efc9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce71efc9

Branch: refs/heads/4.x-HBase-1.1
Commit: ce71efc9fe19f03524b24e69ecaa19b0f03846de
Parents: c2a7389
Author: Josh Elser <el...@apache.org>
Authored: Fri Jul 7 16:40:27 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jul 21 17:35:11 2017 -0400

----------------------------------------------------------------------
 .../org/apache/phoenix/hbase/index/Indexer.java |  4 +-
 .../hbase/index/builder/IndexBuildManager.java  | 78 ++------------------
 .../hbase/index/covered/LocalTableState.java    | 24 +++++-
 .../example/CoveredColumnIndexCodec.java        | 21 ++++--
 4 files changed, 47 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 5a78c94..38401d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -610,7 +610,9 @@ public class Indexer extends BaseRegionObserver {
    * @return the mutations to apply to the index tables
    */
   private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
-    Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+    // Avoid multiple internal array resizings. Initial size of 64, unless we have fewer cells in the edit
+    int initialSize = Math.min(edit.size(), 64);
+    Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(initialSize);
     for (Cell kv : edit.getCells()) {
       if (kv instanceof IndexedKeyValue) {
         IndexedKeyValue ikv = (IndexedKeyValue) kv;

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 325904d..c015a77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -45,35 +45,14 @@ import com.google.common.util.concurrent.MoreExecutors;
 
 /**
  * Manage the building of index updates from primary table updates.
- * <p>
- * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
- * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
  */
 public class IndexBuildManager implements Stoppable {
 
   private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
   private final IndexBuilder delegate;
-  private QuickFailingTaskRunner pool;
   private boolean stopped;
 
   /**
-   * Set the number of threads with which we can concurrently build index updates. Unused threads
-   * will be released, but setting the number of threads too high could cause frequent swapping and
-   * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
-   * of time building index updates, it could be worthwhile to spend the time to tune this parameter
-   * as it could lead to dramatic increases in speed.
-   */
-  public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
-  /** Default to a single thread. This is the safest course of action, but the slowest as well */
-  private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
-  /**
-   * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the
-   * threads and will re-create them as needed, up to the configured max
-   */
-  private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
-      "index.builder.threads.keepalivetime";
-
-  /**
    * @param env environment in which <tt>this</tt> is running. Used to setup the
    *          {@link IndexBuilder} and executor
    * @throws IOException if an {@link IndexBuilder} cannot be correctly steup
@@ -81,7 +60,7 @@ public class IndexBuildManager implements Stoppable {
   public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
     // Prevent deadlock by using single thread for all reads so that we know
     // we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
-    this(getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor()));
+    this.delegate = getIndexBuilder(env);
   }
   
   private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -101,20 +80,6 @@ public class IndexBuildManager implements Stoppable {
     }
   }
 
-  private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
-    String serverName = env.getRegionServerServices().getServerName().getServerName();
-    return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
-        setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
-        setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
-          DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
-  }
-
-  public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
-    this.delegate = builder;
-    this.pool = pool;
-  }
-
-
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
       MiniBatchOperationInProgress<Mutation> miniBatchOp,
       Collection<? extends Mutation> mutations) throws Throwable {
@@ -122,41 +87,11 @@ public class IndexBuildManager implements Stoppable {
     final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
     this.delegate.batchStarted(miniBatchOp, indexMetaData);
 
-    // parallelize each mutation into its own task
-    // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
-    // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
-    // acknowledged on each thread before doing the actual lookup, but after that depends on the
-    // underlying builder to look for the closed flag.
-    TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
-        new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
-    for (final Mutation m : mutations) {
-      tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
-
-        @Override
-        public Collection<Pair<Mutation, byte[]>> call() throws IOException {
-          return delegate.getIndexUpdate(m, indexMetaData);
-        }
-
-      });
-    }
-    List<Collection<Pair<Mutation, byte[]>>> allResults = null;
-    try {
-      allResults = pool.submitUninterruptible(tasks);
-    } catch (CancellationException e) {
-      throw e;
-    } catch (ExecutionException e) {
-      LOG.error("Found a failed index update!");
-      throw e.getCause();
+    // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+    ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+    for (Mutation m : mutations) {
+      results.addAll(delegate.getIndexUpdate(m, indexMetaData));
     }
-
-    // we can only get here if we get successes from each of the tasks, so each of these must have a
-    // correct result
-    Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
-    for (Collection<Pair<Mutation, byte[]>> result : allResults) {
-      assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
-      results.addAll(result);
-    }
-
     return results;
   }
 
@@ -194,7 +129,6 @@ public class IndexBuildManager implements Stoppable {
     }
     this.stopped = true;
     this.delegate.stop(why);
-    this.pool.stop(why);
   }
 
   @Override
@@ -206,4 +140,4 @@ public class IndexBuildManager implements Stoppable {
     return this.delegate;
   }
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 245bd66..acbf1ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -18,7 +18,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +86,24 @@ public class LocalTableState implements TableState {
         }
     }
 
+    private void addUpdateCells(List<Cell> list, boolean overwrite) {
+        if (list == null) return;
+        // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
+        for (Cell c : list) {
+            this.memstore.add(maybeCopyCell(c), overwrite);
+        }
+    }
+
+    private KeyValue maybeCopyCell(Cell c) {
+        // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something
+        // that will likely be removed at some point in time.
+        if (c == null) return null;
+        if (c instanceof KeyValue) {
+            return (KeyValue) c;
+        }
+        return KeyValueUtil.copyToNewKeyValue(c);
+    }
+
     @Override
     public RegionCoprocessorEnvironment getEnvironment() {
         return this.env;
@@ -176,8 +196,8 @@ public class LocalTableState implements TableState {
         // no need to perform scan to find prior row values when the indexed columns are immutable, as
         // by definition, there won't be any.
         if (!indexMetaData.isImmutableRows()) {
-            // add the current state of the row
-            this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
+            // add the current state of the row. Uses listCells() to avoid a new array creation.
+            this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
         }
 
         // add the covered columns to the set

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 5963f2e..1392906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -61,7 +61,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
 
     @Override
     public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
-        List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+        List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
         for (ColumnGroup group : groups) {
             IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
             updates.add(update);
@@ -115,7 +115,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
 
     @Override
     public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
-        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+        List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
         for (ColumnGroup group : groups) {
             deletes.add(getDeleteForGroup(group, state, context));
         }
@@ -238,9 +238,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
      *            to use when building the key
      */
     static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+        final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
         // now build up expected row key, each of the values, in order, followed by the PK and then some
         // info about lengths so we can deserialize each value
-        byte[] output = new byte[length + pk.length];
+        //
+        // output = length of values + primary key + column entries + length of each column entry + number of column entries
+        byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT];
         int pos = 0;
         int[] lengths = new int[values.size()];
         int i = 0;
@@ -256,14 +259,22 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
 
         // add the primary key to the end of the row key
         System.arraycopy(pk, 0, output, pos, pk.length);
+        pos += pk.length;
 
         // add the lengths as suffixes so we can deserialize the elements again
         for (int l : lengths) {
-            output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+            byte[] serializedLength = Bytes.toBytes(l);
+            System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT);
+            pos += Bytes.SIZEOF_INT;
         }
 
         // and the last integer is the number of values
-        return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
+        byte[] serializedNumValues = Bytes.toBytes(values.size());
+        System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT);
+        // Just in case we serialize more in the rowkey in the future..
+        pos += Bytes.SIZEOF_INT;
+
+        return output;
     }
 
     /**