Posted to commits@phoenix.apache.org by mu...@apache.org on 2014/02/15 01:07:44 UTC

[11/15] Rename package from org.apache.hadoop.hbase.index.* to org.apache.phoenix.index.* to fix classloader issue causing mutable index performance regression - https://issues.apache.org/jira/browse/PHOENIX-38
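
The subject line says what and why: the indexing code moves out of HBase's org.apache.hadoop.hbase.* namespace into a Phoenix-owned package so it no longer hits the classloader issue behind the mutable-index performance regression tracked in PHOENIX-38. The change is mechanical; for any code touching these classes, the imports change as in this sketch (both package names appear verbatim in the diffs below):

    // Before this commit: index classes sat in HBase's package namespace.
    import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;

    // After: the same class lives in a Phoenix-owned package.
    import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;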

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java
deleted file mode 100644
index 48e10ce..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.IndexWriteException;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Write the index updates to the index tables
- */
-public interface IndexCommitter extends Stoppable {
-
-  void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
-
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws IndexWriteException;
-}
\ No newline at end of file
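
Since the interface above is small, a skeletal custom committer is easy to sketch. The class below is hypothetical (a committer that discards every batch, useful only as a template) and uses the pre-rename package names from this hunk:

    import com.google.common.collect.Multimap;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.index.exception.IndexWriteException;
    import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
    import org.apache.hadoop.hbase.index.write.IndexCommitter;
    import org.apache.hadoop.hbase.index.write.IndexWriter;

    /** Hypothetical template: accepts every batch without writing anything. */
    public class NoOpIndexCommitter implements IndexCommitter {
      private volatile boolean stopped = false;

      @Override
      public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
        // a real committer would build its table factory and thread pool here
      }

      @Override
      public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
          throws IndexWriteException {
        // a real committer would push each table's mutation batch here
      }

      @Override
      public void stop(String why) {
        this.stopped = true;
      }

      @Override
      public boolean isStopped() {
        return stopped;
      }
    }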

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java
deleted file mode 100644
index 653e8d6..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Handle failures to write to the index tables.
- */
-public interface IndexFailurePolicy extends Stoppable {
-
-  public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
-
-  /**
-   * Handle the failure of the attempted index updates
-   * @param attempted map of index table -> mutations to apply
-   * @param cause reason why there was a failure
-   * @throws IOException
-   */
-  public void
-      handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException;
-}
\ No newline at end of file
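
For contrast with the kill-the-server default seen later in this commit, here is a minimal sketch of a policy that only logs the failed batch. LoggingFailurePolicy is hypothetical; the signatures match the interface deleted above:

    import java.io.IOException;
    import com.google.common.collect.Multimap;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
    import org.apache.hadoop.hbase.index.write.IndexFailurePolicy;

    /** Hypothetical policy: log the lost updates and carry on (risks stale indexes). */
    public class LoggingFailurePolicy implements IndexFailurePolicy {
      private static final Log LOG = LogFactory.getLog(LoggingFailurePolicy.class);
      private Stoppable parent;

      @Override
      public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
        this.parent = parent;
      }

      @Override
      public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
          Exception cause) throws IOException {
        // swallowing the failure means these index rows are simply lost
        LOG.error("Dropping " + attempted.size() + " index update(s)", cause);
      }

      @Override
      public void stop(String why) {
        // nothing of our own to release
      }

      @Override
      public boolean isStopped() {
        return parent.isStopped();
      }
    }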

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java
deleted file mode 100644
index f41c55f..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.IndexWriteException;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
-
-/**
- * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
- * index table, we cleanly kill the region/server to ensure that the region's WAL gets replayed.
- * <p>
- * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
- * threads, so they will not block the region from shutting down.
- */
-public class IndexWriter implements Stoppable {
-
-  private static final Log LOG = LogFactory.getLog(IndexWriter.class);
-  private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
-  public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
-  private AtomicBoolean stopped = new AtomicBoolean(false);
-  private IndexCommitter writer;
-  private IndexFailurePolicy failurePolicy;
-
-  /**
-   * @throws IOException if the {@link IndexWriter} or {@link IndexFailurePolicy} cannot be
-   *           instantiated
-   */
-  public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
-    this(getCommitter(env), getFailurePolicy(env), env, name);
-  }
-
-  public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
-    Configuration conf = env.getConfiguration();
-    try {
-      IndexCommitter committer =
-          conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
-            IndexCommitter.class).newInstance();
-      return committer;
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
-
-  public static IndexFailurePolicy getFailurePolicy(RegionCoprocessorEnvironment env)
-      throws IOException {
-    Configuration conf = env.getConfiguration();
-    try {
-      IndexFailurePolicy committer =
-          conf.getClass(INDEX_FAILURE_POLICY_CONF_KEY, KillServerOnFailurePolicy.class,
-            IndexFailurePolicy.class).newInstance();
-      return committer;
-    } catch (InstantiationException e) {
-      throw new IOException(e);
-    } catch (IllegalAccessException e) {
-      throw new IOException(e);
-    }
-  }
-
-  /**
-   * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected
-   * to be fully set up before calling.
-   * @param committer committer to write the index updates
-   * @param policy policy to handle any failed updates
-   * @param env region coprocessor environment
-   * @param name name passed to the committer's setup (used for its thread pool)
-   */
-  public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
-      RegionCoprocessorEnvironment env, String name) {
-    this(committer, policy);
-    this.writer.setup(this, env, name);
-    this.failurePolicy.setup(this, env);
-  }
-
-  /**
-   * Create an {@link IndexWriter} with an already setup {@link IndexCommitter} and
-   * {@link IndexFailurePolicy}.
-   * @param committer to write updates
-   * @param policy to handle failures
-   */
-  IndexWriter(IndexCommitter committer, IndexFailurePolicy policy) {
-    this.writer = committer;
-    this.failurePolicy = policy;
-  }
-  
-  /**
-   * Write the mutations to their respective table.
-   * <p>
-   * This method is blocking and could potentially cause the writer to block for a long time as we
-   * write the index updates. When we return depends on the specified {@link IndexCommitter}.
-   * <p>
-   * If an update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
-   * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
-   * which ensures that the server crashes when an index write fails, ensuring that we get WAL
-   * replay of the index edits.
-   * @param indexUpdates Updates to write
-   * @throws IOException
-   */
-  public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException  {
-    // convert the strings to htableinterfaces to which we can talk and group by TABLE
-    Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
-    writeAndKillYourselfOnFailure(toWrite);
-  }
-
-  /**
-   * see {@link #writeAndKillYourselfOnFailure(Collection)}.
-   * @param toWrite updates to write, grouped by index table
-   * @throws IOException
-   */
-  public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
-    try {
-      write(toWrite);
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Done writing all index updates!\n\t" + toWrite);
-      }
-    } catch (Exception e) {
-      this.failurePolicy.handleFailure(toWrite, e);
-    }
-  }
-
-  /**
-   * Write the mutations to their respective table.
-   * <p>
-   * This method is blocking and could potentially cause the writer to block for a long time as we
-   * write the index updates. We only return when either:
-   * <ol>
-   * <li>All index writes have returned, OR</li>
-   * <li>Any single index write has failed</li>
-   * </ol>
-   * We attempt to quickly determine if any write has failed and not write to the remaining indexes
-   * to ensure a timely recovery of the failed index writes.
-   * @param toWrite Updates to write
-   * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
-   *           stop early depends on the {@link IndexCommitter}.
-   */
-  public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
-    write(resolveTableReferences(toWrite));
-  }
-
-  /**
-   * see {@link #write(Collection)}
-   * @param toWrite updates to write, grouped by index table
-   * @throws IndexWriteException
-   */
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws IndexWriteException {
-    this.writer.write(toWrite);
-  }
-
-
-  /**
-   * Convert the passed index updates to {@link HTableInterfaceReference}s.
-   * @param indexUpdates from the index builder
-   * @return pairs that can then be written by an {@link IndexWriter}.
-   */
-  public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
-      Collection<Pair<Mutation, byte[]>> indexUpdates) {
-    Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
-        .<HTableInterfaceReference, Mutation> create();
-    // simple map to make lookups easy while we build the map of tables to create
-    Map<ImmutableBytesPtr, HTableInterfaceReference> tables =
-        new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(updates.size());
-    for (Pair<Mutation, byte[]> entry : indexUpdates) {
-      byte[] tableName = entry.getSecond();
-      ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
-      HTableInterfaceReference table = tables.get(ptr);
-      if (table == null) {
-        table = new HTableInterfaceReference(ptr);
-        tables.put(ptr, table);
-      }
-      updates.put(table, entry.getFirst());
-    }
-
-    return updates;
-  }
-
-  @Override
-  public void stop(String why) {
-    if (!this.stopped.compareAndSet(false, true)) {
-      // already stopped
-      return;
-    }
-    LOG.debug("Stopping because " + why);
-    this.writer.stop(why);
-    this.failurePolicy.stop(why);
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.stopped.get();
-  }
-}
\ No newline at end of file
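
The two getters above show the pluggability knobs: both the committer and the failure policy are instantiated reflectively from the Configuration, so an override only needs a public no-arg constructor. A sketch using the public INDEX_FAILURE_POLICY_CONF_KEY and the hypothetical LoggingFailurePolicy sketched after IndexFailurePolicy.java above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.index.write.IndexFailurePolicy;
    import org.apache.hadoop.hbase.index.write.IndexWriter;

    public class FailurePolicyConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // getFailurePolicy(env) calls newInstance(), so the policy class must
        // have a public no-arg constructor to be usable through this key.
        conf.setClass(IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY,
            LoggingFailurePolicy.class, IndexFailurePolicy.class);
      }
    }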

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java
deleted file mode 100644
index b56a23e..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-
-import org.apache.hadoop.hbase.index.table.CoprocessorHTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
-
-public class IndexWriterUtils {
-
-  private static final Log LOG = LogFactory.getLog(IndexWriterUtils.class);
-
-  /**
-   * Maximum number of threads to allow per-table when writing. Each writer thread (from
-   * {@link IndexWriterUtils#NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) has a single HTable.
-   * However, each table is backed by a threadpool to manage the updates to that table. This
-   * specifies the number of threads to allow in each of those pools. Generally, you shouldn't need
-   * to change this, unless you have a small number of indexes to which most of the writes go.
-   * Defaults to: {@value #DEFAULT_NUM_PER_TABLE_THREADS}.
-   * <p>
-   * For tables to which there are not a lot of writes, the thread pool will automatically decrease
-   * the number of threads to one (though it can burst up to the specified max for any given table),
-   * so increasing this to meet the max case is reasonable.
-   * <p>
-   * Setting this value too small can cause <b>catastrophic cluster failure</b>. The way HTable's
-   * underlying pool works is such that it does direct hand-off of tasks to threads. This works fine
-   * because HTables are assumed to work in a single-threaded context, so we never get more threads
-   * than regionservers. In a multi-threaded context, we can easily grow to more than that number of
-   * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
-   * coprocessor hooks, so we can't modify this behavior.
-   */
-  private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
-      "index.writer.threads.pertable.max";
-  private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
-
-  /** Configuration key that HBase uses to set the max number of threads for an HTable */
-  public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
-  private IndexWriterUtils() {
-    // private ctor for utilities
-  }
-
-  public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
-    // create a simple delegate factory, setup the way we need
-    Configuration conf = env.getConfiguration();
-    // set the number of threads allowed per table.
-    int htableThreads =
-        conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
-    LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
-    IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
-    return new CoprocessorHTableFactory(env);
-  }
-}
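
The per-table thread cap discussed in the Javadoc above is only copied into HBase's own hbase.htable.threads.max if that key is unset, so operators can tune either one. A sketch (the per-table constant is private, so its raw key string is repeated here):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class PerTableThreadTuning {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Heed the warning above: setting this too small can wedge the cluster,
        // since HTable's pool does direct hand-off of tasks to threads.
        conf.setInt("index.writer.threads.pertable.max", 100);
      }
    }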

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java
deleted file mode 100644
index c043a54..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Naive failure policy - kills the server on which it resides
- */
-public class KillServerOnFailurePolicy implements IndexFailurePolicy {
-
-  private static final Log LOG = LogFactory.getLog(KillServerOnFailurePolicy.class);
-  private Abortable abortable;
-  private Stoppable stoppable;
-
-  @Override
-  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
-    setup(parent, env.getRegionServerServices());
-  }
-
-  public void setup(Stoppable parent, Abortable abort) {
-    this.stoppable = parent;
-    this.abortable = abort;
-  }
-
-  @Override
-  public void stop(String why) {
-    // noop
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.stoppable.isStopped();
-  }
-
-  @Override
-  public void
-      handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
-    // cleanup resources
-    this.stop("Killing ourselves because of an error: " + cause);
-    // notify the regionserver of the failure
-    String msg =
-        "Could not update the index table, killing server region because couldn't write to an index table";
-    LOG.error(msg, cause);
-    try {
-      this.abortable.abort(msg, cause);
-    } catch (Exception e) {
-      LOG.fatal("Couldn't abort this server to preserve index writes, "
-          + "attempting to hard kill the server");
-      System.exit(1);
-    }
-
-  }
-
-}
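
The two-argument setup(Stoppable, Abortable) overload makes the abort path easy to exercise outside a regionserver. A test-style sketch with stub stand-ins (both anonymous classes are hypothetical test doubles):

    import java.io.IOException;
    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;
    import org.apache.hadoop.hbase.Abortable;
    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
    import org.apache.hadoop.hbase.index.write.KillServerOnFailurePolicy;

    public class KillPolicyDemo {
      public static void main(String[] args) throws IOException {
        final boolean[] aborted = { false };
        Stoppable stop = new Stoppable() {
          private boolean stopped;
          public void stop(String why) { stopped = true; }
          public boolean isStopped() { return stopped; }
        };
        Abortable abort = new Abortable() {
          public void abort(String why, Throwable e) { aborted[0] = true; }
          public boolean isAborted() { return aborted[0]; }
        };

        KillServerOnFailurePolicy policy = new KillServerOnFailurePolicy();
        policy.setup(stop, abort);

        // any failure routes straight to Abortable#abort, "killing" our stub server
        Multimap<HTableInterfaceReference, Mutation> attempted = ArrayListMultimap.create();
        policy.handleFailure(attempted, new Exception("simulated index write failure"));
        System.out.println("server abort requested: " + abort.isAborted());
      }
    }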

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java
deleted file mode 100644
index b06ecf6..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.parallel.EarlyExitFailure;
-import org.apache.hadoop.hbase.index.parallel.QuickFailingTaskRunner;
-import org.apache.hadoop.hbase.index.parallel.Task;
-import org.apache.hadoop.hbase.index.parallel.TaskBatch;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolManager;
-import org.apache.hadoop.hbase.index.table.CachingHTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
- * any of the index updates fails. Completion is determined by the following criteria:
- * <ol>
- * <li>All index writes have returned, OR</li>
- * <li>Any single index write has failed</li>
- * </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
- * ensure a timely recovery of the failed index writes.
- */
-public class ParallelWriterIndexCommitter implements IndexCommitter {
-
-  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
-  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
-      "index.writer.threads.keepalivetime";
-  private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
-
-  private HTableFactory factory;
-  private Stoppable stopped;
-  private QuickFailingTaskRunner pool;
-
-  @Override
-  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
-    Configuration conf = env.getConfiguration();
-    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
-      ThreadPoolManager.getExecutor(
-        new ThreadPoolBuilder(name, conf).
-          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
-            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
-          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
-      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
-  }
-
-  /**
-   * Set up <tt>this</tt>.
-   * <p>
-   * Exposed for TESTING
-   */
-  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
-      int cacheSize) {
-    this.factory = new CachingHTableFactory(factory, cacheSize);
-    this.pool = new QuickFailingTaskRunner(pool);
-    this.stopped = stop;
-  }
-
-  @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws SingleIndexWriteFailureException {
-    /*
-     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
-     * writes in parallel to each index table, so each table gets its own task and is submitted to
-     * the pool. Where it gets tricky is that we want to block the calling thread until one of two
-     * things happens: (1) all index tables get successfully updated, or (2) any one of the index
-     * table writes fail; in either case, we should return as quickly as possible. It gets a little
-     * more complicated in that if we do get a single failure, but any of the index writes hasn't
-     * been started yet (it's been queued up, but not submitted to a thread), we want that task to
-     * fail immediately as we know that write is a waste and will need to be replayed anyway.
-     */
-
-    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-    TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
-    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
-      // get the mutations for each table. We leak the implementation here a little bit to save
-      // doing a complete copy over of all the index update for each table.
-      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
-      final HTableInterfaceReference tableReference = entry.getKey();
-      /*
-       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
-       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
-       * running thread. The former will only work if we are not in the midst of writing the current
-       * batch to the table, though we do check these status variables before starting and before
-       * writing the batch. The latter usage, interrupting the thread, will work in the previous
-       * situations as well as at some points while writing the batch, depending on the underlying
-       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate on when it
-       * supports an interrupt).
-       */
-      tasks.add(new Task<Void>() {
-
-        /**
-         * Do the actual write to the index table. We don't need to worry about closing the table
-         * because that is handled by the {@link CachingHTableFactory}.
-         */
-        @Override
-        public Void call() throws Exception {
-          // this may have been queued, so another task in front of us may have failed, so we should
-          // early exit, if that's the case
-          throwFailureIfDone();
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
-          }
-          try {
-            HTableInterface table = factory.getTable(tableReference.get());
-            throwFailureIfDone();
-            table.batch(mutations);
-          } catch (SingleIndexWriteFailureException e) {
-            throw e;
-          } catch (IOException e) {
-            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
-          } catch (InterruptedException e) {
-            // reset the interrupt status on the thread
-            Thread.currentThread().interrupt();
-            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
-          }
-          return null;
-        }
-
-        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-          if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
-            throw new SingleIndexWriteFailureException(
-                "Pool closed, not attempting to write to the index!", null);
-          }
-
-        }
-      });
-    }
-
-    // actually submit the tasks to the pool and wait for them to finish/fail
-    try {
-      pool.submitUninterruptible(tasks);
-    } catch (EarlyExitFailure e) {
-      propagateFailure(e);
-    } catch (ExecutionException e) {
-      LOG.error("Found a failed index update!");
-      propagateFailure(e.getCause());
-    }
-
-  }
-
-  private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
-    try {
-      throw throwable;
-    } catch (SingleIndexWriteFailureException e1) {
-      throw e1;
-    } catch (Throwable e1) {
-      throw new SingleIndexWriteFailureException(
-          "Got an abort notification while writing to the index!", e1);
-    }
-
-  }
-
-  /**
-   * {@inheritDoc}
-   * <p>
-   * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
-   * by the external {@link Stoppable}. This call does not delegate the stop down to the
-   * {@link Stoppable} passed in the constructor.
-   * @param why the reason for stopping
-   */
-  @Override
-  public void stop(String why) {
-    LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
-    this.pool.stop(why);
-    this.factory.shutdown();
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.stopped.isStopped();
-  }
-}
\ No newline at end of file
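
The committer's pool is sized from the public index.writer.threads.max key (default 10, per the constants above); the keep-alive key is private, so its raw string is repeated below. A small tuning sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.index.write.ParallelWriterIndexCommitter;

    public class WriterPoolTuning {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Allow more index tables to be written concurrently than the default 10.
        conf.setInt(ParallelWriterIndexCommitter.NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY, 20);
        // Keep-alive for idle core threads; the value's units are whatever
        // ThreadPoolBuilder#setCoreTimeout expects (seconds is an assumption here).
        conf.setLong("index.writer.threads.keepalivetime", 60L);
      }
    }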

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java
deleted file mode 100644
index b0f9d68..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write.recovery;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-
-public class PerRegionIndexWriteCache {
-
-  private Map<HRegion, Multimap<HTableInterfaceReference, Mutation>> cache =
-      new HashMap<HRegion, Multimap<HTableInterfaceReference, Mutation>>();
-
-
-  /**
-   * Get the edits for the current region. Removes the edits from the cache. To add them back, call
-   * {@link #addEdits(HRegion, HTableInterfaceReference, Collection)}.
-   * @param region region whose pending edits should be returned and removed
-   * @return Get the edits for the given region. Returns <tt>null</tt> if there are no pending edits
-   *         for the region
-   */
-  public Multimap<HTableInterfaceReference, Mutation> getEdits(HRegion region) {
-    return cache.remove(region);
-  }
-
-  /**
-   * @param region region for which the edits should be cached
-   * @param table index table the edits should eventually be written to
-   * @param collection edits to add for the table
-   */
-  public void addEdits(HRegion region, HTableInterfaceReference table,
-      Collection<Mutation> collection) {
-    Multimap<HTableInterfaceReference, Mutation> edits = cache.get(region);
-    if (edits == null) {
-      edits = ArrayListMultimap.<HTableInterfaceReference, Mutation> create();
-      cache.put(region, edits);
-    }
-    edits.putAll(table, collection);
-  }
-}
\ No newline at end of file
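
The remove-on-read contract of getEdits(HRegion) is the detail worth remembering: a second call returns null until addEdits repopulates the region's entry. A sketch (the caller supplies the HRegion, since only the coprocessor has one; the table and row names are made up):

    import java.util.Arrays;
    import com.google.common.collect.Multimap;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.regionserver.HRegion;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
    import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
    import org.apache.hadoop.hbase.index.write.recovery.PerRegionIndexWriteCache;

    public class WriteCacheDemo {
      /** Stash one failed edit for the region, then drain it. */
      static void stashAndDrain(PerRegionIndexWriteCache cache, HRegion region) {
        HTableInterfaceReference table =
            new HTableInterfaceReference(new ImmutableBytesPtr(Bytes.toBytes("MY_INDEX")));
        Mutation failed = new Delete(Bytes.toBytes("row1"));
        cache.addEdits(region, table, Arrays.asList(failed));

        // First read hands back the pending edits and clears the entry...
        Multimap<HTableInterfaceReference, Mutation> edits = cache.getEdits(region);
        // ...so a second read now reports nothing pending for this region.
        assert edits != null && cache.getEdits(region) == null;
      }
    }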

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
deleted file mode 100644
index a17395e..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write.recovery;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-import org.apache.hadoop.hbase.index.write.IndexFailurePolicy;
-import org.apache.hadoop.hbase.index.write.KillServerOnFailurePolicy;
-
-/**
- * Tracks any failed writes in the {@link PerRegionIndexWriteCache}, given a
- * {@link MultiIndexWriteFailureException} (which is thrown from the
- * {@link TrackingParallelWriterIndexCommitter}). Any other exception failure causes a server
- * abort via the usual {@link KillServerOnFailurePolicy}.
- */
-public class StoreFailuresInCachePolicy implements IndexFailurePolicy {
-
-  private KillServerOnFailurePolicy delegate;
-  private PerRegionIndexWriteCache cache;
-  private HRegion region;
-
-  /**
-   * @param failedIndexEdits cache to update when we find a failure
-   */
-  public StoreFailuresInCachePolicy(PerRegionIndexWriteCache failedIndexEdits) {
-    this.cache = failedIndexEdits;
-  }
-
-  @Override
-  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
-    this.region = env.getRegion();
-    this.delegate = new KillServerOnFailurePolicy();
-    this.delegate.setup(parent, env);
-
-  }
-
-  @Override
-  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
-    // if it's not an exception we can handle, let the delegate take care of it
-    if (!(cause instanceof MultiIndexWriteFailureException)) {
-      delegate.handleFailure(attempted, cause);
-      return;
-    }
-    List<HTableInterfaceReference> failedTables =
-        ((MultiIndexWriteFailureException) cause).getFailedTables();
-    for (HTableInterfaceReference table : failedTables) {
-      cache.addEdits(this.region, table, attempted.get(table));
-    }
-  }
-
-
-  @Override
-  public void stop(String why) {
-    this.delegate.stop(why);
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.delegate.isStopped();
-  }
-}
\ No newline at end of file
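
Unlike the default policy, this one cannot be created reflectively through the index.writer.failurepolicy.class key, because its only constructor takes the shared cache. The wiring has to be explicit, roughly like this sketch (parent and env come from the hosting coprocessor):

    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.index.write.IndexFailurePolicy;
    import org.apache.hadoop.hbase.index.write.recovery.PerRegionIndexWriteCache;
    import org.apache.hadoop.hbase.index.write.recovery.StoreFailuresInCachePolicy;

    public class RecoveryPolicyWiring {
      static IndexFailurePolicy build(Stoppable parent, RegionCoprocessorEnvironment env,
          PerRegionIndexWriteCache failedEdits) {
        // MultiIndexWriteFailureExceptions land in failedEdits; anything else
        // still falls through to the delegate KillServerOnFailurePolicy.
        StoreFailuresInCachePolicy policy = new StoreFailuresInCachePolicy(failedEdits);
        policy.setup(parent, env);
        return policy;
      }
    }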

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
deleted file mode 100644
index 43cc19f..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write.recovery;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.CapturingAbortable;
-import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.parallel.EarlyExitFailure;
-import org.apache.hadoop.hbase.index.parallel.Task;
-import org.apache.hadoop.hbase.index.parallel.TaskBatch;
-import org.apache.hadoop.hbase.index.parallel.TaskRunner;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolManager;
-import org.apache.hadoop.hbase.index.parallel.WaitForCompletionTaskRunner;
-import org.apache.hadoop.hbase.index.table.CachingHTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-import org.apache.hadoop.hbase.index.write.IndexCommitter;
-import org.apache.hadoop.hbase.index.write.IndexWriter;
-import org.apache.hadoop.hbase.index.write.IndexWriterUtils;
-import org.apache.hadoop.hbase.index.write.ParallelWriterIndexCommitter;
-
-/**
- * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted,
- * to allow the caller to retrieve the failed and succeeded index updates. Therefore, this class
- * will be a lot slower, in the face of failures, when compared to the
- * {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
- * you need to at least attempt all writes and know their result; for instance, this is fine for
- * doing WAL recovery - it's not a performance-intensive situation and we want to limit the
- * edits we need to retry.
- * <p>
- * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that
- * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
- * <p>
- * Failures to write to the index can happen several different ways:
- * <ol>
- * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}).
- * This causes any pending tasks to fail whatever they are doing as fast as possible. Any writes
- * that have not begun are not even attempted and are marked as failures.</li>
- * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
- * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
- * exceptions.</li>
- * </ol>
- * Regardless of how the write fails, we still wait for all writes to complete before passing the
- * failure back to the client.
- */
-public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
-  private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
-
-  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
-  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
-  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
-      "index.trackingwriter.threads.keepalivetime";
-  
-  private TaskRunner pool;
-  private HTableFactory factory;
-  private CapturingAbortable abortable;
-  private Stoppable stopped;
-
-  @Override
-  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
-    Configuration conf = env.getConfiguration();
-    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
-      ThreadPoolManager.getExecutor(
-        new ThreadPoolBuilder(name, conf).
-          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
-            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
-          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
-      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
-  }
-
-  /**
-   * Set up <tt>this</tt>.
-   * <p>
-   * Exposed for TESTING
-   */
-  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
-      int cacheSize) {
-    this.pool = new WaitForCompletionTaskRunner(pool);
-    this.factory = new CachingHTableFactory(factory, cacheSize);
-    this.abortable = new CapturingAbortable(abortable);
-    this.stopped = stop;
-  }
-
-  @Override
-  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
-      throws MultiIndexWriteFailureException {
-    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
-    TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
-    List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
-    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
-      // get the mutations for each table. We leak the implementation here a little bit to save
-      // doing a complete copy over of all the index update for each table.
-      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
-      // track each reference so we can get at it easily later, when determining failures
-      final HTableInterfaceReference tableReference = entry.getKey();
-      tables.add(tableReference);
-
-      /*
-       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
-       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
-       * running thread. The former will only work if we are not in the midst of writing the current
-       * batch to the table, though we do check these status variables before starting and before
-       * writing the batch. The latter usage, interrupting the thread, will work in the previous
-       * situations as well as at some points while writing the batch, depending on the underlying
-       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate on when it
-       * supports an interrupt).
-       */
-      tasks.add(new Task<Boolean>() {
-
-        /**
-         * Do the actual write to the index table. We don't need to worry about closing the table
-         * because that is handled by the {@link CachingHTableFactory}.
-         */
-        @Override
-        public Boolean call() throws Exception {
-          try {
-            // this may have been queued, but there was an abort/stop so we try to early exit
-            throwFailureIfDone();
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
-            }
-            HTableInterface table = factory.getTable(tableReference.get());
-            throwFailureIfDone();
-            table.batch(mutations);
-          } catch (InterruptedException e) {
-            // reset the interrupt status on the thread
-            Thread.currentThread().interrupt();
-            throw e;
-          } catch (Exception e) {
-            throw e;
-          }
-          return Boolean.TRUE;
-        }
-
-        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
-          if (stopped.isStopped() || abortable.isAborted()
-              || Thread.currentThread().isInterrupted()) {
-            throw new SingleIndexWriteFailureException(
-                "Pool closed, not attempting to write to the index!", null);
-          }
-
-        }
-      });
-    }
-
-    List<Boolean> results = null;
-    try {
-      LOG.debug("Waiting on index update tasks to complete...");
-      results = this.pool.submitUninterruptible(tasks);
-    } catch (ExecutionException e) {
-      throw new RuntimeException(
-          "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
-    } catch (EarlyExitFailure e) {
-      throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
-    }
-    
-    // track the failures. We only ever access this on return from our calls, so no extra
-    // synchronization is needed. We could update all the failures as we find them, but that would
-    // add a lot of locking overhead, and just doing the copy later is about as efficient.
-    List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
-    int index = 0;
-    for (Boolean result : results) {
-      // there was a failure
-      if (result == null) {
-        // we know which table failed by the index of the result
-        failures.add(tables.get(index));
-      }
-      index++;
-    }
-
-    // if any of the tasks failed, then we need to propagate the failure
-    if (failures.size() > 0) {
-      // make the list unmodifiable to avoid any more synchronization concerns
-      throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
-    }
-    return;
-  }
-
-  @Override
-  public void stop(String why) {
-    LOG.info("Shutting down " + this.getClass().getSimpleName());
-    this.pool.stop(why);
-    this.factory.shutdown();
-  }
-
-  @Override
-  public boolean isStopped() {
-    return this.stopped.isStopped();
-  }
-}
\ No newline at end of file
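
The payoff of the wait-for-everything behavior is the exception payload: getFailedTables() names exactly the index tables to retry. A sketch of a caller (the committer is assumed to be already set up):

    import java.util.List;
    import com.google.common.collect.Multimap;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
    import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
    import org.apache.hadoop.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;

    public class TrackingWriteDemo {
      static void writeAll(TrackingParallelWriterIndexCommitter committer,
          Multimap<HTableInterfaceReference, Mutation> updates) {
        try {
          committer.write(updates);
        } catch (MultiIndexWriteFailureException e) {
          // every table was attempted first, so this list is complete
          List<HTableInterfaceReference> needRetry = e.getFailedTables();
          System.err.println("index tables to replay: " + needRetry);
        }
      }
    }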

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
index 9c31900..59b1aa8 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
@@ -46,7 +46,7 @@ import java.util.TreeMap;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+import org.apache.phoenix.hbase.index.wal.KeyValueCodec;
 
 /**
  * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
index 5e78d3b..5d87433 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.codec.BaseEncoder;
 import org.apache.hadoop.hbase.codec.Decoder;
 import org.apache.hadoop.hbase.codec.Encoder;
 
-import org.apache.hadoop.hbase.index.wal.IndexedKeyValue;
-import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
+import org.apache.phoenix.hbase.index.wal.KeyValueCodec;
 
 
 /**
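
For reference, this codec only takes effect when registered as the regionserver's WAL codec; Phoenix's mutable-index setup documents the hbase.regionserver.wal.codec property for that (the key is recalled from the docs rather than from this diff, so verify it against your version). A sketch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class WalCodecConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Lets IndexedKeyValues round-trip through the WAL (normally set in
        // each regionserver's hbase-site.xml rather than in code).
        conf.set("hbase.regionserver.wal.codec",
            "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec");
      }
    }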

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index 624a0e5..01bbf06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.ChildMemoryManager;
 import org.apache.phoenix.memory.GlobalMemoryManager;
 import org.apache.phoenix.query.QueryServices;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
index 4c13210..e604f63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -22,7 +22,7 @@ import java.util.List;
 
 import org.apache.http.annotation.Immutable;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index 6a3e8a0..b968a9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -22,8 +22,8 @@ import java.sql.SQLException;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 8d6e81c..d9800c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -25,8 +25,8 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
 import com.google.common.cache.*;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.MemoryManager;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.util.Closeables;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
index ab8bcbd..02ecb05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
@@ -35,10 +35,10 @@ import org.apache.hadoop.io.WritableUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.io.Closeables;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.ValueBitSet;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
index 9bcb6c8..bb4ce2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
@@ -36,7 +36,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.hash.BloomFilter;
 import com.google.common.hash.Funnels;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
 * Class implements an active spilled partition; serialized tuples are first written into an in-memory data structure

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index b6e3949..fdc2b1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -44,7 +44,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.io.Closeables;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.cache.aggcache.SpillManager.CacheEntry;
@@ -53,6 +52,7 @@ import org.apache.phoenix.coprocessor.GroupByCache;
 import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.memory.InsufficientMemoryException;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.util.KeyValueUtil;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
index 8f91c2f..27124bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
-import static org.apache.hadoop.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
+import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
 
 /**
  * {@link KeyValueBuilder} that does simple byte[] copies to build the underlying key-value. This is

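The statically imported helper above only changed packages in this commit. A
minimal sketch of the copy-on-demand idea it implements - assuming only the
public ImmutableBytesWritable accessors, not the verbatim Phoenix code - looks
like:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

public final class CopyBytesSketch {
  // Return the backing array as-is when the pointer spans the whole array;
  // otherwise materialize a copy of just the [offset, offset + length) slice.
  static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
    if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
      return ptr.get(); // zero-copy fast path
    }
    return ptr.copyBytes(); // defensive copy of the referenced range
  }

  public static void main(String[] args) {
    byte[] backing = new byte[] { 1, 2, 3, 4 };
    // Spans the whole array: the backing array itself is returned.
    System.out.println(copyBytesIfNecessary(new ImmutableBytesWritable(backing)) == backing);
    // Spans a sub-range: a two-byte copy is returned.
    System.out.println(copyBytesIfNecessary(new ImmutableBytesWritable(backing, 1, 2)).length);
  }
}
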
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 9004bb5..c5ddcdf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -39,6 +38,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
 import org.apache.phoenix.exception.SQLExceptionInfo;
 import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ResultIterator;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 324a557..9ead4c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -23,7 +23,6 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
@@ -38,6 +37,7 @@ import org.apache.phoenix.execute.DegenerateQueryPlan;
 import org.apache.phoenix.execute.HashJoinPlan;
 import org.apache.phoenix.execute.ScanPlan;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 86857f4..1a85c38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -30,7 +30,6 @@ import java.util.Map;
 
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -43,6 +42,7 @@ import org.apache.phoenix.execute.AggregatePlan;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 4b4c213..d6c0063 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -52,7 +52,6 @@ import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.cache.aggcache.SpillableGroupByCache;
@@ -60,6 +59,7 @@ import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.aggregator.Aggregator;
 import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.ScanProjector;
 import org.apache.phoenix.memory.MemoryManager.MemoryChunk;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index fd93df9..528c97c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -31,11 +31,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.HashCache;
 import org.apache.phoenix.cache.TenantCache;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.join.HashJoinInfo;
 import org.apache.phoenix.join.ScanProjector;
 import org.apache.phoenix.join.ScanProjector.ProjectedValueTuple;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index e85f8c9..9811511 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -75,13 +75,13 @@ import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
-import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.schema.AmbiguousColumnException;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 4df68a4..6dea838 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 06347a4..2e5f2d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.exception;
 import java.sql.SQLException;
 import java.util.Map;
 
-import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
 import org.apache.phoenix.schema.AmbiguousColumnException;
 import org.apache.phoenix.schema.AmbiguousTableException;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index ccc9ef4..692b9e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Future;
 import org.apache.hadoop.hbase.client.Scan;
 
 import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.compile.ExplainPlan;
 import org.apache.phoenix.compile.QueryPlan;
@@ -37,6 +36,7 @@ import org.apache.phoenix.compile.StatementContext;
 import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
 import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.iterate.ResultIterator;
 import org.apache.phoenix.job.JobManager.JobCallable;
 import org.apache.phoenix.join.HashCacheClient;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8701f2a..19dec71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.cache.ServerCacheClient;
 import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
 import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
 import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.IndexMetaDataCacheClient;
 import org.apache.phoenix.index.PhoenixIndexCodec;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
index f8b4f2d..1a51107 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
@@ -27,11 +27,11 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.ConstraintViolationException;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
index 7044848..576ce7c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
@@ -21,11 +21,11 @@ import java.math.*;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.expression.ColumnExpression;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
index 33794d4..a1ed1df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
@@ -21,9 +21,9 @@ import java.math.BigDecimal;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.tuple.Tuple;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
index b7dc554..f29f46a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -27,10 +27,10 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.TreeMap;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.schema.PDataType;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 146bfb3..fa44038 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PDataType;

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
index e691c41..e46b435 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
@@ -19,8 +19,8 @@ package org.apache.phoenix.filter;
 
 import org.apache.hadoop.hbase.util.Bytes;
 
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 /**
  *

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java
new file mode 100644
index 0000000..c52e749
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * {@link Abortable} that can rethrow the cause of the abort.
+ */
+public class CapturingAbortable implements Abortable {
+
+  private Abortable delegate;
+  private Throwable cause;
+  private String why;
+
+  public CapturingAbortable(Abortable delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    if (delegate.isAborted()) {
+      return;
+    }
+    this.why = why;
+    this.cause = e;
+    delegate.abort(why, e);
+
+  }
+
+  @Override
+  public boolean isAborted() {
+    return delegate.isAborted();
+  }
+
+  /**
+   * Throw the cause of the abort, if <tt>this</tt> was aborted. If there was an exception causing
+   * the abort, re-throws it. Otherwise, throws a generic {@link Exception} with the reason the
+   * abort was requested.
+   * @throws Throwable the cause of the abort.
+   */
+  public void throwCauseIfAborted() throws Throwable {
+    if (!this.isAborted()) {
+      return;
+    }
+    if (cause == null) {
+      throw new Exception(why);
+    }
+    throw cause;
+  }
+}
\ No newline at end of file
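
The class above is self-contained; a hypothetical usage sketch follows (the
delegate here is a stand-in for the region server's Abortable, not Phoenix's
actual wiring):

import org.apache.hadoop.hbase.Abortable;
import org.apache.phoenix.hbase.index.CapturingAbortable;

public class CapturingAbortableExample {
  public static void main(String[] args) {
    // A trivial delegate standing in for the region server's Abortable.
    Abortable delegate = new Abortable() {
      private volatile boolean aborted;
      @Override public void abort(String why, Throwable e) { aborted = true; }
      @Override public boolean isAborted() { return aborted; }
    };
    CapturingAbortable capturing = new CapturingAbortable(delegate);

    // Simulate a failed index write aborting the server.
    capturing.abort("index write failed", new RuntimeException("boom"));

    try {
      // Re-surfaces the RuntimeException captured above.
      capturing.throwCauseIfAborted();
    } catch (Throwable t) {
      System.out.println("abort cause: " + t);
    }
  }
}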

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java
new file mode 100644
index 0000000..93f2c3e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Ensure that the log isn't rolled while we are in the middle of doing a pending index write.
+ * <p>
+ * The problem we are trying to solve is the following sequence:
+ * <ol>
+ * <li>Write to the indexed table</li>
+ * <li>Write the index-containing WALEdit</li>
+ * <li>Start writing to the index tables in the postXXX hook</li>
+ * <li>WAL gets rolled and archived</li>
+ * <li>An index update fails, in which case we should kill ourselves to get WAL replay</li>
+ * <li>Since the WAL got archived, we won't get the replay of the index writes</li>
+ * </ol>
+ * <p>
+ * The usual course of events should be:
+ * <ol>
+ * <li>In a preXXX hook,
+ * <ol>
+ * <li>Build the {@link WALEdit} + index information</li>
+ * <li>Lock the {@link IndexLogRollSynchronizer#logArchiveLock}
+ * <ul>
+ * <li>This is a reentrant read lock on WAL archiving, so we can make multiple WAL/index updates
+ * concurrently</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * </li>
+ * <li>Pass that {@link WALEdit} to the WAL, ensuring it's durable and replayable</li>
+ * <li>In the corresponding postXXX,
+ * <ol>
+ * <li>make the updates to the index tables</li>
+ * <li>Unlock {@link IndexLogRollSynchronizer#logArchiveLock}</li>
+ * </ol>
+ * </li> </ol>
+ * <p>
+ * <tt>this</tt> should be added as a {@link WALActionsListener} by registering it with the WAL.
+ */
+public class IndexLogRollSynchronizer implements WALActionsListener {
+
+  private static final Log LOG = LogFactory.getLog(IndexLogRollSynchronizer.class);
+  private WriteLock logArchiveLock;
+
+  public IndexLogRollSynchronizer(WriteLock logWriteLock){
+    this.logArchiveLock = logWriteLock;
+  }
+
+
+  @Override
+  public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+    // Take the write lock on WAL archiving - any pending index updates (read lock holders) will complete before we proceed
+    LOG.debug("Taking INDEX_UPDATE writelock");
+    logArchiveLock.lock();
+    LOG.debug("Got the INDEX_UPDATE writelock");
+  }
+  
+  @Override
+  public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+    // done archiving the logs, any WAL updates will be replayed on failure
+    LOG.debug("Releasing INDEX_UPDATE writelock");
+    logArchiveLock.unlock();
+  }
+
+  @Override
+  public void logCloseRequested() {
+    // don't care - before this is called, all the HRegions are closed, so we can't get any new
+    // requests and all pending requests can finish before the WAL closes.
+  }
+
+  @Override
+  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+    // noop
+  }
+
+  @Override
+  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+    // noop
+  }
+
+  @Override
+  public void logRollRequested() {
+    // noop
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
+    // noop
+  }
+
+  @Override
+  public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
+    // noop
+  }
+}
\ No newline at end of file
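
A minimal sketch of the locking protocol the javadoc above describes, with the
WAL registration left as a comment since it depends on the HBase version
(assumed here: HLog#registerWALActionsListener):

import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.phoenix.hbase.index.IndexLogRollSynchronizer;

public class LogRollLockSketch {
  public static void main(String[] args) {
    // One lock shared by index writers (readers) and WAL archiving (writer).
    ReentrantReadWriteLock logRollLock = new ReentrantReadWriteLock();
    IndexLogRollSynchronizer sync = new IndexLogRollSynchronizer(logRollLock.writeLock());
    // In a real deployment the synchronizer is registered with the WAL, e.g.
    // hlog.registerWALActionsListener(sync), so the archive hooks fire.

    // Each pending index write holds the read lock, so preLogArchive (which
    // takes the write lock) blocks until all in-flight index updates finish.
    logRollLock.readLock().lock();
    try {
      // ... write index updates; the WAL cannot be archived meanwhile ...
    } finally {
      logRollLock.readLock().unlock();
    }
  }
}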