Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/08/17 23:20:00 UTC

git commit: PHOENIX-1166 Avoid HTable creation in coprocessors to write into local index table (JeffreyZ)

Repository: phoenix
Updated Branches:
  refs/heads/4.0 eb902f0a7 -> a33f4c75e


PHOENIX-1166 Avoid HTable creation in coprocessors to write into local index table (JeffreyZ)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/a33f4c75
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/a33f4c75
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/a33f4c75

Branch: refs/heads/4.0
Commit: a33f4c75e49859fa992dee17eee8796a71058e15
Parents: eb902f0
Author: James Taylor <jt...@salesforce.com>
Authored: Sun Aug 17 14:23:04 2014 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Sun Aug 17 14:23:40 2014 -0700

----------------------------------------------------------------------
 .../index/table/CoprocessorHTableFactory.java   |  12 -
 .../write/ParallelWriterIndexCommitter.java     | 345 ++++++++++---------
 .../TrackingParallelWriterIndexCommitter.java   | 345 ++++++++++---------
 3 files changed, 355 insertions(+), 347 deletions(-)
----------------------------------------------------------------------
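
In short: a local index table's regions are co-located with the regions of the data table they index, so the committers below can write index updates through the local HRegion directly instead of creating an HTable inside the coprocessor. A minimal sketch of the pattern, using the helper names that appear in the diff below (IndexUtil.getIndexRegion, MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX); the surrounding task/early-exit machinery is omitted:

    // Sketch only: distills the fast path added in both committers below.
    private static void writeIndexUpdates(HTableInterfaceReference tableReference, List<Mutation> mutations,
            RegionCoprocessorEnvironment env, HTableFactory factory) throws IOException, InterruptedException {
        if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
            // The local index region lives on this same region server, so we can
            // write to it directly and skip HTable creation and the RPC stack.
            HRegion indexRegion = IndexUtil.getIndexRegion(env);
            if (indexRegion != null) {
                indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
                return;
            }
        }
        // Remote indexes (or a failed local-region lookup) still use the HTable client path.
        HTableInterface table = factory.getTable(tableReference.get());
        table.batch(mutations);
    }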


http://git-wip-us.apache.org/repos/asf/phoenix/blob/a33f4c75/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
index 907eb3d..72a28be 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -33,11 +33,6 @@ import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 
 public class CoprocessorHTableFactory implements HTableFactory {
 
-    /** Number of milliseconds per-interval to retry zookeeper */
-    private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL =
-            "zookeeper.recovery.retry.intervalmill";
-    /** Number of retries for zookeeper */
-    private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
     private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
     private CoprocessorEnvironment e;
 
@@ -48,13 +43,6 @@ public class CoprocessorHTableFactory implements HTableFactory {
     @Override
     public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
         Configuration conf = e.getConfiguration();
-        // make sure writers fail fast
-        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
-        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
-        IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
-        IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
-        IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
-        IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
 
         // make sure we use the index priority writer for our rpcs
         IndexQosCompat.setPhoenixIndexRpcController(conf);
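
For context, the block deleted above forced fail-fast client settings (retries, pauses, ZooKeeper and RPC timeouts) onto the coprocessor's Configuration before handing out an HTable. A rough sketch of what IndexManagementUtil.setIfNotSet does, assuming standard Hadoop Configuration semantics (the exact implementation lives in IndexManagementUtil):

    // Assumed semantics: apply a default only when the key is not already configured.
    static void setIfNotSet(Configuration conf, String key, int value) {
        if (conf.get(key) == null) {
            conf.setInt(key, value);
        }
    }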

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a33f4c75/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
index bdb927d..f72dec0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.write;
 
@@ -33,6 +25,7 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
 import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
@@ -44,179 +37,197 @@ import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableFactory;
 import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 
 import com.google.common.collect.Multimap;
 
 /**
- * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
- * any of the index updates fails. Completion is determined by the following criteria: *
+ * Write index updates to the index tables in parallel. We attempt to exit early from the writes if any of the index
+ * updates fails. Completion is determined by the following criteria:
  * <ol>
  * <li>All index writes have returned, OR</li>
  * <li>Any single index write has failed</li>
  * </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
- * ensure a timely recovery of the failed index writes.
+ * We attempt to quickly determine if any write has failed and, if so, avoid writing to the remaining indexes to ensure
+ * a timely recovery of the failed index writes.
  */
 public class ParallelWriterIndexCommitter implements IndexCommitter {
 
-  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
-  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
-      "index.writer.threads.keepalivetime";
-  private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
-
-  private HTableFactory factory;
-  private Stoppable stopped;
-  private QuickFailingTaskRunner pool;
-  private KeyValueBuilder kvBuilder;
-
-  public ParallelWriterIndexCommitter() {
-  }
-
-  // For testing
-  public ParallelWriterIndexCommitter(String hbaseVersion) {
-      kvBuilder = KeyValueBuilder.get(hbaseVersion);
-  }
-  
-  @Override
-  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
-    Configuration conf = env.getConfiguration();
-    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
-      ThreadPoolManager.getExecutor(
-        new ThreadPoolBuilder(name, conf).
-          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
-            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
-          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
-      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
-    this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
-  }
-
-  /**
-   * Setup <tt>this</tt>.
-   * <p>
-   * Exposed for TESTING
-   */
-  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
-      int cacheSize) {
-    this.factory = new CachingHTableFactory(factory, cacheSize);
-    this.pool = new QuickFailingTaskRunner(pool);
-    this.stopped = stop;
-  }
-
-  @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws SingleIndexWriteFailureException {
-    /*
-     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
-     * writes in parallel to each index table, so each table gets its own task and is submitted to
-     * the pool. Where it gets tricky is that we want to block the calling thread until one of two
-     * things happens: (1) all index tables get successfully updated, or (2) any one of the index
-     * table writes fail; in either case, we should return as quickly as possible. We get a little
-     * more complicated in that if we do get a single failure, but any of the index writes hasn't
-     * been started yet (its been queued up, but not submitted to a thread) we want to that task to
-     * fail immediately as we know that write is a waste and will need to be replayed anyways.
+    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+    private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.writer.threads.keepalivetime";
+    private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+
+    private HTableFactory factory;
+    private Stoppable stopped;
+    private QuickFailingTaskRunner pool;
+    private KeyValueBuilder kvBuilder;
+    private RegionCoprocessorEnvironment env;
+
+    public ParallelWriterIndexCommitter() {}
+
+    // For testing
+    public ParallelWriterIndexCommitter(String hbaseVersion) {
+        kvBuilder = KeyValueBuilder.get(hbaseVersion);
+    }
+
+    @Override
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+        this.env = env;
+        Configuration conf = env.getConfiguration();
+        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+                ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder(name, conf)
+                                .setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+                                        DEFAULT_CONCURRENT_INDEX_WRITER_THREADS)
+                                .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY),
+                        env),
+                env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+        this.kvBuilder = KeyValueBuilder.get(env.getHBaseVersion());
+    }
+
+    /**
+     * Setup <tt>this</tt>.
+     * <p>
+     * Exposed for TESTING
      */
+    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
+        this.factory = new CachingHTableFactory(factory, cacheSize);
+        this.pool = new QuickFailingTaskRunner(pool);
+        this.stopped = stop;
+    }
 
-    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-    TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
-    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
-      // get the mutations for each table. We leak the implementation here a little bit to save
-      // doing a complete copy over of all the index update for each table.
-      final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>) entry.getValue());
-      final HTableInterfaceReference tableReference = entry.getKey();
-      /*
-       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
-       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
-       * running thread. The former will only work if we are not in the midst of writing the current
-       * batch to the table, though we do check these status variables before starting and before
-       * writing the batch. The latter usage, interrupting the thread, will work in the previous
-       * situations as was at some points while writing the batch, depending on the underlying
-       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when is
-       * supports an interrupt).
-       */
-      tasks.add(new Task<Void>() {
-
-        /**
-         * Do the actual write to the primary table. We don't need to worry about closing the table
-         * because that is handled the {@link CachingHTableFactory}.
+    @Override
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws SingleIndexWriteFailureException {
+        /*
+         * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the writes in
+         * parallel to each index table, so each table gets its own task and is submitted to the pool. Where it gets
+         * tricky is that we want to block the calling thread until one of two things happens: (1) all index tables get
+         * successfully updated, or (2) any one of the index table writes fails; in either case, we should return as
+         * quickly as possible. It gets a little more complicated in that if we do get a single failure, but any of the
+         * index writes hasn't been started yet (it's been queued up, but not submitted to a thread), we want that task
+         * to fail immediately, as we know that write is wasted and will need to be replayed anyway.
          */
-        @SuppressWarnings("deprecation")
-        @Override
-        public Void call() throws Exception {
-          // this may have been queued, so another task infront of us may have failed, so we should
-          // early exit, if that's the case
-          throwFailureIfDone();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
-          }
-          try {
-            HTableInterface table = factory.getTable(tableReference.get());
-            throwFailureIfDone();
-            table.batch(mutations);
-          } catch (SingleIndexWriteFailureException e) {
-            throw e;
-          } catch (IOException e) {
-            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
-          } catch (InterruptedException e) {
-            // reset the interrupt status on the thread
-            Thread.currentThread().interrupt();
-            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
-          }
-          return null;
-        }
 
-        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-          if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
-            throw new SingleIndexWriteFailureException(
-                "Pool closed, not attempting to write to the index!", null);
-          }
+        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+        TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+            // get the mutations for each table. We leak the implementation here a little bit to save
+            // doing a complete copy over of all the index updates for each table.
+            final List<Mutation> mutations = kvBuilder.cloneIfNecessary((List<Mutation>)entry.getValue());
+            final HTableInterfaceReference tableReference = entry.getKey();
+            final RegionCoprocessorEnvironment env = this.env;
+            /*
+             * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+             * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
+             * The former will only work if we are not in the midst of writing the current batch to the table, though we
+             * do check these status variables before starting and before writing the batch. The latter usage,
+             * interrupting the thread, will work in the previous situations as well as at some points while writing the
+             * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
+             * elaborate on whether it supports an interrupt).
+             */
+            tasks.add(new Task<Void>() {
+
+                /**
+                 * Do the actual write to the primary table. We don't need to worry about closing the table because that
+                 * is handled by the {@link CachingHTableFactory}.
+                 */
+                @SuppressWarnings("deprecation")
+                @Override
+                public Void call() throws Exception {
+                    // this may have been queued, so another task in front of us may have failed; if that's
+                    // the case, we should exit early
+                    throwFailureIfDone();
+
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+                    }
+                    try {
+                        // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
+                        // Also, checking the prefix of the table name to determine if this is a local
+                        // index is pretty hacky. If we're going to keep this, we should revisit that
+                        // as well.
+                        try {
+                            if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
+                                HRegion indexRegion = IndexUtil.getIndexRegion(env);
+                                if (indexRegion != null) {
+                                    throwFailureIfDone();
+                                    indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
+                                    return null;
+                                }
+                            }
+                        } catch (IOException ignored) {
+                            // if the direct region write fails, fall back to the standard (slower) HTable path
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("indexRegion.batchMutate failed; falling back to HTable.batch(). Got error="
+                                        + ignored);
+                            }
+                        }
+                        HTableInterface table = factory.getTable(tableReference.get());
+                        throwFailureIfDone();
+                        table.batch(mutations);
+                    } catch (SingleIndexWriteFailureException e) {
+                        throw e;
+                    } catch (IOException e) {
+                        throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+                    } catch (InterruptedException e) {
+                        // reset the interrupt status on the thread
+                        Thread.currentThread().interrupt();
+                        throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+                    }
+                    return null;
+                }
+
+                private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+                    if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
+                        throw new SingleIndexWriteFailureException(
+                                "Pool closed, not attempting to write to the index!", null);
+                    }
+                }
+            });
+        }
 
+        // actually submit the tasks to the pool and wait for them to finish/fail
+        try {
+            pool.submitUninterruptible(tasks);
+        } catch (EarlyExitFailure e) {
+            propagateFailure(e);
+        } catch (ExecutionException e) {
+            LOG.error("Found a failed index update!");
+            propagateFailure(e.getCause());
         }
-      });
-    }
 
-    // actually submit the tasks to the pool and wait for them to finish/fail
-    try {
-      pool.submitUninterruptible(tasks);
-    } catch (EarlyExitFailure e) {
-      propagateFailure(e);
-    } catch (ExecutionException e) {
-      LOG.error("Found a failed index update!");
-      propagateFailure(e.getCause());
     }
 
-  }
+    private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+        try {
+            throw throwable;
+        } catch (SingleIndexWriteFailureException e1) {
+            throw e1;
+        } catch (Throwable e1) {
+            throw new SingleIndexWriteFailureException("Got an abort notification while writing to the index!", e1);
+        }
+
+    }
 
-  private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
-    try {
-      throw throwable;
-    } catch (SingleIndexWriteFailureException e1) {
-      throw e1;
-    } catch (Throwable e1) {
-      throw new SingleIndexWriteFailureException(
-          "Got an abort notification while writing to the index!", e1);
+    /**
+     * {@inheritDoc}
+     * <p>
+     * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed by the external
+     * {@link Stoppable}. This call does not delegate the stop down to the {@link Stoppable} passed in the constructor.
+     * 
+     * @param why
+     *            the reason for stopping
+     */
+    @Override
+    public void stop(String why) {
+        LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+        this.pool.stop(why);
+        this.factory.shutdown();
     }
 
-  }
-
-  /**
-   * {@inheritDoc}
-   * <p>
-   * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
-   * by the external {@link Stoppable}. This call does not delegate the stop down to the
-   * {@link Stoppable} passed in the constructor.
-   * @param why the reason for stopping
-   */
-  @Override
-  public void stop(String why) {
-    LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
-    this.pool.stop(why);
-    this.factory.shutdown();
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.stopped.isStopped();
-  }
+    @Override
+    public boolean isStopped() {
+        return this.stopped.isStopped();
+    }
 }
\ No newline at end of file
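
One detail worth keeping in mind before the next file: the two committers interpret task results differently. The tasks above are Task<Void>, so the null returned by call() carries no meaning, while TrackingParallelWriterIndexCommitter below uses Task<Boolean> and, as its result-checking loop (quoted here from the diff below) shows, treats a null result as a failed table:

    // From the tracking committer below: a null result marks that table as failed.
    for (Boolean result : results) {
        if (result == null) {
            // we know which table failed by the index of the result
            failures.add(tables.get(index));
        }
        index++;
    }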

http://git-wip-us.apache.org/repos/asf/phoenix/blob/a33f4c75/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
index 7f4fc9d..9a61191 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -1,22 +1,15 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.write.recovery;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -34,6 +27,7 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.phoenix.hbase.index.CapturingAbortable;
 import org.apache.phoenix.hbase.index.exception.MultiIndexWriteFailureException;
 import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
@@ -51,177 +45,192 @@ import org.apache.phoenix.hbase.index.write.IndexCommitter;
 import org.apache.phoenix.hbase.index.write.IndexWriter;
 import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;
+import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.MetaDataUtil;
 
 import com.google.common.collect.Multimap;
 
 /**
- * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to
- * allow the caller to retrieve the failed and succeeded index updates. Therefore, this class will
- * be a lot slower, in the face of failures, when compared to the
- * {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
- * you need to at least attempt all writes and know their result; for instance, this is fine for
- * doing WAL recovery - it's not a performance intensive situation and we want to limit the the
- * edits we need to retry.
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted, allowing the caller to
+ * retrieve the failed and succeeded index updates. Therefore, this class will be a lot slower, in the face of failures,
+ * when compared to the {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
+ * you need to at least attempt all writes and know their result; for instance, this is fine for doing WAL recovery -
+ * it's not a performance-intensive situation and we want to limit the edits we need to retry.
  * <p>
- * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that
- * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
+ * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that contains the list of
+ * {@link HTableInterfaceReference} that didn't complete successfully.
  * <p>
  * Failures to write to the index can happen several different ways:
  * <ol>
- * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}.
- * This causing any pending tasks to fail whatever they are doing as fast as possible. Any writes
- * that have not begun are not even attempted and marked as failures.</li>
- * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
- * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
- * exceptions.</li>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}). This causes any
+ * pending tasks to fail whatever they are doing as fast as possible. Any writes that have not begun are not even
+ * attempted and marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index table is not
+ * available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase exceptions.</li>
  * </ol>
- * Regardless of how the write fails, we still wait for all writes to complete before passing the
- * failure back to the client.
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the failure back to the
+ * client.
  */
 public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
-  private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
-
-  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
-  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
-      "index.trackingwriter.threads.keepalivetime";
-  
-  private TaskRunner pool;
-  private HTableFactory factory;
-  private CapturingAbortable abortable;
-  private Stoppable stopped;
-
-  @Override
-  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
-    Configuration conf = env.getConfiguration();
-    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
-      ThreadPoolManager.getExecutor(
-        new ThreadPoolBuilder(name, conf).
-          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
-            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
-          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
-      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
-  }
-
-  /**
-   * Setup <tt>this</tt>.
-   * <p>
-   * Exposed for TESTING
-   */
-  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
-      int cacheSize) {
-    this.pool = new WaitForCompletionTaskRunner(pool);
-    this.factory = new CachingHTableFactory(factory, cacheSize);
-    this.abortable = new CapturingAbortable(abortable);
-    this.stopped = stop;
-  }
-
-  @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws MultiIndexWriteFailureException {
-    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-    TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
-    List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
-    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
-      // get the mutations for each table. We leak the implementation here a little bit to save
-      // doing a complete copy over of all the index update for each table.
-      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
-      // track each reference so we can get at it easily later, when determing failures
-      final HTableInterfaceReference tableReference = entry.getKey();
-      tables.add(tableReference);
-
-      /*
-       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
-       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
-       * running thread. The former will only work if we are not in the midst of writing the current
-       * batch to the table, though we do check these status variables before starting and before
-       * writing the batch. The latter usage, interrupting the thread, will work in the previous
-       * situations as was at some points while writing the batch, depending on the underlying
-       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when is
-       * supports an interrupt).
-       */
-      tasks.add(new Task<Boolean>() {
-
-        /**
-         * Do the actual write to the primary table. We don't need to worry about closing the table
-         * because that is handled the {@link CachingHTableFactory}.
-         */
-        @SuppressWarnings("deprecation")
-        @Override
-        public Boolean call() throws Exception {
-          try {
-            // this may have been queued, but there was an abort/stop so we try to early exit
-            throwFailureIfDone();
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
-            }
-            HTableInterface table = factory.getTable(tableReference.get());
-            throwFailureIfDone();
-            table.batch(mutations);
-          } catch (InterruptedException e) {
-            // reset the interrupt status on the thread
-            Thread.currentThread().interrupt();
-            throw e;
-          } catch (Exception e) {
-            throw e;
-          }
-          return Boolean.TRUE;
+    private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+    public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
+    private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+    private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY = "index.trackingwriter.threads.keepalivetime";
+
+    private TaskRunner pool;
+    private HTableFactory factory;
+    private CapturingAbortable abortable;
+    private Stoppable stopped;
+    private RegionCoprocessorEnvironment env;
+
+    @Override
+    public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+        this.env = env;
+        Configuration conf = env.getConfiguration();
+        setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+                ThreadPoolManager.getExecutor(
+                        new ThreadPoolBuilder(name, conf)
+                                .setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+                                        DEFAULT_CONCURRENT_INDEX_WRITER_THREADS)
+                                .setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY),
+                        env),
+                env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+    }
+
+    /**
+     * Setup <tt>this</tt>.
+     * <p>
+     * Exposed for TESTING
+     */
+    void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop, int cacheSize) {
+        this.pool = new WaitForCompletionTaskRunner(pool);
+        this.factory = new CachingHTableFactory(factory, cacheSize);
+        this.abortable = new CapturingAbortable(abortable);
+        this.stopped = stop;
+    }
+
+    @Override
+    public void write(Multimap<HTableInterfaceReference, Mutation> toWrite) throws MultiIndexWriteFailureException {
+        Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+        TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+        List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+        for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+            // get the mutations for each table. We leak the implementation here a little bit to save
+            // doing a complete copy over of all the index updates for each table.
+            final List<Mutation> mutations = (List<Mutation>)entry.getValue();
+            // track each reference so we can get at it easily later, when determining failures
+            final HTableInterfaceReference tableReference = entry.getKey();
+            final RegionCoprocessorEnvironment env = this.env;
+            tables.add(tableReference);
+
+            /*
+             * Write a batch of index updates to an index table. This operation stops (is cancelable) via two
+             * mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the running thread.
+             * The former will only work if we are not in the midst of writing the current batch to the table, though we
+             * do check these status variables before starting and before writing the batch. The latter usage,
+             * interrupting the thread, will work in the previous situations as well as at some points while writing the
+             * batch, depending on the underlying writer implementation (HTableInterface#batch is blocking, but doesn't
+             * elaborate on whether it supports an interrupt).
+             */
+            tasks.add(new Task<Boolean>() {
+
+                /**
+                 * Do the actual write to the primary table. We don't need to worry about closing the table because that
+                 * is handled by the {@link CachingHTableFactory}.
+                 */
+                @SuppressWarnings("deprecation")
+                @Override
+                public Boolean call() throws Exception {
+                    try {
+                        // this may have been queued, but there was an abort/stop, so we try to exit early
+                        throwFailureIfDone();
+
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+                        }
+
+                        try {
+                            // TODO: Once HBASE-11766 is fixed, reexamine whether this is necessary.
+                            // Also, checking the prefix of the table name to determine if this is a local
+                            // index is pretty hacky. If we're going to keep this, we should revisit that
+                            // as well.
+                            if (tableReference.getTableName().startsWith(MetaDataUtil.LOCAL_INDEX_TABLE_PREFIX)) {
+                                HRegion indexRegion = IndexUtil.getIndexRegion(env);
+                                if (indexRegion != null) {
+                                    throwFailureIfDone();
+                                    indexRegion.batchMutate(mutations.toArray(new Mutation[mutations.size()]));
+                                    // return TRUE, not null: a null result is treated as a
+                                    // failed table in the result-checking loop below
+                                    return Boolean.TRUE;
+                                }
+                            }
+                        } catch (IOException ignored) {
+                            // if the direct region write fails, fall back to the standard (slower) HTable path
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("indexRegion.batchMutate failed; falling back to HTable.batch(). Got error="
+                                        + ignored);
+                            }
+                        }
+
+                        HTableInterface table = factory.getTable(tableReference.get());
+                        throwFailureIfDone();
+                        table.batch(mutations);
+                    } catch (InterruptedException e) {
+                        // reset the interrupt status on the thread
+                        Thread.currentThread().interrupt();
+                        throw e;
+                    } catch (Exception e) {
+                        throw e;
+                    }
+                    return Boolean.TRUE;
+                }
+
+                private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+                    if (stopped.isStopped() || abortable.isAborted() || Thread.currentThread().isInterrupted()) {
+                        throw new SingleIndexWriteFailureException(
+                                "Pool closed, not attempting to write to the index!", null);
+                    }
+                }
+            });
         }
 
-        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-          if (stopped.isStopped() || abortable.isAborted()
-              || Thread.currentThread().isInterrupted()) {
-            throw new SingleIndexWriteFailureException(
-                "Pool closed, not attempting to write to the index!", null);
-          }
+        List<Boolean> results = null;
+        try {
+            LOG.debug("Waiting on index update tasks to complete...");
+            results = this.pool.submitUninterruptible(tasks);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+        } catch (EarlyExitFailure e) {
+            throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+        }
 
+        // track the failures. We only ever access this on return from our calls, so no extra
+        // synchronization is needed. We could update all the failures as we find them, but that adds a
+        // lot of locking overhead, and just doing the copy later is about as efficient.
+        List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+        int index = 0;
+        for (Boolean result : results) {
+            // there was a failure
+            if (result == null) {
+                // we know which table failed by the index of the result
+                failures.add(tables.get(index));
+            }
+            index++;
         }
-      });
-    }
 
-    List<Boolean> results = null;
-    try {
-      LOG.debug("Waiting on index update tasks to complete...");
-      results = this.pool.submitUninterruptible(tasks);
-    } catch (ExecutionException e) {
-      throw new RuntimeException(
-          "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
-    } catch (EarlyExitFailure e) {
-      throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+        // if any of the tasks failed, then we need to propagate the failure
+        if (failures.size() > 0) {
+            // make the list unmodifiable to avoid any more synchronization concerns
+            throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+        }
+        return;
     }
-    
-    // track the failures. We only ever access this on return from our calls, so no extra
-    // synchronization is needed. We could update all the failures as we find them, but that add a
-    // lot of locking overhead, and just doing the copy later is about as efficient.
-    List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
-    int index = 0;
-    for (Boolean result : results) {
-      // there was a failure
-      if (result == null) {
-        // we know which table failed by the index of the result
-        failures.add(tables.get(index));
-      }
-      index++;
+
+    @Override
+    public void stop(String why) {
+        LOG.info("Shutting down " + this.getClass().getSimpleName());
+        this.pool.stop(why);
+        this.factory.shutdown();
     }
 
-    // if any of the tasks failed, then we need to propagate the failure
-    if (failures.size() > 0) {
-      // make the list unmodifiable to avoid any more synchronization concerns
-      throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+    @Override
+    public boolean isStopped() {
+        return this.stopped.isStopped();
     }
-    return;
-  }
-
-  @Override
-  public void stop(String why) {
-    LOG.info("Shutting down " + this.getClass().getSimpleName());
-    this.pool.stop(why);
-    this.factory.shutdown();
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.stopped.isStopped();
-  }
 }
\ No newline at end of file
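
Finally, a hedged sketch of how a recovery-time caller might consume the MultiIndexWriteFailureException thrown by write(Multimap). The caller shown here is hypothetical (the real call sites live in Phoenix's IndexWriter), and it assumes the exception exposes the failed tables via a getFailedTables() accessor:

    try {
        committer.write(indexUpdates);
    } catch (MultiIndexWriteFailureException e) {
        // Only the edits destined for these tables need to be replayed.
        for (HTableInterfaceReference failedTable : e.getFailedTables()) {
            LOG.warn("Index updates were not applied to " + failedTable + "; they must be retried");
        }
    }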