You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@phoenix.apache.org by mu...@apache.org on 2014/02/15 01:07:44 UTC
[11/15] Rename package from org.apache.hadoop.hbase.index.* to
org.apache.phoenix.index.* to fix classloader issue causing mutable index
performance regression - https://issues.apache.org/jira/browse/PHOENIX-38
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java
deleted file mode 100644
index 48e10ce..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexCommitter.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.IndexWriteException;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Write the index updates to the index tables
- */
-public interface IndexCommitter extends Stoppable {
-
- void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
-
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws IndexWriteException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java
deleted file mode 100644
index 653e8d6..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexFailurePolicy.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Handle failures to write to the index tables.
- */
-public interface IndexFailurePolicy extends Stoppable {
-
- public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
-
- /**
- * Handle the failure of the attempted index updates
- * @param attempted map of index table -> mutations to apply
- * @param cause reason why there was a failure
- * @throws IOException
- */
- public void
- handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java
deleted file mode 100644
index f41c55f..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriter.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.IndexWriteException;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
-
-/**
- * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
- * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed.
- * <p>
- * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
- * threads, so it will not block the region from shutting down.
- */
-public class IndexWriter implements Stoppable {
-
- private static final Log LOG = LogFactory.getLog(IndexWriter.class);
- private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
- public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
- private AtomicBoolean stopped = new AtomicBoolean(false);
- private IndexCommitter writer;
- private IndexFailurePolicy failurePolicy;
-
- /**
- * @throws IOException if the {@link IndexWriter} or {@link IndexFailurePolicy} cannot be
- * instantiated
- */
- public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
- this(getCommitter(env), getFailurePolicy(env), env, name);
- }
-
- public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
- Configuration conf = env.getConfiguration();
- try {
- IndexCommitter committer =
- conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
- IndexCommitter.class).newInstance();
- return committer;
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- }
- }
-
- public static IndexFailurePolicy getFailurePolicy(RegionCoprocessorEnvironment env)
- throws IOException {
- Configuration conf = env.getConfiguration();
- try {
- IndexFailurePolicy committer =
- conf.getClass(INDEX_FAILURE_POLICY_CONF_KEY, KillServerOnFailurePolicy.class,
- IndexFailurePolicy.class).newInstance();
- return committer;
- } catch (InstantiationException e) {
- throw new IOException(e);
- } catch (IllegalAccessException e) {
- throw new IOException(e);
- }
- }
-
- /**
- * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}. Both are expected
- * to be fully setup before calling.
- * @param committer
- * @param policy
- * @param env
- */
- public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
- RegionCoprocessorEnvironment env, String name) {
- this(committer, policy);
- this.writer.setup(this, env, name);
- this.failurePolicy.setup(this, env);
- }
-
- /**
- * Create an {@link IndexWriter} with an already setup {@link IndexCommitter} and
- * {@link IndexFailurePolicy}.
- * @param committer to write updates
- * @param policy to handle failures
- */
- IndexWriter(IndexCommitter committer, IndexFailurePolicy policy) {
- this.writer = committer;
- this.failurePolicy = policy;
- }
-
- /**
- * Write the mutations to their respective table.
- * <p>
- * This method is blocking and could potentially cause the writer to block for a long time as we
- * write the index updates. When we return depends on the specified {@link IndexCommitter}.
- * <p>
- * If update fails, we pass along the failure to the installed {@link IndexFailurePolicy}, which
- * then decides how to handle the failure. By default, we use a {@link KillServerOnFailurePolicy},
- * which ensures that the server crashes when an index write fails, ensuring that we get WAL
- * replay of the index edits.
- * @param indexUpdates Updates to write
- * @throws IOException
- */
- public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
- // convert the strings to htableinterfaces to which we can talk and group by TABLE
- Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
- writeAndKillYourselfOnFailure(toWrite);
- }
-
- /**
- * see {@link #writeAndKillYourselfOnFailure(Collection)}.
- * @param toWrite
- * @throws IOException
- */
- public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
- try {
- write(toWrite);
- if (LOG.isTraceEnabled()) {
- LOG.trace("Done writing all index updates!\n\t" + toWrite);
- }
- } catch (Exception e) {
- this.failurePolicy.handleFailure(toWrite, e);
- }
- }
-
- /**
- * Write the mutations to their respective table.
- * <p>
- * This method is blocking and could potentially cause the writer to block for a long time as we
- * write the index updates. We only return when either:
- * <ol>
- * <li>All index writes have returned, OR</li>
- * <li>Any single index write has failed</li>
- * </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes
- * to ensure a timely recovery of the failed index writes.
- * @param toWrite Updates to write
- * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
- * stop early depends on the {@link IndexCommitter}.
- */
- public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
- write(resolveTableReferences(toWrite));
- }
-
- /**
- * see {@link #write(Collection)}
- * @param toWrite
- * @throws IndexWriteException
- */
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws IndexWriteException {
- this.writer.write(toWrite);
- }
-
-
- /**
- * Convert the passed index updates to {@link HTableInterfaceReference}s.
- * @param indexUpdates from the index builder
- * @return pairs that can then be written by an {@link IndexWriter}.
- */
- public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
- Collection<Pair<Mutation, byte[]>> indexUpdates) {
- Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
- .<HTableInterfaceReference, Mutation> create();
- // simple map to make lookups easy while we build the map of tables to create
- Map<ImmutableBytesPtr, HTableInterfaceReference> tables =
- new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(updates.size());
- for (Pair<Mutation, byte[]> entry : indexUpdates) {
- byte[] tableName = entry.getSecond();
- ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
- HTableInterfaceReference table = tables.get(ptr);
- if (table == null) {
- table = new HTableInterfaceReference(ptr);
- tables.put(ptr, table);
- }
- updates.put(table, entry.getFirst());
- }
-
- return updates;
- }
-
- @Override
- public void stop(String why) {
- if (!this.stopped.compareAndSet(false, true)) {
- // already stopped
- return;
- }
- LOG.debug("Stopping because " + why);
- this.writer.stop(why);
- this.failurePolicy.stop(why);
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.get();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java
deleted file mode 100644
index b56a23e..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/IndexWriterUtils.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CoprocessorEnvironment;
-
-import org.apache.hadoop.hbase.index.table.CoprocessorHTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
-
-public class IndexWriterUtils {
-
- private static final Log LOG = LogFactory.getLog(IndexWriterUtils.class);
-
- /**
- * Maximum number of threads to allow per-table when writing. Each writer thread (from
- * {@link IndexWriterUtils#NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) has a single HTable.
- * However, each table is backed by a threadpool to manage the updates to that table. this
- * specifies the number of threads to allow in each of those tables. Generally, you shouldn't need
- * to change this, unless you have a small number of indexes to which most of the writes go.
- * Defaults to: {@value #DEFAULT_NUM_PER_TABLE_THREADS}.
- * <p>
- * For tables to which there are not a lot of writes, the thread pool automatically will decrease
- * the number of threads to one (though it can burst up to the specified max for any given table),
- * so increasing this to meet the max case is reasonable.
- * <p>
- * Setting this value too small can cause <b>catastrophic cluster failure</b>. The way HTable's
- * underlying pool works is such that is does direct hand-off of tasks to threads. This works fine
- * because HTables are assumed to work in a single-threaded context, so we never get more threads
- * than regionservers. In a multi-threaded context, we can easily grow to more than that number of
- * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
- * coprocesor hooks, so we can't modify this behavior.
- */
- private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
- "index.writer.threads.pertable.max";
- private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
-
- /** Configuration key that HBase uses to set the max number of threads for an HTable */
- public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
- private IndexWriterUtils() {
- // private ctor for utilites
- }
-
- public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
- // create a simple delegate factory, setup the way we need
- Configuration conf = env.getConfiguration();
- // set the number of threads allowed per table.
- int htableThreads =
- conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
- LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
- IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
- return new CoprocessorHTableFactory(env);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java
deleted file mode 100644
index c043a54..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/KillServerOnFailurePolicy.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Naive failure policy - kills the server on which it resides
- */
-public class KillServerOnFailurePolicy implements IndexFailurePolicy {
-
- private static final Log LOG = LogFactory.getLog(KillServerOnFailurePolicy.class);
- private Abortable abortable;
- private Stoppable stoppable;
-
- @Override
- public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
- setup(parent, env.getRegionServerServices());
- }
-
- public void setup(Stoppable parent, Abortable abort) {
- this.stoppable = parent;
- this.abortable = abort;
- }
-
- @Override
- public void stop(String why) {
- // noop
- }
-
- @Override
- public boolean isStopped() {
- return this.stoppable.isStopped();
- }
-
- @Override
- public void
- handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
- // cleanup resources
- this.stop("Killing ourselves because of an error:" + cause);
- // notify the regionserver of the failure
- String msg =
- "Could not update the index table, killing server region because couldn't write to an index table";
- LOG.error(msg, cause);
- try {
- this.abortable.abort(msg, cause);
- } catch (Exception e) {
- LOG.fatal("Couldn't abort this server to preserve index writes, "
- + "attempting to hard kill the server");
- System.exit(1);
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java
deleted file mode 100644
index b06ecf6..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/ParallelWriterIndexCommitter.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.parallel.EarlyExitFailure;
-import org.apache.hadoop.hbase.index.parallel.QuickFailingTaskRunner;
-import org.apache.hadoop.hbase.index.parallel.Task;
-import org.apache.hadoop.hbase.index.parallel.TaskBatch;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolManager;
-import org.apache.hadoop.hbase.index.table.CachingHTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-/**
- * Write index updates to the index tables in parallel. We attempt to early exit from the writes if
- * any of the index updates fails. Completion is determined by the following criteria: *
- * <ol>
- * <li>All index writes have returned, OR</li>
- * <li>Any single index write has failed</li>
- * </ol>
- * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
- * ensure a timely recovery of the failed index writes.
- */
-public class ParallelWriterIndexCommitter implements IndexCommitter {
-
- public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.writer.threads.keepalivetime";
- private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
-
- private HTableFactory factory;
- private Stoppable stopped;
- private QuickFailingTaskRunner pool;
-
- @Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
- Configuration conf = env.getConfiguration();
- setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
- ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder(name, conf).
- setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
- setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
- env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
- }
-
- /**
- * Setup <tt>this</tt>.
- * <p>
- * Exposed for TESTING
- */
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
- int cacheSize) {
- this.factory = new CachingHTableFactory(factory, cacheSize);
- this.pool = new QuickFailingTaskRunner(pool);
- this.stopped = stop;
- }
-
- @Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws SingleIndexWriteFailureException {
- /*
- * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
- * writes in parallel to each index table, so each table gets its own task and is submitted to
- * the pool. Where it gets tricky is that we want to block the calling thread until one of two
- * things happens: (1) all index tables get successfully updated, or (2) any one of the index
- * table writes fail; in either case, we should return as quickly as possible. We get a little
- * more complicated in that if we do get a single failure, but any of the index writes hasn't
- * been started yet (its been queued up, but not submitted to a thread) we want to that task to
- * fail immediately as we know that write is a waste and will need to be replayed anyways.
- */
-
- Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
- for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
- // get the mutations for each table. We leak the implementation here a little bit to save
- // doing a complete copy over of all the index update for each table.
- final List<Mutation> mutations = (List<Mutation>) entry.getValue();
- final HTableInterfaceReference tableReference = entry.getKey();
- /*
- * Write a batch of index updates to an index table. This operation stops (is cancelable) via
- * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
- * running thread. The former will only work if we are not in the midst of writing the current
- * batch to the table, though we do check these status variables before starting and before
- * writing the batch. The latter usage, interrupting the thread, will work in the previous
- * situations as was at some points while writing the batch, depending on the underlying
- * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when is
- * supports an interrupt).
- */
- tasks.add(new Task<Void>() {
-
- /**
- * Do the actual write to the primary table. We don't need to worry about closing the table
- * because that is handled the {@link CachingHTableFactory}.
- */
- @Override
- public Void call() throws Exception {
- // this may have been queued, so another task infront of us may have failed, so we should
- // early exit, if that's the case
- throwFailureIfDone();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
- }
- try {
- HTableInterface table = factory.getTable(tableReference.get());
- throwFailureIfDone();
- table.batch(mutations);
- } catch (SingleIndexWriteFailureException e) {
- throw e;
- } catch (IOException e) {
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
- } catch (InterruptedException e) {
- // reset the interrupt status on the thread
- Thread.currentThread().interrupt();
- throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
- }
- return null;
- }
-
- private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
- throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null);
- }
-
- }
- });
- }
-
- // actually submit the tasks to the pool and wait for them to finish/fail
- try {
- pool.submitUninterruptible(tasks);
- } catch (EarlyExitFailure e) {
- propagateFailure(e);
- } catch (ExecutionException e) {
- LOG.error("Found a failed index update!");
- propagateFailure(e.getCause());
- }
-
- }
-
- private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
- try {
- throw throwable;
- } catch (SingleIndexWriteFailureException e1) {
- throw e1;
- } catch (Throwable e1) {
- throw new SingleIndexWriteFailureException(
- "Got an abort notification while writing to the index!", e1);
- }
-
- }
-
- /**
- * {@inheritDoc}
- * <p>
- * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
- * by the external {@link Stoppable}. This call does not delegate the stop down to the
- * {@link Stoppable} passed in the constructor.
- * @param why the reason for stopping
- */
- @Override
- public void stop(String why) {
- LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
- this.pool.stop(why);
- this.factory.shutdown();
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.isStopped();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java
deleted file mode 100644
index b0f9d68..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/PerRegionIndexWriteCache.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write.recovery;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-
-
-public class PerRegionIndexWriteCache {
-
- private Map<HRegion, Multimap<HTableInterfaceReference, Mutation>> cache =
- new HashMap<HRegion, Multimap<HTableInterfaceReference, Mutation>>();
-
-
- /**
- * Get the edits for the current region. Removes the edits from the cache. To add them back, call
- * {@link #addEdits(HRegion, HTableInterfaceReference, Collection)}.
- * @param region
- * @return Get the edits for the given region. Returns <tt>null</tt> if there are no pending edits
- * for the region
- */
- public Multimap<HTableInterfaceReference, Mutation> getEdits(HRegion region) {
- return cache.remove(region);
- }
-
- /**
- * @param region
- * @param table
- * @param collection
- */
- public void addEdits(HRegion region, HTableInterfaceReference table,
- Collection<Mutation> collection) {
- Multimap<HTableInterfaceReference, Mutation> edits = cache.get(region);
- if (edits == null) {
- edits = ArrayListMultimap.<HTableInterfaceReference, Mutation> create();
- cache.put(region, edits);
- }
- edits.putAll(table, collection);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
deleted file mode 100644
index a17395e..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write.recovery;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-import org.apache.hadoop.hbase.index.write.IndexFailurePolicy;
-import org.apache.hadoop.hbase.index.write.KillServerOnFailurePolicy;
-
-/**
- * Tracks any failed writes in The {@link PerRegionIndexWriteCache}, given a
- * {@link MultiIndexWriteFailureException} (which is thrown from the
- * {@link TrackingParallelWriterIndexCommitter}. Any other exception failure causes the a server
- * abort via the usual {@link KillServerOnFailurePolicy}.
- */
-public class StoreFailuresInCachePolicy implements IndexFailurePolicy {
-
- private KillServerOnFailurePolicy delegate;
- private PerRegionIndexWriteCache cache;
- private HRegion region;
-
- /**
- * @param failedIndexEdits cache to update when we find a failure
- */
- public StoreFailuresInCachePolicy(PerRegionIndexWriteCache failedIndexEdits) {
- this.cache = failedIndexEdits;
- }
-
- @Override
- public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
- this.region = env.getRegion();
- this.delegate = new KillServerOnFailurePolicy();
- this.delegate.setup(parent, env);
-
- }
-
- @Override
- public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
- // if its not an exception we can handle, let the delegate take care of it
- if (!(cause instanceof MultiIndexWriteFailureException)) {
- delegate.handleFailure(attempted, cause);
- }
- List<HTableInterfaceReference> failedTables =
- ((MultiIndexWriteFailureException) cause).getFailedTables();
- for (HTableInterfaceReference table : failedTables) {
- cache.addEdits(this.region, table, attempted.get(table));
- }
- }
-
-
- @Override
- public void stop(String why) {
- this.delegate.stop(why);
- }
-
- @Override
- public boolean isStopped() {
- return this.delegate.isStopped();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
deleted file mode 100644
index 43cc19f..0000000
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.index.write.recovery;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
-import com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.index.CapturingAbortable;
-import org.apache.hadoop.hbase.index.exception.MultiIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.exception.SingleIndexWriteFailureException;
-import org.apache.hadoop.hbase.index.parallel.EarlyExitFailure;
-import org.apache.hadoop.hbase.index.parallel.Task;
-import org.apache.hadoop.hbase.index.parallel.TaskBatch;
-import org.apache.hadoop.hbase.index.parallel.TaskRunner;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolBuilder;
-import org.apache.hadoop.hbase.index.parallel.ThreadPoolManager;
-import org.apache.hadoop.hbase.index.parallel.WaitForCompletionTaskRunner;
-import org.apache.hadoop.hbase.index.table.CachingHTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableFactory;
-import org.apache.hadoop.hbase.index.table.HTableInterfaceReference;
-import org.apache.hadoop.hbase.index.write.IndexCommitter;
-import org.apache.hadoop.hbase.index.write.IndexWriter;
-import org.apache.hadoop.hbase.index.write.IndexWriterUtils;
-import org.apache.hadoop.hbase.index.write.ParallelWriterIndexCommitter;
-
-/**
- * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have attempted to
- * allow the caller to retrieve the failed and succeeded index updates. Therefore, this class will
- * be a lot slower, in the face of failures, when compared to the
- * {@link ParallelWriterIndexCommitter} (though as fast for writes), so it should be used only when
- * you need to at least attempt all writes and know their result; for instance, this is fine for
- * doing WAL recovery - it's not a performance intensive situation and we want to limit the the
- * edits we need to retry.
- * <p>
- * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that
- * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
- * <p>
- * Failures to write to the index can happen several different ways:
- * <ol>
- * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}.
- * This causing any pending tasks to fail whatever they are doing as fast as possible. Any writes
- * that have not begun are not even attempted and marked as failures.</li>
- * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
- * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
- * exceptions.</li>
- * </ol>
- * Regardless of how the write fails, we still wait for all writes to complete before passing the
- * failure back to the client.
- */
-public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
- private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
-
- public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
- private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
- private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
- "index.trackingwriter.threads.keepalivetime";
-
- private TaskRunner pool;
- private HTableFactory factory;
- private CapturingAbortable abortable;
- private Stoppable stopped;
-
- @Override
- public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
- Configuration conf = env.getConfiguration();
- setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
- ThreadPoolManager.getExecutor(
- new ThreadPoolBuilder(name, conf).
- setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
- DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
- setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
- env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
- }
-
- /**
- * Setup <tt>this</tt>.
- * <p>
- * Exposed for TESTING
- */
- void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
- int cacheSize) {
- this.pool = new WaitForCompletionTaskRunner(pool);
- this.factory = new CachingHTableFactory(factory, cacheSize);
- this.abortable = new CapturingAbortable(abortable);
- this.stopped = stop;
- }
-
- @Override
- public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
- throws MultiIndexWriteFailureException {
- Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
- TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
- List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
- for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
- // get the mutations for each table. We leak the implementation here a little bit to save
- // doing a complete copy over of all the index update for each table.
- final List<Mutation> mutations = (List<Mutation>) entry.getValue();
- // track each reference so we can get at it easily later, when determing failures
- final HTableInterfaceReference tableReference = entry.getKey();
- tables.add(tableReference);
-
- /*
- * Write a batch of index updates to an index table. This operation stops (is cancelable) via
- * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
- * running thread. The former will only work if we are not in the midst of writing the current
- * batch to the table, though we do check these status variables before starting and before
- * writing the batch. The latter usage, interrupting the thread, will work in the previous
- * situations as was at some points while writing the batch, depending on the underlying
- * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate when is
- * supports an interrupt).
- */
- tasks.add(new Task<Boolean>() {
-
- /**
- * Do the actual write to the primary table. We don't need to worry about closing the table
- * because that is handled the {@link CachingHTableFactory}.
- */
- @Override
- public Boolean call() throws Exception {
- try {
- // this may have been queued, but there was an abort/stop so we try to early exit
- throwFailureIfDone();
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
- }
- HTableInterface table = factory.getTable(tableReference.get());
- throwFailureIfDone();
- table.batch(mutations);
- } catch (InterruptedException e) {
- // reset the interrupt status on the thread
- Thread.currentThread().interrupt();
- throw e;
- } catch (Exception e) {
- throw e;
- }
- return Boolean.TRUE;
- }
-
- private void throwFailureIfDone() throws SingleIndexWriteFailureException {
- if (stopped.isStopped() || abortable.isAborted()
- || Thread.currentThread().isInterrupted()) {
- throw new SingleIndexWriteFailureException(
- "Pool closed, not attempting to write to the index!", null);
- }
-
- }
- });
- }
-
- List<Boolean> results = null;
- try {
- LOG.debug("Waiting on index update tasks to complete...");
- results = this.pool.submitUninterruptible(tasks);
- } catch (ExecutionException e) {
- throw new RuntimeException(
- "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
- } catch (EarlyExitFailure e) {
- throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
- }
-
- // track the failures. We only ever access this on return from our calls, so no extra
- // synchronization is needed. We could update all the failures as we find them, but that add a
- // lot of locking overhead, and just doing the copy later is about as efficient.
- List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
- int index = 0;
- for (Boolean result : results) {
- // there was a failure
- if (result == null) {
- // we know which table failed by the index of the result
- failures.add(tables.get(index));
- }
- index++;
- }
-
- // if any of the tasks failed, then we need to propagate the failure
- if (failures.size() > 0) {
- // make the list unmodifiable to avoid any more synchronization concerns
- throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
- }
- return;
- }
-
- @Override
- public void stop(String why) {
- LOG.info("Shutting down " + this.getClass().getSimpleName());
- this.pool.stop(why);
- this.factory.shutdown();
- }
-
- @Override
- public boolean isStopped() {
- return this.stopped.isStopped();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
index 9c31900..59b1aa8 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java
@@ -46,7 +46,7 @@ import java.util.TreeMap;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+import org.apache.phoenix.hbase.index.wal.KeyValueCodec;
/**
* Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
index 5e78d3b..5d87433 100644
--- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
+++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hbase.codec.BaseEncoder;
import org.apache.hadoop.hbase.codec.Decoder;
import org.apache.hadoop.hbase.codec.Encoder;
-import org.apache.hadoop.hbase.index.wal.IndexedKeyValue;
-import org.apache.hadoop.hbase.index.wal.KeyValueCodec;
+import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
+import org.apache.phoenix.hbase.index.wal.KeyValueCodec;
/**
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
index 624a0e5..01bbf06 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.ChildMemoryManager;
import org.apache.phoenix.memory.GlobalMemoryManager;
import org.apache.phoenix.query.QueryServices;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
index 4c13210..e604f63 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/HashCache.java
@@ -22,7 +22,7 @@ import java.util.List;
import org.apache.http.annotation.Immutable;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
index 6a3e8a0..b968a9b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCache.java
@@ -22,8 +22,8 @@ import java.sql.SQLException;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.MemoryManager;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
index 8d6e81c..d9800c0 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/TenantCacheImpl.java
@@ -25,8 +25,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import com.google.common.cache.*;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.coprocessor.ServerCachingProtocol.ServerCacheFactory;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.MemoryManager;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.util.Closeables;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
index ab8bcbd..02ecb05 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillManager.java
@@ -35,10 +35,10 @@ import org.apache.hadoop.io.WritableUtils;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Closeables;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.KeyValueSchema;
import org.apache.phoenix.schema.ValueBitSet;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
index 9bcb6c8..bb4ce2e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillMap.java
@@ -36,7 +36,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
* Class implements an active spilled partition serialized tuples are first written into an in-memory data structure
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
index b6e3949..fdc2b1d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java
@@ -44,7 +44,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.io.Closeables;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillManager.CacheEntry;
@@ -53,6 +52,7 @@ import org.apache.phoenix.coprocessor.GroupByCache;
import org.apache.phoenix.coprocessor.GroupedAggregateRegionObserver;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.memory.InsufficientMemoryException;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
import org.apache.phoenix.util.KeyValueUtil;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
index 8f91c2f..27124bb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/client/GenericKeyValueBuilder.java
@@ -23,7 +23,7 @@ import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import static org.apache.hadoop.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
+import static org.apache.phoenix.hbase.index.util.ImmutableBytesPtr.copyBytesIfNecessary;
/**
* {@link KeyValueBuilder} that does simple byte[] copies to build the underlying key-value. This is
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
index 9004bb5..c5ddcdf 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/DeleteCompiler.java
@@ -29,7 +29,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
@@ -39,6 +38,7 @@ import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ResultIterator;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
index 324a557..9ead4c5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/QueryCompiler.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.JoinCompiler.JoinSpec;
@@ -38,6 +37,7 @@ import org.apache.phoenix.execute.DegenerateQueryPlan;
import org.apache.phoenix.execute.HashJoinPlan;
import org.apache.phoenix.execute.ScanPlan;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
index 86857f4..1a85c38 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/UpsertCompiler.java
@@ -30,7 +30,6 @@ import java.util.Map;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
@@ -43,6 +42,7 @@ import org.apache.phoenix.execute.AggregatePlan;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.LiteralExpression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.iterate.ParallelIterators.ParallelIteratorFactory;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 4b4c213..d6c0063 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -52,7 +52,6 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.cache.aggcache.SpillableGroupByCache;
@@ -60,6 +59,7 @@ import org.apache.phoenix.expression.Expression;
import org.apache.phoenix.expression.ExpressionType;
import org.apache.phoenix.expression.aggregator.Aggregator;
import org.apache.phoenix.expression.aggregator.ServerAggregators;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.join.ScanProjector;
import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
index fd93df9..528c97c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java
@@ -31,11 +31,11 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.HashCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.join.ScanProjector;
import org.apache.phoenix.join.ScanProjector.ProjectedValueTuple;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
index e85f8c9..9811511 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataEndpointImpl.java
@@ -75,13 +75,13 @@ import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
-import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.phoenix.cache.GlobalCache;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.AmbiguousColumnException;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
index 4df68a4..6dea838 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ServerCachingEndpointImpl.java
@@ -23,9 +23,9 @@ import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
index 06347a4..2e5f2d7 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
@@ -20,7 +20,7 @@ package org.apache.phoenix.exception;
import java.sql.SQLException;
import java.util.Map;
-import org.apache.hadoop.hbase.index.util.IndexManagementUtil;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.AmbiguousTableException;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
index ccc9ef4..692b9e2 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/HashJoinPlan.java
@@ -27,7 +27,6 @@ import java.util.concurrent.Future;
import org.apache.hadoop.hbase.client.Scan;
import com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.ExplainPlan;
import org.apache.phoenix.compile.QueryPlan;
@@ -37,6 +36,7 @@ import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.compile.GroupByCompiler.GroupBy;
import org.apache.phoenix.compile.OrderByCompiler.OrderBy;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.iterate.ResultIterator;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.join.HashCacheClient;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
index 8701f2a..19dec71 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java
@@ -36,11 +36,11 @@ import org.slf4j.LoggerFactory;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.cache.ServerCacheClient;
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.coprocessor.MetaDataProtocol.MetaDataMutationResult;
import org.apache.phoenix.exception.SQLExceptionCode;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.IndexMetaDataCacheClient;
import org.apache.phoenix.index.PhoenixIndexCodec;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
index f8b4f2d..1a51107 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/InListExpression.java
@@ -27,11 +27,11 @@ import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
import org.apache.phoenix.expression.visitor.ExpressionVisitor;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.ConstraintViolationException;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.SortOrder;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
index 7044848..576ce7c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseDecimalStddevAggregator.java
@@ -21,11 +21,11 @@ import java.math.*;
import java.util.List;
import java.util.Map.Entry;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.expression.ColumnExpression;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
index 33794d4..a1ed1df 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseStddevAggregator.java
@@ -21,9 +21,9 @@ import java.math.BigDecimal;
import java.util.List;
import java.util.Map.Entry;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.tuple.Tuple;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
index b7dc554..f29f46a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountClientAggregator.java
@@ -27,10 +27,10 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.WritableUtils;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.schema.PDataType;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.tuple.SingleKeyValueTuple;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index 146bfb3..fa44038 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
import org.apache.phoenix.schema.PDataType;
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
index e691c41..e46b435 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/filter/MultiCQKeyValueComparisonFilter.java
@@ -19,8 +19,8 @@ package org.apache.phoenix.filter;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
/**
*
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java
new file mode 100644
index 0000000..c52e749
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/CapturingAbortable.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import org.apache.hadoop.hbase.Abortable;
+
+/**
+ * {@link Abortable} that can rethrow the cause of the abort.
+ */
+public class CapturingAbortable implements Abortable {
+
+ private Abortable delegate;
+ private Throwable cause;
+ private String why;
+
+ public CapturingAbortable(Abortable delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void abort(String why, Throwable e) {
+ if (delegate.isAborted()) {
+ return;
+ }
+ this.why = why;
+ this.cause = e;
+ delegate.abort(why, e);
+
+ }
+
+ @Override
+ public boolean isAborted() {
+ return delegate.isAborted();
+ }
+
+ /**
+ * Throw the cause of the abort, if <tt>this</tt> was aborted. If there was an exception causing
+ * the abort, re-throws that. Otherwise, just throws a generic {@link Exception} with the reason
+ * why the abort was caused.
+ * @throws Throwable the cause of the abort.
+ */
+ public void throwCauseIfAborted() throws Throwable {
+ if (!this.isAborted()) {
+ return;
+ }
+ if (cause == null) {
+ throw new Exception(why);
+ }
+ throw cause;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java
new file mode 100644
index 0000000..93f2c3e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/IndexLogRollSynchronizer.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index;
+
+import java.io.IOException;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Ensure that the log isn't rolled while we are the in middle of doing a pending index write.
+ * <p>
+ * The problem we are trying to solve is the following sequence:
+ * <ol>
+ * <li>Write to the indexed table</li>
+ * <li>Write the index-containing WALEdit</li>
+ * <li>Start writing to the index tables in the postXXX hook</li>
+ * <li>WAL gets rolled and archived</li>
+ * <li>An index update fails, in which case we should kill ourselves to get WAL replay</li>
+ * <li>Since the WAL got archived, we won't get the replay of the index writes</li>
+ * </ol>
+ * <p>
+ * The usual course of events should be:
+ * <ol>
+ * <li>In a preXXX hook,
+ * <ol>
+ * <li>Build the {@link WALEdit} + index information</li>
+ * <li>Lock the {@link IndexLogRollSynchronizer#logArchiveLock}</li>
+ * <ul>
+ * <li>This is a reentrant readlock on the WAL archiving, so we can make multiple WAL/index updates
+ * concurrently</li>
+ * </ul>
+ * </li>
+ * </ol>
+ * </li>
+ * <li>Pass that {@link WALEdit} to the WAL, ensuring its durable and replayable</li>
+ * <li>In the corresponding postXXX,
+ * <ol>
+ * <li>make the updates to the index tables</li>
+ * <li>Unlock {@link IndexLogRollSynchronizer#logArchiveLock}</li>
+ * </ol>
+ * </li> </ol>
+ * <p>
+ * <tt>this</tt> should be added as a {@link WALActionsListener} by updating
+ */
+public class IndexLogRollSynchronizer implements WALActionsListener {
+
+ private static final Log LOG = LogFactory.getLog(IndexLogRollSynchronizer.class);
+ private WriteLock logArchiveLock;
+
+ public IndexLogRollSynchronizer(WriteLock logWriteLock){
+ this.logArchiveLock = logWriteLock;
+ }
+
+
+ @Override
+ public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+ //take a write lock on the index - any pending index updates will complete before we finish
+ LOG.debug("Taking INDEX_UPDATE writelock");
+ logArchiveLock.lock();
+ LOG.debug("Got the INDEX_UPDATE writelock");
+ }
+
+ @Override
+ public void postLogArchive(Path oldPath, Path newPath) throws IOException {
+ // done archiving the logs, any WAL updates will be replayed on failure
+ LOG.debug("Releasing INDEX_UPDATE writelock");
+ logArchiveLock.unlock();
+ }
+
+ @Override
+ public void logCloseRequested() {
+ // don't care- before this is called, all the HRegions are closed, so we can't get any new
+ // requests and all pending request can finish before the WAL closes.
+ }
+
+ @Override
+ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+ // noop
+ }
+
+ @Override
+ public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+ // noop
+ }
+
+ @Override
+ public void logRollRequested() {
+ // noop
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) {
+ // noop
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) {
+ // noop
+ }
+}
\ No newline at end of file