You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/17 23:20:00 UTC
git commit: PHOENIX-1166 Avoid HTable creation in coprocessors to
write into local index table (JeffreyZ)
Repository: phoenix
Updated Branches:
refs/heads/4.0 eb902f0a7 -> a33f4c75e
PHOENIX-1166 Avoid HTable creation in coprocessors to write into local index table (JeffreyZ)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a33f4c75
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a33f4c75
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a33f4c75
Branch: refs/heads/4.0
Commit: a33f4c75e49859fa992dee17eee8796a71058e15
Parents: eb902f0
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Aug 17 14:23:04 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Aug 17 14:23:40 2014 -0700
----------------------------------------------------------------------
.../index/table/CoprocessorHTableFactory.java | 12 -
.../write/ParallelWriterIndexCommitter.java | 345 ++++++++++---------
.../TrackingParallelWriterIndexCommitter.java | 345 ++++++++++---------
3 files changed, 355 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a33f4c75/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index 907eb3d..72a28be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -33,11 +33,6 @@ import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
public class CoprocessorHTableFactory implements HTableFactory {
- /** Number of milliseconds per-interval to retry zookeeper */
- private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL =
- "zookeeper.recovery.retry.intervalmill";
- /** Number of retries for zookeeper */
- private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
private CoprocessorEnvironment e;
@@ -48,13 +43,6 @@ public class CoprocessorHTableFactory implements HTableFactory {
@Override
public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
Configuration conf = e.getConfiguration();
- // make sure writers fail fast
- IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
- IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
- IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
- IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
- IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
- IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
// make sure we use the index priority writer for our rpcs
IndexQosCompat.setPhoenixIndexRpcController(conf);
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a33f4c75/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index bdb927d..f72dec0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -1,19 +1,11 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.write;
@@ -33,6 +25,7 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
@@ -44,179 +37,197 @@ import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import com.google.common.collect.Multimap;
/**
- * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
- * any of the index updates fails. Completion is determined by the following criteria: *
+ * Write index updates to the index tables in parallel. We attempt to early exit from the writes if any of the index
+ * updates fails. Completion is determined by the following criteria:
* <ol>
* <li>All index writes have returned, OR</li>
* <li>Any single index write has failed</li>
* </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
- * ensure a timely recovery of the failed index writes.
+ * We attempt to quickly determine if any write has failed and not write to the remaining indexes to ensure a timely
+ * recovery of the failed index writes.
*/
public class ParallelWriterIndexCommitter implements IndexCommitter {
- public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.writer.threads.keepalivetime";
- private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
-
- private HTableFactory factory;
- private Stoppable stopped;
- private QuickFailingTaskRunner pool;
- private KeyValueBuilder kvBuilder;
-
- public ParallelWriterIndexCommitter() {
- }
-
- // For testing
- public ParallelWriterIndexCommitter(String hbaseVersion) {
- kvBuilder = KeyValueBuilder.get(hbaseVersion);
- }
-
- @Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
- Configuration conf = env.getConfiguration();
- setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
- ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder(name, conf).
- setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
- setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
- env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
- this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
- }
-
- /**
- * Setup <tt>this</tt>.
- * <p>
- * Exposed for TESTING
- */
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
- int cacheSize) {
- this.factory = new CachingHTableFactory(factory, cacheSize);
- this.pool = new QuickFailingTaskRunner(pool);
- this.stopped = stop;
- }
-
- @Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws SingleIndexWriteFailureException {
- /*
- * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
- * writes in parallel to each index table, so each table gets its own task and is submitted to
- * the pool. Where it gets tricky is that we want to block the calling thread until one of two
- * things happens: (1) all index tables get successfully updated, or (2) any one of the index
- * table writes fail; in either case, we should return as quickly as possible. We get a little
- * more complicated in that if we do get a single failure, but any of the index writes hasn't
- * been started yet (its been queued up, but not submitted to a thread) we want to that task to
- * fail immediately as we know that write is a waste and will need to be replayed anyways.
+ public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+ private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+ private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+ private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+
+ private HTableFactory factory;
+ private Stoppable stopped;
+ private QuickFailingTaskRunner pool;
+ private KeyValueBuilder kvBuilder;
+ private RegionCoprocessorEnvironment env;
+
+ public ParallelWriterIndexCommitter() {}
+
+ // For testing
+ public ParallelWriterIndexCommitter(String hbaseVersion) {
+ kvBuilder = KeyValueBuilder.get(hbaseVersion);
+ }
+
+ @Override
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+ this.env = env;
+ Configuration conf = env.getConfiguration();
+ setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+ ThreadPoolManager.getExecutor(
+ new ThreadPoolBuilder(name, conf).setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+ DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).setCoreTimeout(
+ INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env), env.getRegionServerServices(), parent,
+ CachingHTableFactory.getCacheSize(conf));
+ this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+ }
+
+ /**
+ * Setup <tt>this</tt>.
+ * <p>
+ * Exposed for TESTING
*/
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
+ this.factory = new CachingHTableFactory(factory, cacheSize);
+ this.pool = new QuickFailingTaskRunner(pool);
+ this.stopped = stop;
+ }
- Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
- for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
- // get the mutations for each table. We leak the implementation here a little bit to save
- // doing a complete copy over of all the index update for each table.
- final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>) entry.getValue());
- final HTableInterfaceReference tableReference = entry.getKey();
- /*
- * Write a batch of index updates to an index table. This operation stops (is cancelable) via
- * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
- * running thread. The former will only work if we are not in the midst of writing the current
- * batch to the table, though we do check these status variables before starting and before
- * writing the batch. The latter usage, interrupting the thread, will work in the previous
- * situations as was at some points while writing the batch, depending on the underlying
- * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when is
- * supports an interrupt).
- */
- tasks.add(new Task<Void>() {
-
- /**
- * Do the actual write to the primary table. We don't need to worry about closing the table
- * because that is handled the {@link CachingHTableFactory}.
+ @Override
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
+ /*
+ * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
+ * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
+ * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
+ * successfully updated, or (2) any one of the index table writes fail; in either case, we should return as
+ * quickly as possible. We get a little more complicated in that if we do get a single failure, but any of the
+ * index writes hasn't been started yet (it's been queued up, but not submitted to a thread) we want that task
+ * to fail immediately as we know that write is a waste and will need to be replayed anyways.
*/
- @SuppressWarnings("deprecation")
- @Override
- public Void call() throws Exception {
- // this may have been queued, so another task infront of us may have failed, so we should
- // early exit, if that's the case
- throwFailureIfDone();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
- }
- try {
- HTableInterface table = factory.getTable(tableReference.get());
- throwFailureIfDone();
- table.batch(mutations);
- } catch (SingleIndexWriteFailureException e) {
- throw e;
- } catch (IOException e) {
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
- } catch (InterruptedException e) {
- // reset the interrupt status on the thread
- Thread.currentThread().interrupt();
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
- }
- return null;
- }
- private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
- throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null);
- }
+ Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+ TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+ for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+ // get the mutations for each table. We leak the implementation here a little bit to save
+ // doing a complete copy over of all the index update for each table.
+ final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
+ final HTableInterfaceReference tableReference = entry.getKey();
+ final RegionCoprocessorEnvironment env = this.env;
+ /*
+ * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+ * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
+ * The former will only work if we are not in the midst of writing the current batch to the table, though we
+ * do check these status variables before starting and before writing the batch. The latter usage,
+ * interrupting the thread, will work in the previous situations as well as at some points while writing the
+ * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
+ * elaborate when it supports an interrupt).
+ */
+ tasks.add(new Task<Void>() {
+
+ /**
+ * Do the actual write to the primary table. We don't need to worry about closing the table because that
+ * is handled by the {@link CachingHTableFactory}.
+ *
+ * @return
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public Void call() throws Exception {
+ // this may have been queued, so another task in front of us may have failed, so we should
+ // early exit, if that's the case
+ throwFailureIfDone();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+ }
+ try {
+ // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
+ // Also, checking the prefix of the table name to determine if this is a local
+ // index is pretty hacky. If we're going to keep this, we should revisit that
+ // as well.
+ try {
+ if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
+ HRegion indexRegion = IndexUtil.getIndexRegion(env);
+ if (indexRegion != null) {
+ throwFailureIfDone();
+ indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
+ return null;
+ }
+ }
+ } catch (IOException ignord) {
+ // when it's failed we fall back to the standard & slow way
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+ + ignord);
+ }
+ }
+ HTableInterface table = factory.getTable(tableReference.get());
+ throwFailureIfDone();
+ table.batch(mutations);
+ } catch (SingleIndexWriteFailureException e) {
+ throw e;
+ } catch (IOException e) {
+ throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+ } catch (InterruptedException e) {
+ // reset the interrupt status on the thread
+ Thread.currentThread().interrupt();
+ throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+ }
+ return null;
+ }
+
+ private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+ if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) { throw new SingleIndexWriteFailureException(
+ "Pool closed, not attempting to write to the index!", null); }
+
+ }
+ });
+ }
+ // actually submit the tasks to the pool and wait for them to finish/fail
+ try {
+ pool.submitUninterruptible(tasks);
+ } catch (EarlyExitFailure e) {
+ propagateFailure(e);
+ } catch (ExecutionException e) {
+ LOG.error("Found a failed index update!");
+ propagateFailure(e.getCause());
}
- });
- }
- // actually submit the tasks to the pool and wait for them to finish/fail
- try {
- pool.submitUninterruptible(tasks);
- } catch (EarlyExitFailure e) {
- propagateFailure(e);
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- propagateFailure(e.getCause());
}
- }
+ private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+ try {
+ throw throwable;
+ } catch (SingleIndexWriteFailureException e1) {
+ throw e1;
+ } catch (Throwable e1) {
+ throw new SingleIndexWriteFailureException("Got an abort notification while writing to the index!", e1);
+ }
+
+ }
- private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
- try {
- throw throwable;
- } catch (SingleIndexWriteFailureException e1) {
- throw e1;
- } catch (Throwable e1) {
- throw new SingleIndexWriteFailureException(
- "Got an abort notification while writing to the index!", e1);
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed by the external
+ * {@link Stoppable}. This call does not delegate the stop down to the {@link Stoppable} passed in the constructor.
+ *
+ * @param why
+ * the reason for stopping
+ */
+ @Override
+ public void stop(String why) {
+ LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+ this.pool.stop(why);
+ this.factory.shutdown();
}
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
- * by the external {@link Stoppable}. This call does not delegate the stop down to the
- * {@link Stoppable} passed in the constructor.
- * @param why the reason for stopping
- */
- @Override
- public void stop(String why) {
- LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
- this.pool.stop(why);
- this.factory.shutdown();
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.isStopped();
- }
+ @Override
+ public boolean isStopped() {
+ return this.stopped.isStopped();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/a33f4c75/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index 7f4fc9d..9a61191 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -1,22 +1,15 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
*/
package org.apache.phoenix.hbase.index.write.recovery;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -34,6 +27,7 @@ import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.phoenix.hbase.index.CapturingAbortable;
import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
@@ -51,177 +45,192 @@ import org.apache.phoenix.hbase.index.write.IndexCommitter;
import org.apache.phoenix.hbase.index.write.IndexWriter;
import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
import com.google.common.collect.Multimap;
/**
- * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to
- * allow the caller to retrieve the failed and succeeded index updates. Therefore, this class will
- * be a lot slower, in the face of failures, when compared to the
- * {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
- * you need to at least attempt all writes and know their result; for instance, this is fine for
- * doing WAL recovery - it's not a performance intensive situation and we want to limit the the
- * edits we need to retry.
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to allow the caller to
+ * retrieve the failed and succeeded index updates. Therefore, this class will be a lot slower, in the face of failures,
+ * when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
+ * you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL recovery -
+ * it's not a performance intensive situation and we want to limit the edits we need to retry.
* <p>
- * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that
- * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
+ * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that contains the list of
+ * {@link HTableInterfaceReference} that didn't complete successfully.
* <p>
* Failures to write to the index can happen several different ways:
* <ol>
- * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}.
- * This causing any pending tasks to fail whatever they are doing as fast as possible. Any writes
- * that have not begun are not even attempted and marked as failures.</li>
- * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
- * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
- * exceptions.</li>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}). This causes any
+ * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even
+ * attempted and marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not
+ * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li>
* </ol>
- * Regardless of how the write fails, we still wait for all writes to complete before passing the
- * failure back to the client.
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the
+ * client.
*/
public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
- private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
-
- public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.trackingwriter.threads.keepalivetime";
-
- private TaskRunner pool;
- private HTableFactory factory;
- private CapturingAbortable abortable;
- private Stoppable stopped;
-
- @Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
- Configuration conf = env.getConfiguration();
- setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
- ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder(name, conf).
- setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
- setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
- env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
- }
-
- /**
- * Setup <tt>this</tt>.
- * <p>
- * Exposed for TESTING
- */
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
- int cacheSize) {
- this.pool = new WaitForCompletionTaskRunner(pool);
- this.factory = new CachingHTableFactory(factory, cacheSize);
- this.abortable = new CapturingAbortable(abortable);
- this.stopped = stop;
- }
-
- @Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws MultiIndexWriteFailureException {
- Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
- List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
- for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
- // get the mutations for each table. We leak the implementation here a little bit to save
- // doing a complete copy over of all the index update for each table.
- final List<Mutation> mutations = (List<Mutation>) entry.getValue();
- // track each reference so we can get at it easily later, when determing failures
- final HTableInterfaceReference tableReference = entry.getKey();
- tables.add(tableReference);
-
- /*
- * Write a batch of index updates to an index table. This operation stops (is cancelable) via
- * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
- * running thread. The former will only work if we are not in the midst of writing the current
- * batch to the table, though we do check these status variables before starting and before
- * writing the batch. The latter usage, interrupting the thread, will work in the previous
- * situations as was at some points while writing the batch, depending on the underlying
- * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when is
- * supports an interrupt).
- */
- tasks.add(new Task<Boolean>() {
-
- /**
- * Do the actual write to the primary table. We don't need to worry about closing the table
- * because that is handled the {@link CachingHTableFactory}.
- */
- @SuppressWarnings("deprecation")
- @Override
- public Boolean call() throws Exception {
- try {
- // this may have been queued, but there was an abort/stop so we try to early exit
- throwFailureIfDone();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
- }
- HTableInterface table = factory.getTable(tableReference.get());
- throwFailureIfDone();
- table.batch(mutations);
- } catch (InterruptedException e) {
- // reset the interrupt status on the thread
- Thread.currentThread().interrupt();
- throw e;
- } catch (Exception e) {
- throw e;
- }
- return Boolean.TRUE;
+ private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+ public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
+ private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+ private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.trackingwriter.threads.keepalivetime";
+
+ private TaskRunner pool;
+ private HTableFactory factory;
+ private CapturingAbortable abortable;
+ private Stoppable stopped;
+ private RegionCoprocessorEnvironment env;
+
+ @Override
+ public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+ // remembered so the write tasks can look up the local index region through IndexUtil.getIndexRegion(env)
+ this.env = env;
+ Configuration conf = env.getConfiguration();
+ HTableFactory delegate = IndexWriterUtils.getDefaultDelegateHTableFactory(env);
+ ExecutorService executor = ThreadPoolManager.getExecutor(new ThreadPoolBuilder(name, conf)
+ .setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, DEFAULT_CONCURRENT_INDEX_WRITER_THREADS)
+ .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env);
+ setup(delegate, executor, env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+ }
+
+ /**
+ * Wire up the collaborators of <tt>this</tt> committer: the executor is wrapped in a
+ * {@link WaitForCompletionTaskRunner}, the table factory in a {@link CachingHTableFactory} of the given cache size
+ * and the abortable in a {@link CapturingAbortable}. Exposed for TESTING.
+ */
+ void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
+ this.stopped = stop;
+ this.abortable = new CapturingAbortable(abortable);
+ this.factory = new CachingHTableFactory(factory, cacheSize);
+ this.pool = new WaitForCompletionTaskRunner(pool);
+ }
+
+ @Override
+ public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws MultiIndexWriteFailureException {
+ Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+ TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+ List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+ for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+ // get the mutations for each table. We leak the implementation here a little bit to save
+ // doing a complete copy over of all the index update for each table.
+ final List<Mutation> mutations = (List<Mutation>)entry.getValue();
+ // track each reference so we can get at it easily later, when determining failures
+ final HTableInterfaceReference tableReference = entry.getKey();
+ final RegionCoprocessorEnvironment env = this.env;
+ tables.add(tableReference);
+
+ /*
+ * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+ * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
+ * The former will only work if we are not in the midst of writing the current batch to the table, though we
+ * do check these status variables before starting and before writing the batch. The latter usage,
+ * interrupting the thread, will work in the previous situations as well as at some points while writing the
+ * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
+ * elaborate when it supports an interrupt).
+ */
+ tasks.add(new Task<Boolean>() {
+
+ /**
+ * Write this table's batch of index updates, returning {@link Boolean#TRUE} on success (the enclosing write()
+ * counts a null result as a failed table). Closing the table is left to the {@link CachingHTableFactory}.
+ */
+ @SuppressWarnings("deprecation")
+ @Override
+ public Boolean call() throws Exception {
+ try {
+ // this may have been queued, but there was an abort/stop so we try to early exit
+ throwFailureIfDone();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+ }
+
+ try {
+ // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
+ // Also, checking the prefix of the table name to determine if this is a local
+ // index is pretty hacky. If we're going to keep this, we should revisit that
+ // as well.
+ if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
+ HRegion indexRegion = IndexUtil.getIndexRegion(env);
+ if (indexRegion != null) {
+ throwFailureIfDone();
+ indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
+ return Boolean.TRUE; // must not be null: write() records a null result as a failure
+ }
+ }
+ } catch (IOException ignord) {
+ // the direct region write failed, so fall back to the standard & slow HTable path below
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("indexRegion.batchMutate failed and fall back to HTable.batch(). Got error="
+ + ignord);
+ }
+ }
+
+ HTableInterface table = factory.getTable(tableReference.get());
+ throwFailureIfDone();
+ table.batch(mutations);
+ } catch (InterruptedException e) {
+ // reset the interrupt status on the thread
+ Thread.currentThread().interrupt();
+ throw e;
+ } catch (Exception e) {
+ throw e;
+ }
+ return Boolean.TRUE;
+ }
+
+ private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+ // still running: not stopped, not aborted and the current thread has not been interrupted
+ if (!stopped.isStopped() && !abortable.isAborted() && !Thread.currentThread().isInterrupted()) { return; }
+ throw new SingleIndexWriteFailureException("Pool closed, not attempting to write to the index!", null);
+ }
+ });
}
- private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (stopped.isStopped() || abortable.isAborted()
- || Thread.currentThread().isInterrupted()) {
- throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null);
- }
+ List<Boolean> results = null;
+ try {
+ LOG.debug("Waiting on index update tasks to complete...");
+ results = this.pool.submitUninterruptible(tasks);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+ } catch (EarlyExitFailure e) {
+ throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+ }
+ // track the failures. We only ever access this on return from our calls, so no extra
+ // synchronization is needed. We could update all the failures as we find them, but that adds a
+ // lot of locking overhead, and just doing the copy later is about as efficient.
+ List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+ int index = 0;
+ for (Boolean result : results) {
+ // there was a failure
+ if (result == null) {
+ // we know which table failed by the index of the result
+ failures.add(tables.get(index));
+ }
+ index++;
}
- });
- }
- List<Boolean> results = null;
- try {
- LOG.debug("Waiting on index update tasks to complete...");
- results = this.pool.submitUninterruptible(tasks);
- } catch (ExecutionException e) {
- throw new RuntimeException(
- "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
- } catch (EarlyExitFailure e) {
- throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+ // if any of the tasks failed, then we need to propagate the failure
+ if (failures.size() > 0) {
+ // make the list unmodifiable to avoid any more synchronization concerns
+ throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+ }
+ return;
}
-
- // track the failures. We only ever access this on return from our calls, so no extra
- // synchronization is needed. We could update all the failures as we find them, but that add a
- // lot of locking overhead, and just doing the copy later is about as efficient.
- List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
- int index = 0;
- for (Boolean result : results) {
- // there was a failure
- if (result == null) {
- // we know which table failed by the index of the result
- failures.add(tables.get(index));
- }
- index++;
+
+ @Override
+ public void stop(String why) { // stops the task pool, then shuts down the table factory
+ LOG.info("Shutting down " + this.getClass().getSimpleName());
+ this.pool.stop(why);
+ this.factory.shutdown();
 }
- // if any of the tasks failed, then we need to propagate the failure
- if (failures.size() > 0) {
- // make the list unmodifiable to avoid any more synchronization concerns
- throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+ @Override
+ public boolean isStopped() { // reports the state of the Stoppable handed to setup(...)
+ return this.stopped.isStopped();
 }
- return;
- }
-
- @Override
- public void stop(String why) {
- LOG.info("Shutting down " + this.getClass().getSimpleName());
- this.pool.stop(why);
- this.factory.shutdown();
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.isStopped();
- }
}
\ No newline at end of file