You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by el...@apache.org on 2017/07/21 21:51:56 UTC
[1/4] phoenix git commit: PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Repository: phoenix
Updated Branches:
refs/heads/4.x-HBase-0.98 e3c63614a -> 771f766eb
refs/heads/4.x-HBase-1.1 c2a738961 -> ce71efc9f
refs/heads/4.x-HBase-1.2 6dea01173 -> 6b6bb7751
refs/heads/master ca1105630 -> 87976eb6f
PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/87976eb6
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/87976eb6
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/87976eb6
Branch: refs/heads/master
Commit: 87976eb6f9892ab8c20f38db5e1cef3934ec2b89
Parents: ca11056
Author: Josh Elser <el...@apache.org>
Authored: Fri Jul 7 16:40:27 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jul 21 17:22:15 2017 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 4 +-
.../hbase/index/builder/IndexBuildManager.java | 78 ++------------------
.../hbase/index/covered/LocalTableState.java | 24 +++++-
.../example/CoveredColumnIndexCodec.java | 21 ++++--
4 files changed, 47 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/87976eb6/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 2d923e9..ea5bf4f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -605,7 +605,9 @@ public class Indexer extends BaseRegionObserver {
* @return the mutations to apply to the index tables
*/
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
- Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+ // Avoid multiple internal array resizings. Initial size of 64, unless we have fewer cells in the edit
+ int initialSize = Math.min(edit.size(), 64);
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(initialSize);
for (Cell kv : edit.getCells()) {
if (kv instanceof IndexedKeyValue) {
IndexedKeyValue ikv = (IndexedKeyValue) kv;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/87976eb6/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 325904d..c015a77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -45,35 +45,14 @@ import com.google.common.util.concurrent.MoreExecutors;
/**
* Manage the building of index updates from primary table updates.
- * <p>
- * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
- * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
*/
public class IndexBuildManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
private final IndexBuilder delegate;
- private QuickFailingTaskRunner pool;
private boolean stopped;
/**
- * Set the number of threads with which we can concurrently build index updates. Unused threads
- * will be released, but setting the number of threads too high could cause frequent swapping and
- * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
- * of time building index updates, it could be worthwhile to spend the time to tune this parameter
- * as it could lead to dramatic increases in speed.
- */
- public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
- /** Default to a single thread. This is the safest course of action, but the slowest as well */
- private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
- /**
- * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the
- * threads and will re-create them as needed, up to the configured max
- */
- private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.builder.threads.keepalivetime";
-
- /**
* @param env environment in which <tt>this</tt> is running. Used to setup the
* {@link IndexBuilder} and executor
* @throws IOException if an {@link IndexBuilder} cannot be correctly steup
@@ -81,7 +60,7 @@ public class IndexBuildManager implements Stoppable {
public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
// Prevent deadlock by using single thread for all reads so that we know
// we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
- this(getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor()));
+ this.delegate = getIndexBuilder(env);
}
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -101,20 +80,6 @@ public class IndexBuildManager implements Stoppable {
}
}
- private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
- String serverName = env.getRegionServerServices().getServerName().getServerName();
- return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
- setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
- setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
- }
-
- public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
- this.delegate = builder;
- this.pool = pool;
- }
-
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
@@ -122,41 +87,11 @@ public class IndexBuildManager implements Stoppable {
final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
this.delegate.batchStarted(miniBatchOp, indexMetaData);
- // parallelize each mutation into its own task
- // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
- // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
- // acknowledged on each thread before doing the actual lookup, but after that depends on the
- // underlying builder to look for the closed flag.
- TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
- new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
- for (final Mutation m : mutations) {
- tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
-
- @Override
- public Collection<Pair<Mutation, byte[]>> call() throws IOException {
- return delegate.getIndexUpdate(m, indexMetaData);
- }
-
- });
- }
- List<Collection<Pair<Mutation, byte[]>>> allResults = null;
- try {
- allResults = pool.submitUninterruptible(tasks);
- } catch (CancellationException e) {
- throw e;
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- throw e.getCause();
+ // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+ ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+ for (Mutation m : mutations) {
+ results.addAll(delegate.getIndexUpdate(m, indexMetaData));
}
-
- // we can only get here if we get successes from each of the tasks, so each of these must have a
- // correct result
- Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
- for (Collection<Pair<Mutation, byte[]>> result : allResults) {
- assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
- results.addAll(result);
- }
-
return results;
}
@@ -194,7 +129,6 @@ public class IndexBuildManager implements Stoppable {
}
this.stopped = true;
this.delegate.stop(why);
- this.pool.stop(why);
}
@Override
@@ -206,4 +140,4 @@ public class IndexBuildManager implements Stoppable {
return this.delegate;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/87976eb6/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 245bd66..acbf1ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -18,7 +18,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +86,24 @@ public class LocalTableState implements TableState {
}
}
+ private void addUpdateCells(List<Cell> list, boolean overwrite) {
+ if (list == null) return;
+ // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
+ for (Cell c : list) {
+ this.memstore.add(maybeCopyCell(c), overwrite);
+ }
+ }
+
+ private KeyValue maybeCopyCell(Cell c) {
+ // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something
+ // that will likely be removed at some point in time.
+ if (c == null) return null;
+ if (c instanceof KeyValue) {
+ return (KeyValue) c;
+ }
+ return KeyValueUtil.copyToNewKeyValue(c);
+ }
+
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return this.env;
@@ -176,8 +196,8 @@ public class LocalTableState implements TableState {
// no need to perform scan to find prior row values when the indexed columns are immutable, as
// by definition, there won't be any.
if (!indexMetaData.isImmutableRows()) {
- // add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
+ // add the current state of the row. Uses listCells() to avoid a new array creation.
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
}
// add the covered columns to the set
http://git-wip-us.apache.org/repos/asf/phoenix/blob/87976eb6/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 5963f2e..1392906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -61,7 +61,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
- List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
updates.add(update);
@@ -115,7 +115,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
- List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
deletes.add(getDeleteForGroup(group, state, context));
}
@@ -238,9 +238,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
* to use when building the key
*/
static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+ final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
// now build up expected row key, each of the values, in order, followed by the PK and then some
// info about lengths so we can deserialize each value
- byte[] output = new byte[length + pk.length];
+ //
+ // output = length of values + primary key + column entries + length of each column entry + number of column entries
+ byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT];
int pos = 0;
int[] lengths = new int[values.size()];
int i = 0;
@@ -256,14 +259,22 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
// add the primary key to the end of the row key
System.arraycopy(pk, 0, output, pos, pk.length);
+ pos += pk.length;
// add the lengths as suffixes so we can deserialize the elements again
for (int l : lengths) {
- output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ byte[] serializedLength = Bytes.toBytes(l);
+ System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT);
+ pos += Bytes.SIZEOF_INT;
}
// and the last integer is the number of values
- return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
+ byte[] serializedNumValues = Bytes.toBytes(values.size());
+ System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT);
+ // Just in case we serialize more in the rowkey in the future..
+ pos += Bytes.SIZEOF_INT;
+
+ return output;
}
/**
[2/4] phoenix git commit: PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Posted by el...@apache.org.
PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/6b6bb775
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/6b6bb775
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/6b6bb775
Branch: refs/heads/4.x-HBase-1.2
Commit: 6b6bb775194edf931609bcd54ed1188b54c52e3e
Parents: 6dea011
Author: Josh Elser <el...@apache.org>
Authored: Fri Jul 7 16:40:27 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jul 21 17:29:25 2017 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 4 +-
.../hbase/index/builder/IndexBuildManager.java | 78 ++------------------
.../hbase/index/covered/LocalTableState.java | 24 +++++-
.../example/CoveredColumnIndexCodec.java | 21 ++++--
4 files changed, 47 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b6bb775/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 5a78c94..38401d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -610,7 +610,9 @@ public class Indexer extends BaseRegionObserver {
* @return the mutations to apply to the index tables
*/
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
- Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+ // Avoid multiple internal array resizings. Initial size of 64, unless we have fewer cells in the edit
+ int initialSize = Math.min(edit.size(), 64);
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(initialSize);
for (Cell kv : edit.getCells()) {
if (kv instanceof IndexedKeyValue) {
IndexedKeyValue ikv = (IndexedKeyValue) kv;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b6bb775/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 325904d..c015a77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -45,35 +45,14 @@ import com.google.common.util.concurrent.MoreExecutors;
/**
* Manage the building of index updates from primary table updates.
- * <p>
- * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
- * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
*/
public class IndexBuildManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
private final IndexBuilder delegate;
- private QuickFailingTaskRunner pool;
private boolean stopped;
/**
- * Set the number of threads with which we can concurrently build index updates. Unused threads
- * will be released, but setting the number of threads too high could cause frequent swapping and
- * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
- * of time building index updates, it could be worthwhile to spend the time to tune this parameter
- * as it could lead to dramatic increases in speed.
- */
- public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
- /** Default to a single thread. This is the safest course of action, but the slowest as well */
- private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
- /**
- * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the
- * threads and will re-create them as needed, up to the configured max
- */
- private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.builder.threads.keepalivetime";
-
- /**
* @param env environment in which <tt>this</tt> is running. Used to setup the
* {@link IndexBuilder} and executor
* @throws IOException if an {@link IndexBuilder} cannot be correctly steup
@@ -81,7 +60,7 @@ public class IndexBuildManager implements Stoppable {
public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
// Prevent deadlock by using single thread for all reads so that we know
// we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
- this(getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor()));
+ this.delegate = getIndexBuilder(env);
}
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -101,20 +80,6 @@ public class IndexBuildManager implements Stoppable {
}
}
- private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
- String serverName = env.getRegionServerServices().getServerName().getServerName();
- return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
- setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
- setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
- }
-
- public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
- this.delegate = builder;
- this.pool = pool;
- }
-
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
@@ -122,41 +87,11 @@ public class IndexBuildManager implements Stoppable {
final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
this.delegate.batchStarted(miniBatchOp, indexMetaData);
- // parallelize each mutation into its own task
- // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
- // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
- // acknowledged on each thread before doing the actual lookup, but after that depends on the
- // underlying builder to look for the closed flag.
- TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
- new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
- for (final Mutation m : mutations) {
- tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
-
- @Override
- public Collection<Pair<Mutation, byte[]>> call() throws IOException {
- return delegate.getIndexUpdate(m, indexMetaData);
- }
-
- });
- }
- List<Collection<Pair<Mutation, byte[]>>> allResults = null;
- try {
- allResults = pool.submitUninterruptible(tasks);
- } catch (CancellationException e) {
- throw e;
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- throw e.getCause();
+ // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+ ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+ for (Mutation m : mutations) {
+ results.addAll(delegate.getIndexUpdate(m, indexMetaData));
}
-
- // we can only get here if we get successes from each of the tasks, so each of these must have a
- // correct result
- Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
- for (Collection<Pair<Mutation, byte[]>> result : allResults) {
- assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
- results.addAll(result);
- }
-
return results;
}
@@ -194,7 +129,6 @@ public class IndexBuildManager implements Stoppable {
}
this.stopped = true;
this.delegate.stop(why);
- this.pool.stop(why);
}
@Override
@@ -206,4 +140,4 @@ public class IndexBuildManager implements Stoppable {
return this.delegate;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b6bb775/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 245bd66..acbf1ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -18,7 +18,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +86,24 @@ public class LocalTableState implements TableState {
}
}
+ private void addUpdateCells(List<Cell> list, boolean overwrite) {
+ if (list == null) return;
+ // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
+ for (Cell c : list) {
+ this.memstore.add(maybeCopyCell(c), overwrite);
+ }
+ }
+
+ private KeyValue maybeCopyCell(Cell c) {
+ // Same as KeyValueUtil, but HBase has deprecated this method. Avoid depending on something
+ // that will likely be removed at some point in time.
+ if (c == null) return null;
+ if (c instanceof KeyValue) {
+ return (KeyValue) c;
+ }
+ return KeyValueUtil.copyToNewKeyValue(c);
+ }
+
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return this.env;
@@ -176,8 +196,8 @@ public class LocalTableState implements TableState {
// no need to perform scan to find prior row values when the indexed columns are immutable, as
// by definition, there won't be any.
if (!indexMetaData.isImmutableRows()) {
- // add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
+ // add the current state of the row. Uses listCells() to avoid a new array creation.
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
}
// add the covered columns to the set
http://git-wip-us.apache.org/repos/asf/phoenix/blob/6b6bb775/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 5963f2e..1392906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -61,7 +61,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
- List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
updates.add(update);
@@ -115,7 +115,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
- List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
deletes.add(getDeleteForGroup(group, state, context));
}
@@ -238,9 +238,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
* to use when building the key
*/
static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+ final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
// now build up expected row key, each of the values, in order, followed by the PK and then some
// info about lengths so we can deserialize each value
- byte[] output = new byte[length + pk.length];
+ //
+ // output = length of values + primary key + column entries + length of each column entry + number of column entries
+ byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT];
int pos = 0;
int[] lengths = new int[values.size()];
int i = 0;
@@ -256,14 +259,22 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
// add the primary key to the end of the row key
System.arraycopy(pk, 0, output, pos, pk.length);
+ pos += pk.length;
// add the lengths as suffixes so we can deserialize the elements again
for (int l : lengths) {
- output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ byte[] serializedLength = Bytes.toBytes(l);
+ System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT);
+ pos += Bytes.SIZEOF_INT;
}
// and the last integer is the number of values
- return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
+ byte[] serializedNumValues = Bytes.toBytes(values.size());
+ System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT);
+ // Just in case we serialize more in the rowkey in the future..
+ pos += Bytes.SIZEOF_INT;
+
+ return output;
}
/**
[4/4] phoenix git commit: PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Posted by el...@apache.org.
PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/771f766e
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/771f766e
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/771f766e
Branch: refs/heads/4.x-HBase-0.98
Commit: 771f766eb1379dd0ad6822dd894cefbd658c1812
Parents: e3c6361
Author: Josh Elser <el...@apache.org>
Authored: Fri Jul 7 16:40:27 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jul 21 17:42:31 2017 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 4 +-
.../hbase/index/builder/IndexBuildManager.java | 78 ++------------------
.../hbase/index/covered/LocalTableState.java | 24 +++++-
.../example/CoveredColumnIndexCodec.java | 21 ++++--
4 files changed, 47 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/771f766e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index ea99747..27dbff4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -610,7 +610,9 @@ public class Indexer extends BaseRegionObserver {
* @return the mutations to apply to the index tables
*/
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
- Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+ // Avoid multiple internal array resizings. Initial size of 64, unless we have fewer cells in the edit
+ int initialSize = Math.min(edit.size(), 64);
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(initialSize);
for (KeyValue kv : edit.getKeyValues()) {
if (kv instanceof IndexedKeyValue) {
IndexedKeyValue ikv = (IndexedKeyValue) kv;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/771f766e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 325904d..c015a77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -45,35 +45,14 @@ import com.google.common.util.concurrent.MoreExecutors;
/**
* Manage the building of index updates from primary table updates.
- * <p>
- * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
- * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
*/
public class IndexBuildManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
private final IndexBuilder delegate;
- private QuickFailingTaskRunner pool;
private boolean stopped;
/**
- * Set the number of threads with which we can concurrently build index updates. Unused threads
- * will be released, but setting the number of threads too high could cause frequent swapping and
- * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
- * of time building index updates, it could be worthwhile to spend the time to tune this parameter
- * as it could lead to dramatic increases in speed.
- */
- public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
- /** Default to a single thread. This is the safest course of action, but the slowest as well */
- private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
- /**
- * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the
- * threads and will re-create them as needed, up to the configured max
- */
- private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.builder.threads.keepalivetime";
-
- /**
* @param env environment in which <tt>this</tt> is running. Used to setup the
* {@link IndexBuilder} and executor
* @throws IOException if an {@link IndexBuilder} cannot be correctly steup
@@ -81,7 +60,7 @@ public class IndexBuildManager implements Stoppable {
public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
// Prevent deadlock by using single thread for all reads so that we know
// we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
- this(getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor()));
+ this.delegate = getIndexBuilder(env);
}
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -101,20 +80,6 @@ public class IndexBuildManager implements Stoppable {
}
}
- private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
- String serverName = env.getRegionServerServices().getServerName().getServerName();
- return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
- setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
- setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
- }
-
- public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
- this.delegate = builder;
- this.pool = pool;
- }
-
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
@@ -122,41 +87,11 @@ public class IndexBuildManager implements Stoppable {
final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
this.delegate.batchStarted(miniBatchOp, indexMetaData);
- // parallelize each mutation into its own task
- // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
- // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
- // acknowledged on each thread before doing the actual lookup, but after that depends on the
- // underlying builder to look for the closed flag.
- TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
- new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
- for (final Mutation m : mutations) {
- tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
-
- @Override
- public Collection<Pair<Mutation, byte[]>> call() throws IOException {
- return delegate.getIndexUpdate(m, indexMetaData);
- }
-
- });
- }
- List<Collection<Pair<Mutation, byte[]>>> allResults = null;
- try {
- allResults = pool.submitUninterruptible(tasks);
- } catch (CancellationException e) {
- throw e;
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- throw e.getCause();
+ // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+ ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+ for (Mutation m : mutations) {
+ results.addAll(delegate.getIndexUpdate(m, indexMetaData));
}
-
- // we can only get here if we get successes from each of the tasks, so each of these must have a
- // correct result
- Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
- for (Collection<Pair<Mutation, byte[]>> result : allResults) {
- assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
- results.addAll(result);
- }
-
return results;
}
@@ -194,7 +129,6 @@ public class IndexBuildManager implements Stoppable {
}
this.stopped = true;
this.delegate.stop(why);
- this.pool.stop(why);
}
@Override
@@ -206,4 +140,4 @@ public class IndexBuildManager implements Stoppable {
return this.delegate;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/771f766e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 245bd66..acbf1ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -18,7 +18,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +86,24 @@ public class LocalTableState implements TableState {
}
}
+ private void addUpdateCells(List<Cell> list, boolean overwrite) {
+ if (list == null) return;
+ // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
+ for (Cell c : list) {
+ this.memstore.add(maybeCopyCell(c), overwrite);
+ }
+ }
+
+ private KeyValue maybeCopyCell(Cell c) {
+ // Same as KeyValueUtil.ensureKeyValue(Cell), but HBase has deprecated that method. Avoid depending
+ // on something that will likely be removed at some point in time.
+ if (c == null) return null;
+ if (c instanceof KeyValue) {
+ return (KeyValue) c;
+ }
+ return KeyValueUtil.copyToNewKeyValue(c);
+ }
+
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return this.env;
@@ -176,8 +196,8 @@ public class LocalTableState implements TableState {
// no need to perform scan to find prior row values when the indexed columns are immutable, as
// by definition, there won't be any.
if (!indexMetaData.isImmutableRows()) {
- // add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
+ // add the current state of the row. Uses listCells() to avoid a new array creation.
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
}
// add the covered columns to the set
http://git-wip-us.apache.org/repos/asf/phoenix/blob/771f766e/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 5963f2e..1392906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -61,7 +61,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
- List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
updates.add(update);
@@ -115,7 +115,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
- List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
deletes.add(getDeleteForGroup(group, state, context));
}
@@ -238,9 +238,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
* to use when building the key
*/
static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+ final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
// now build up expected row key, each of the values, in order, followed by the PK and then some
// info about lengths so we can deserialize each value
- byte[] output = new byte[length + pk.length];
+ //
+ // output = total length of the values + primary key + one int (its length) per column entry + one int for the number of column entries
+ byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT];
int pos = 0;
int[] lengths = new int[values.size()];
int i = 0;
@@ -256,14 +259,22 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
// add the primary key to the end of the row key
System.arraycopy(pk, 0, output, pos, pk.length);
+ pos += pk.length;
// add the lengths as suffixes so we can deserialize the elements again
for (int l : lengths) {
- output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ byte[] serializedLength = Bytes.toBytes(l);
+ System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT);
+ pos += Bytes.SIZEOF_INT;
}
// and the last integer is the number of values
- return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
+ byte[] serializedNumValues = Bytes.toBytes(values.size());
+ System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT);
+ // Just in case we serialize more in the rowkey in the future..
+ pos += Bytes.SIZEOF_INT;
+
+ return output;
}
/**
[3/4] phoenix git commit: PHOENIX-4004 Remove unnecessary allocations
in server-side mutable secondary-index path
Posted by el...@apache.org.
PHOENIX-4004 Remove unnecessary allocations in server-side mutable secondary-index path
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ce71efc9
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ce71efc9
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ce71efc9
Branch: refs/heads/4.x-HBase-1.1
Commit: ce71efc9fe19f03524b24e69ecaa19b0f03846de
Parents: c2a7389
Author: Josh Elser <el...@apache.org>
Authored: Fri Jul 7 16:40:27 2017 -0400
Committer: Josh Elser <el...@apache.org>
Committed: Fri Jul 21 17:35:11 2017 -0400
----------------------------------------------------------------------
.../org/apache/phoenix/hbase/index/Indexer.java | 4 +-
.../hbase/index/builder/IndexBuildManager.java | 78 ++------------------
.../hbase/index/covered/LocalTableState.java | 24 +++++-
.../example/CoveredColumnIndexCodec.java | 21 ++++--
4 files changed, 47 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
index 5a78c94..38401d4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/Indexer.java
@@ -610,7 +610,9 @@ public class Indexer extends BaseRegionObserver {
* @return the mutations to apply to the index tables
*/
private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
- Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+ // Avoid multiple internal array resizings. Initial size of 64, unless we have fewer cells in the edit
+ int initialSize = Math.min(edit.size(), 64);
+ Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>(initialSize);
for (Cell kv : edit.getCells()) {
if (kv instanceof IndexedKeyValue) {
IndexedKeyValue ikv = (IndexedKeyValue) kv;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index 325904d..c015a77 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -45,35 +45,14 @@ import com.google.common.util.concurrent.MoreExecutors;
/**
* Manage the building of index updates from primary table updates.
- * <p>
- * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
- * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
*/
public class IndexBuildManager implements Stoppable {
private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
private final IndexBuilder delegate;
- private QuickFailingTaskRunner pool;
private boolean stopped;
/**
- * Set the number of threads with which we can concurrently build index updates. Unused threads
- * will be released, but setting the number of threads too high could cause frequent swapping and
- * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
- * of time building index updates, it could be worthwhile to spend the time to tune this parameter
- * as it could lead to dramatic increases in speed.
- */
- public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
- /** Default to a single thread. This is the safest course of action, but the slowest as well */
- private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
- /**
- * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the
- * threads and will re-create them as needed, up to the configured max
- */
- private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.builder.threads.keepalivetime";
-
- /**
* @param env environment in which <tt>this</tt> is running. Used to setup the
* {@link IndexBuilder} and executor
* @throws IOException if an {@link IndexBuilder} cannot be correctly steup
@@ -81,7 +60,7 @@ public class IndexBuildManager implements Stoppable {
public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
// Prevent deadlock by using single thread for all reads so that we know
// we can get the ReentrantRWLock. See PHOENIX-2671 for more details.
- this(getIndexBuilder(env), new QuickFailingTaskRunner(MoreExecutors.sameThreadExecutor()));
+ this.delegate = getIndexBuilder(env);
}
private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
@@ -101,20 +80,6 @@ public class IndexBuildManager implements Stoppable {
}
}
- private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
- String serverName = env.getRegionServerServices().getServerName().getServerName();
- return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
- setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
- setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
- }
-
- public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
- this.delegate = builder;
- this.pool = pool;
- }
-
-
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
MiniBatchOperationInProgress<Mutation> miniBatchOp,
Collection<? extends Mutation> mutations) throws Throwable {
@@ -122,41 +87,11 @@ public class IndexBuildManager implements Stoppable {
final IndexMetaData indexMetaData = this.delegate.getIndexMetaData(miniBatchOp);
this.delegate.batchStarted(miniBatchOp, indexMetaData);
- // parallelize each mutation into its own task
- // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
- // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
- // acknowledged on each thread before doing the actual lookup, but after that depends on the
- // underlying builder to look for the closed flag.
- TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
- new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
- for (final Mutation m : mutations) {
- tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
-
- @Override
- public Collection<Pair<Mutation, byte[]>> call() throws IOException {
- return delegate.getIndexUpdate(m, indexMetaData);
- }
-
- });
- }
- List<Collection<Pair<Mutation, byte[]>>> allResults = null;
- try {
- allResults = pool.submitUninterruptible(tasks);
- } catch (CancellationException e) {
- throw e;
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- throw e.getCause();
+ // Avoid the Object overhead of the executor when it's not actually parallelizing anything.
+ ArrayList<Pair<Mutation, byte[]>> results = new ArrayList<>(mutations.size());
+ for (Mutation m : mutations) {
+ results.addAll(delegate.getIndexUpdate(m, indexMetaData));
}
-
- // we can only get here if we get successes from each of the tasks, so each of these must have a
- // correct result
- Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
- for (Collection<Pair<Mutation, byte[]>> result : allResults) {
- assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
- results.addAll(result);
- }
-
return results;
}
@@ -194,7 +129,6 @@ public class IndexBuildManager implements Stoppable {
}
this.stopped = true;
this.delegate.stop(why);
- this.pool.stop(why);
}
@Override
@@ -206,4 +140,4 @@ public class IndexBuildManager implements Stoppable {
return this.delegate;
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index 245bd66..acbf1ab 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -18,7 +18,9 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.util.Pair;
@@ -84,6 +86,24 @@ public class LocalTableState implements TableState {
}
}
+ private void addUpdateCells(List<Cell> list, boolean overwrite) {
+ if (list == null) return;
+ // Avoid a copy of the Cell into a KeyValue if it's already a KeyValue
+ for (Cell c : list) {
+ this.memstore.add(maybeCopyCell(c), overwrite);
+ }
+ }
+
+ private KeyValue maybeCopyCell(Cell c) {
+ // Same as KeyValueUtil.ensureKeyValue(Cell), but HBase has deprecated that method. Avoid depending
+ // on something that will likely be removed at some point in time.
+ if (c == null) return null;
+ if (c instanceof KeyValue) {
+ return (KeyValue) c;
+ }
+ return KeyValueUtil.copyToNewKeyValue(c);
+ }
+
@Override
public RegionCoprocessorEnvironment getEnvironment() {
return this.env;
@@ -176,8 +196,8 @@ public class LocalTableState implements TableState {
// no need to perform scan to find prior row values when the indexed columns are immutable, as
// by definition, there won't be any.
if (!indexMetaData.isImmutableRows()) {
- // add the current state of the row
- this.addUpdate(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).list(), false);
+ // add the current state of the row. Uses listCells() to avoid a new array creation.
+ this.addUpdateCells(this.table.getCurrentRowState(update, toCover, ignoreNewerMutations).listCells(), false);
}
// add the covered columns to the set
http://git-wip-us.apache.org/repos/asf/phoenix/blob/ce71efc9/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
index 5963f2e..1392906 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/example/CoveredColumnIndexCodec.java
@@ -61,7 +61,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexUpserts(TableState state, IndexMetaData indexMetaData) {
- List<IndexUpdate> updates = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> updates = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
IndexUpdate update = getIndexUpdateForGroup(group, state, indexMetaData);
updates.add(update);
@@ -115,7 +115,7 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
@Override
public Iterable<IndexUpdate> getIndexDeletes(TableState state, IndexMetaData context) {
- List<IndexUpdate> deletes = new ArrayList<IndexUpdate>();
+ List<IndexUpdate> deletes = new ArrayList<IndexUpdate>(groups.size());
for (ColumnGroup group : groups) {
deletes.add(getDeleteForGroup(group, state, context));
}
@@ -238,9 +238,12 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
* to use when building the key
*/
static byte[] composeRowKey(byte[] pk, int length, List<ColumnEntry> values) {
+ final int numColumnEntries = values.size() * Bytes.SIZEOF_INT;
// now build up expected row key, each of the values, in order, followed by the PK and then some
// info about lengths so we can deserialize each value
- byte[] output = new byte[length + pk.length];
+ //
+ // output = total length of the values + primary key + one int (its length) per column entry + one int for the number of column entries
+ byte[] output = new byte[length + pk.length + numColumnEntries + Bytes.SIZEOF_INT];
int pos = 0;
int[] lengths = new int[values.size()];
int i = 0;
@@ -256,14 +259,22 @@ public class CoveredColumnIndexCodec extends BaseIndexCodec {
// add the primary key to the end of the row key
System.arraycopy(pk, 0, output, pos, pk.length);
+ pos += pk.length;
// add the lengths as suffixes so we can deserialize the elements again
for (int l : lengths) {
- output = ArrayUtils.addAll(output, Bytes.toBytes(l));
+ byte[] serializedLength = Bytes.toBytes(l);
+ System.arraycopy(serializedLength, 0, output, pos, Bytes.SIZEOF_INT);
+ pos += Bytes.SIZEOF_INT;
}
// and the last integer is the number of values
- return ArrayUtils.addAll(output, Bytes.toBytes(values.size()));
+ byte[] serializedNumValues = Bytes.toBytes(values.size());
+ System.arraycopy(serializedNumValues, 0, output, pos, Bytes.SIZEOF_INT);
+ // Just in case we serialize more in the rowkey in the future..
+ pos += Bytes.SIZEOF_INT;
+
+ return output;
}
/**