Posted to commits@phoenix.apache.org by mu...@apache.org on 2014/02/15 01:07:40 UTC

[07/15] Rename package from org.apache.hadoop.hbase.index.* to org.apache.phoenix.index.* to fix classloader issue causing mutable index performance regression - https://issues.apache.org/jira/browse/PHOENIX-38

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
new file mode 100644
index 0000000..a28268c
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/EmptyScanner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+
+/**
+ * {@link Scanner} that has no underlying data
+ */
+public class EmptyScanner implements Scanner {
+
+  @Override
+  public KeyValue next() throws IOException {
+    return null;
+  }
+
+  @Override
+  public boolean seek(KeyValue next) throws IOException {
+    return false;
+  }
+
+  @Override
+  public KeyValue peek() throws IOException {
+    return null;
+  }
+
+  @Override
+  public void close() throws IOException {
+    // noop
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
new file mode 100644
index 0000000..d91aaaf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.IOException;
+import java.util.SortedSet;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.Filter.ReturnCode;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+
+import org.apache.phoenix.hbase.index.covered.KeyValueStore;
+
+/**
+ * Combines a simplified version of the ScanQueryMatcher logic with a KeyValueScanner. We can get away with this
+ * here because we are only concerned with a single MemStore for the index; we don't need to worry about multiple column
+ * families or minimizing seeks through files - we just want to iterate the kvs quickly, in-memory.
+ */
+public class FilteredKeyValueScanner implements KeyValueScanner {
+
+    private KeyValueScanner delegate;
+    private Filter filter;
+
+    public FilteredKeyValueScanner(Filter filter, KeyValueStore store) {
+        this(filter, store.getScanner());
+    }
+
+    private FilteredKeyValueScanner(Filter filter, KeyValueScanner delegate) {
+        this.delegate = delegate;
+        this.filter = filter;
+    }
+
+    @Override
+    public KeyValue peek() {
+        return delegate.peek();
+    }
+
+    /**
+     * Same as {@link KeyValueScanner#next()} except that we skip over {@link KeyValue}s until we find one that
+     * passes the filter.
+     * 
+     * @return the next {@link KeyValue} or <tt>null</tt> if no next {@link KeyValue} is present and passes all the
+     *         filters.
+     */
+    @Override
+    public KeyValue next() throws IOException {
+        seekToNextUnfilteredKeyValue();
+        return delegate.next();
+    }
+
+    @Override
+    public boolean seek(KeyValue key) throws IOException {
+        if (filter.filterAllRemaining()) { return false; }
+        // see if we can seek to the next key
+        if (!delegate.seek(key)) { return false; }
+
+        return seekToNextUnfilteredKeyValue();
+    }
+
+    private boolean seekToNextUnfilteredKeyValue() throws IOException {
+        while (true) {
+            KeyValue peeked = delegate.peek();
+            // no more key values, so we are done
+            if (peeked == null) { return false; }
+
+            // filter the peeked value to see if it should be served
+            ReturnCode code = filter.filterKeyValue(peeked);
+            switch (code) {
+            // included, so we are done
+            case INCLUDE:
+            case INCLUDE_AND_NEXT_COL:
+                return true;
+            // not included, so advance to the next key value
+            case SKIP:
+            case NEXT_COL:
+            case NEXT_ROW:
+                delegate.next();
+                break;
+            // use a seek hint to find out where we should go
+            case SEEK_NEXT_USING_HINT:
+                delegate.seek(filter.getNextKeyHint(peeked));
+            }
+        }
+    }
+
+    @Override
+    public boolean reseek(KeyValue key) throws IOException {
+        this.delegate.reseek(key);
+        return this.seekToNextUnfilteredKeyValue();
+    }
+
+    @Override
+    public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom) throws IOException {
+        return this.reseek(kv);
+    }
+
+    @Override
+    public boolean isFileScanner() {
+        return false;
+    }
+
+    @Override
+    public long getSequenceID() {
+        return this.delegate.getSequenceID();
+    }
+
+    @Override
+    public boolean shouldUseScanner(Scan scan, SortedSet<byte[]> columns, long oldestUnexpiredTS) {
+        throw new UnsupportedOperationException(this.getClass().getName()
+                + " doesn't support checking to see if it should use a scanner!");
+    }
+
+    @Override
+    public boolean realSeekDone() {
+        return this.delegate.realSeekDone();
+    }
+
+    @Override
+    public void enforceSeek() throws IOException {
+        this.delegate.enforceSeek();
+    }
+
+    @Override
+    public void close() {
+        this.delegate.close();
+    }
+
+    /*
+    @Override
+    public boolean backwardSeek(KeyValue arg0) throws IOException {
+        return this.delegate.backwardSeek(arg0);
+    }
+
+    @Override
+    public boolean seekToLastRow() throws IOException {
+        return this.delegate.seekToLastRow();
+    }
+
+    @Override
+    public boolean seekToPreviousRow(KeyValue arg0) throws IOException {
+        return this.delegate.seekToPreviousRow(arg0);
+    }
+    */
+}
\ No newline at end of file
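
A rough sketch (not part of this commit) of how this scanner is meant to be driven: seek to the first possible KeyValue for a row, then pull KeyValues until none are left. The KeyValueStore is assumed to be the index memstore the covered-column code already maintains; the class and method names of the sketch itself are made up.

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FamilyFilter;

import org.apache.phoenix.hbase.index.covered.KeyValueStore;
import org.apache.phoenix.hbase.index.scanner.FilteredKeyValueScanner;

public class FilteredScanSketch {

  /** Print every KeyValue in the given family for the given row. */
  static void dumpFamily(KeyValueStore store, byte[] row, byte[] family) throws IOException {
    FilteredKeyValueScanner scanner = new FilteredKeyValueScanner(
        new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(family)), store);
    try {
      // position at the first possible KeyValue for the row; false means nothing passes the filter
      if (!scanner.seek(KeyValue.createFirstOnRow(row))) {
        return;
      }
      KeyValue kv;
      while ((kv = scanner.next()) != null) {
        System.out.println(kv);
      }
    } finally {
      scanner.close();
    }
  }
}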

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
new file mode 100644
index 0000000..868e892
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/Scanner.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * Scan the primary table. This is similar to HBase's scanner, but ensures that you will never see
+ * deleted columns/rows
+ */
+public interface Scanner extends Closeable {
+
+  /**
+   * @return the next keyvalue in the scanner or <tt>null</tt> if there is no next {@link KeyValue}
+   * @throws IOException if there is an underlying error reading the data
+   */
+  public KeyValue next() throws IOException;
+
+  /**
+   * Seek to immediately before the given {@link KeyValue}. If that exact {@link KeyValue} is
+   * present in <tt>this</tt>, it will be returned by the next call to {@link #next()}. Otherwise,
+   * returns the next {@link KeyValue} after the seeked {@link KeyValue}.
+   * @param next {@link KeyValue} to seek to. Doesn't need to already be present in <tt>this</tt>
+   * @return <tt>true</tt> if there are values left in <tt>this</tt>, <tt>false</tt> otherwise
+   * @throws IOException if there is an error reading the underlying data.
+   */
+  public boolean seek(KeyValue next) throws IOException;
+
+  /**
+   * Read the {@link KeyValue} at the top of <tt>this</tt> without 'popping' it off the top of the
+   * scanner.
+   * @return the next {@link KeyValue} or <tt>null</tt> if there are no more values in <tt>this</tt>
+   * @throws IOException if there is an error reading the underlying data.
+   */
+  public KeyValue peek() throws IOException;
+}
\ No newline at end of file
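
A minimal sketch (illustrative only, not part of this commit) of consuming a Scanner through the contract above: peek() inspects the top KeyValue without advancing, next() pops it, and null signals the end of the data.

import java.io.IOException;

import org.apache.phoenix.hbase.index.scanner.Scanner;

public class ScannerDrainSketch {

  /** Count the KeyValues a Scanner exposes, then close it. */
  static int drain(Scanner scanner) throws IOException {
    int count = 0;
    try {
      // peek() never advances the scanner; next() returns that same KeyValue and moves on
      while (scanner.peek() != null) {
        scanner.next();
        count++;
      }
    } finally {
      scanner.close();
    }
    return count;
  }
}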

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
new file mode 100644
index 0000000..edc26d5
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/ScannerBuilder.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.scanner;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.FamilyFilter;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterList;
+import org.apache.hadoop.hbase.filter.QualifierFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.google.common.collect.Lists;
+import org.apache.phoenix.hbase.index.covered.KeyValueStore;
+import org.apache.phoenix.hbase.index.covered.filter.ApplyAndFilterDeletesFilter;
+import org.apache.phoenix.hbase.index.covered.filter.ColumnTrackingNextLargestTimestampFilter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Builds {@link Scanner}s over the local {@link KeyValueStore} for the columns touched by an index update.
+ */
+public class ScannerBuilder {
+
+  private KeyValueStore memstore;
+  private Mutation update;
+
+
+  public ScannerBuilder(KeyValueStore memstore, Mutation update) {
+    this.memstore = memstore;
+    this.update = update;
+  }
+
+  public Scanner buildIndexedColumnScanner(Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
+
+    Filter columnFilters = getColumnFilters(indexedColumns);
+    FilterList filters = new FilterList(Lists.newArrayList(columnFilters));
+
+    // skip to the right TS. This needs to come before the deletes since the deletes will hide any
+    // state that comes before the actual kvs, so we need to capture those TS as they change the row
+    // state.
+    filters.addFilter(new ColumnTrackingNextLargestTimestampFilter(ts, tracker));
+
+    // filter out kvs based on deletes
+    filters.addFilter(new ApplyAndFilterDeletesFilter(getAllFamilies(indexedColumns)));
+
+    // combine the column filters and the rest of the filters into a single filtered scanner
+    return getFilteredScanner(filters);
+  }
+
+  /**
+   * @param columns columns to filter
+   * @return filter that will skip any {@link KeyValue} that doesn't match one of the passed columns
+   */
+  private Filter getColumnFilters(Collection<? extends ColumnReference> columns) {
+    // each column needs to be added as an OR, so we need to separate them out
+    FilterList columnFilters = new FilterList(FilterList.Operator.MUST_PASS_ONE);
+
+    // create a filter that matches each column reference
+    for (ColumnReference ref : columns) {
+      Filter columnFilter =
+          new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(ref.getFamily()));
+      // combine with a match for the qualifier, if the qualifier is a specific qualifier
+      if (!Bytes.equals(ColumnReference.ALL_QUALIFIERS, ref.getQualifier())) {
+        columnFilter =
+            new FilterList(columnFilter, new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(
+                ref.getQualifier())));
+      }
+      columnFilters.addFilter(columnFilter);
+    }
+    return columnFilters;
+  }
+
+  private Set<ImmutableBytesPtr> getAllFamilies(Collection<? extends ColumnReference> columns) {
+    Set<ImmutableBytesPtr> families = new HashSet<ImmutableBytesPtr>();
+    for (ColumnReference ref : columns) {
+      families.add(new ImmutableBytesPtr(ref.getFamily()));
+    }
+    return families;
+  }
+
+  private Scanner getFilteredScanner(Filter filters) {
+    // create a scanner and wrap it as an iterator, meaning you can only go forward
+    final FilteredKeyValueScanner kvScanner = new FilteredKeyValueScanner(filters, memstore);
+    // seek the scanner to initialize it
+    KeyValue start = KeyValue.createFirstOnRow(update.getRow());
+    try {
+      if (!kvScanner.seek(start)) {
+        return new EmptyScanner();
+      }
+    } catch (IOException e) {
+      // This should never happen - everything should explode if so.
+      throw new RuntimeException(
+          "Failed to seek to first key from update on the memstore scanner!", e);
+    }
+
+    // we have some info in the scanner, so wrap it in an iterator and return.
+    return new Scanner() {
+
+      @Override
+      public KeyValue next() {
+        try {
+          return kvScanner.next();
+        } catch (IOException e) {
+          throw new RuntimeException("Error reading kvs from local memstore!");
+        }
+      }
+
+      @Override
+      public boolean seek(KeyValue next) throws IOException {
+        // check to see if the next kv is after the current key, in which case we can use reseek,
+        // which will be more efficient
+        KeyValue peek = kvScanner.peek();
+        // there is another value and it's before the requested one - we can do a reseek!
+        if (peek != null) {
+          int compare = KeyValue.COMPARATOR.compare(peek, next);
+          if (compare < 0) {
+            return kvScanner.reseek(next);
+          } else if (compare == 0) {
+            // we are already at the given key!
+            return true;
+          }
+        }
+        return kvScanner.seek(next);
+      }
+
+      @Override
+      public KeyValue peek() throws IOException {
+        return kvScanner.peek();
+      }
+
+      @Override
+      public void close() {
+        kvScanner.close();
+      }
+    };
+  }
+}
\ No newline at end of file
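
For context, a sketch (not part of this commit) of how the builder is used during index maintenance. The memstore, pending Mutation, indexed columns, and ColumnTracker are assumed to be supplied by the covered-column index code that already has them in hand; the two calls on ScannerBuilder are the point here.

import java.util.Collection;

import org.apache.hadoop.hbase.client.Mutation;

import org.apache.phoenix.hbase.index.covered.KeyValueStore;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
import org.apache.phoenix.hbase.index.scanner.Scanner;
import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;

public class IndexedColumnScanSketch {

  /** Scanner over just the indexed columns, as of timestamp ts, with deletes applied. */
  static Scanner scanIndexedColumns(KeyValueStore memstore, Mutation update,
      Collection<? extends ColumnReference> indexedColumns, ColumnTracker tracker, long ts) {
    ScannerBuilder builder = new ScannerBuilder(memstore, update);
    return builder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
  }
}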

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
new file mode 100644
index 0000000..0c06e2b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CachingHTableFactory.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.table;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A simple cache that uses LRU eviction to clean up unused {@link HTableInterface}s.
+ * When requesting an {@link HTableInterface} via {@link #getTable}, you may get the same table as
+ * last time, or it may be a new table.
+ * <p>
+ * You <b>should not call {@link HTableInterface#close()}</b> - that is handled when the table is
+ * evicted from the cache. Along the same lines, do not keep a reference to a table for longer than
+ * necessary - such a leak ensures that the table never gets closed.
+ */
+public class CachingHTableFactory implements HTableFactory {
+
+  /**
+   * LRUMap that closes the {@link HTableInterface} when the table is evicted
+   */
+  @SuppressWarnings("serial")
+  public class HTableInterfaceLRUMap extends LRUMap {
+
+    public HTableInterfaceLRUMap(int cacheSize) {
+      super(cacheSize);
+    }
+
+    @Override
+    protected boolean removeLRU(LinkEntry entry) {
+      HTableInterface table = (HTableInterface) entry.getValue();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing connection to table: " + Bytes.toString(table.getTableName())
+            + " because it was evicted from the cache.");
+      }
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
+            + " ignoring since being removed from queue.");
+      }
+      return true;
+    }
+  }
+
+  public static int getCacheSize(Configuration conf) {
+    return conf.getInt(CACHE_SIZE_KEY, DEFAULT_CACHE_SIZE);
+  }
+
+  private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class);
+  private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size";
+  private static final int DEFAULT_CACHE_SIZE = 10;
+
+  private HTableFactory delegate;
+
+  @SuppressWarnings("rawtypes")
+  Map openTables;
+
+  public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) {
+    this(tableFactory, getCacheSize(conf));
+  }
+
+  public CachingHTableFactory(HTableFactory factory, int cacheSize) {
+    this.delegate = factory;
+    openTables = new HTableInterfaceLRUMap(cacheSize);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename);
+    synchronized (openTables) {
+      HTableInterface table = (HTableInterface) openTables.get(tableBytes);
+      if (table == null) {
+        table = delegate.getTable(tablename);
+        openTables.put(tableBytes, table);
+      }
+      return table;
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    this.delegate.shutdown();
+  }
+}
\ No newline at end of file
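
A sketch (not part of this commit) of wrapping another factory with this cache. The delegate is assumed to be whatever HTableFactory the caller already has - for example the CoprocessorHTableFactory below - and "index.tablefactory.cache.size" is the key this class reads, so setting it is optional and defaults to 10.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
import org.apache.phoenix.hbase.index.table.HTableFactory;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

public class CachedTableLookupSketch {

  static HTableInterface lookup(HTableFactory delegate, String tableName) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("index.tablefactory.cache.size", 20); // optional; defaults to 10
    HTableFactory cached = new CachingHTableFactory(delegate, conf);
    // repeated lookups of the same name may return the same cached HTableInterface;
    // don't close it and don't hold the reference longer than needed
    return cached.getTable(new ImmutableBytesPtr(Bytes.toBytes(tableName)));
  }
}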

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
new file mode 100644
index 0000000..33559da
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/CoprocessorHTableFactory.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+public class CoprocessorHTableFactory implements HTableFactory {
+
+  /** Number of milliseconds to wait between ZooKeeper retry attempts */
+  private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill";
+  /** Number of retries for zookeeper */
+  private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
+  private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
+  private CoprocessorEnvironment e;
+
+  public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+    this.e = e;
+  }
+
+  @Override
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    Configuration conf = e.getConfiguration();
+    // make sure writers fail fast
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
+    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
+    }
+    return this.e.getTable(tablename.copyBytesIfNecessary());
+  }
+
+  @Override
+  public void shutdown() {
+    // noop
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
new file mode 100644
index 0000000..bef3d34
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+public interface HTableFactory {
+
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException;
+
+  public void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java
new file mode 100644
index 0000000..b6d8d8e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/table/HTableInterfaceReference.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.table;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Reference to an HTableInterface. Currently, it's pretty simple in that it is just a wrapper around
+ * the table name.
+ */
+public class HTableInterfaceReference {
+
+  private ImmutableBytesPtr tableName;
+
+
+  public HTableInterfaceReference(ImmutableBytesPtr tableName) {
+    this.tableName = tableName;
+  }
+
+  public ImmutableBytesPtr get() {
+    return this.tableName;
+  }
+
+  public String getTableName() {
+    return Bytes.toString(this.tableName.get(),this.tableName.getOffset(), this.tableName.getLength());
+  }
+
+  @Override
+  public int hashCode() {
+      return tableName.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      HTableInterfaceReference other = (HTableInterfaceReference)obj;
+      return tableName.equals(other.tableName);
+  }
+
+  @Override
+  public String toString() {
+    return getTableName();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
new file mode 100644
index 0000000..9825c77
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/ImmutableBytesPtr.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.util;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class ImmutableBytesPtr extends ImmutableBytesWritable {
+    private int hashCode;
+    
+    public ImmutableBytesPtr() {
+    }
+
+    public ImmutableBytesPtr(byte[] bytes) {
+        super(bytes);
+        hashCode = super.hashCode();
+    }
+
+    public ImmutableBytesPtr(ImmutableBytesWritable ibw) {
+        super(ibw.get(), ibw.getOffset(), ibw.getLength());
+        hashCode = super.hashCode();
+    }
+
+    public ImmutableBytesPtr(ImmutableBytesPtr ibp) {
+        super(ibp.get(), ibp.getOffset(), ibp.getLength());
+        hashCode = ibp.hashCode;
+    }
+
+    public ImmutableBytesPtr(byte[] bytes, int offset, int length) {
+        super(bytes, offset, length);
+        hashCode = super.hashCode();
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        ImmutableBytesPtr that = (ImmutableBytesPtr)obj;
+        if (this.hashCode != that.hashCode) return false;
+        if (Bytes.compareTo(this.get(), this.getOffset(), this.getLength(), that.get(), that.getOffset(), that.getLength()) != 0) return false;
+        return true;
+    }
+
+    public void set(ImmutableBytesWritable ptr) {
+        set(ptr.get(), ptr.getOffset(), ptr.getLength());
+    }
+
+    /**
+     * @param b Use passed bytes as backing array for this instance.
+     */
+    @Override
+    public void set(final byte [] b) {
+      super.set(b);
+      hashCode = super.hashCode();
+    }
+
+    /**
+     * @param b Use passed bytes as backing array for this instance.
+     * @param offset
+     * @param length
+     */
+    @Override
+    public void set(final byte [] b, final int offset, final int length) {
+        super.set(b,offset,length);
+        hashCode = super.hashCode();
+    }
+
+    @Override
+    public void readFields(final DataInput in) throws IOException {
+        super.readFields(in);
+        hashCode = super.hashCode();
+    }
+    
+    /**
+     * @return the backing byte array, copying only if necessary
+     */
+    public byte[] copyBytesIfNecessary() {
+        return copyBytesIfNecessary(this);
+    }
+
+  public static byte[] copyBytesIfNecessary(ImmutableBytesWritable ptr) {
+    if (ptr.getOffset() == 0 && ptr.getLength() == ptr.get().length) {
+      return ptr.get();
+    }
+    return ptr.copyBytes();
+  }
+}
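
To make the copyBytesIfNecessary() contract concrete, a small illustrative sketch (not part of this commit): a pointer that spans its whole backing array hands that array back untouched, while a pointer over a slice makes a defensive copy.

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;

public class BytesPtrSketch {

  public static void main(String[] args) {
    byte[] backing = Bytes.toBytes("hello world");

    // spans the entire backing array: the array itself is returned, no copy
    ImmutableBytesPtr whole = new ImmutableBytesPtr(backing);
    System.out.println(whole.copyBytesIfNecessary() == backing); // true

    // only a slice: a copy of just those bytes is returned
    ImmutableBytesPtr slice = new ImmutableBytesPtr(backing, 0, 5);
    System.out.println(slice.copyBytesIfNecessary() == backing); // false
  }
}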

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
new file mode 100644
index 0000000..76ec9ce
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/util/IndexManagementUtil.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.util;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.collect.Maps;
+import org.apache.phoenix.hbase.index.ValueGetter;
+import org.apache.phoenix.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.phoenix.hbase.index.covered.data.LazyValueGetter;
+import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.scanner.Scanner;
+
+/**
+ * Utility class to help manage indexes
+ */
+public class IndexManagementUtil {
+
+    private IndexManagementUtil() {
+        // private ctor for util classes
+    }
+
+    // Don't rely on statically defined classes constants from classes that may not exist
+    // in earlier HBase versions
+    public static final String INDEX_WAL_EDIT_CODEC_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec";
+    public static final String HLOG_READER_IMPL_KEY = "hbase.regionserver.hlog.reader.impl";
+    public static final String WAL_EDIT_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
+
+    private static final String INDEX_HLOG_READER_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader";
+    private static final Log LOG = LogFactory.getLog(IndexManagementUtil.class);
+
+    public static boolean isWALEditCodecSet(Configuration conf) {
+        // check to see if the WALEditCodec is installed
+        try {
+            // Use reflection to load the IndexedWALEditCodec, since it may not load with an older version
+            // of HBase
+            Class.forName(INDEX_WAL_EDIT_CODEC_CLASS_NAME);
+        } catch (Throwable t) {
+            return false;
+        }
+        if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null))) {
+            // it's installed, and it can handle compression and non-compression cases
+            return true;
+        }
+        return false;
+    }
+
+    public static void ensureMutableIndexingCorrectlyConfigured(Configuration conf) throws IllegalStateException {
+
+        // check to see if the WALEditCodec is installed
+        if (isWALEditCodecSet(conf)) { return; }
+
+        // otherwise, we have to install the indexedhlogreader, but it cannot have compression
+        String codecClass = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
+        String indexLogReaderName = INDEX_HLOG_READER_CLASS_NAME;
+        try {
+            // Use reflection to load the IndexedHLogReader, since it may not load with an older version
+            // of HBase
+            Class.forName(indexLogReaderName);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException(codecClass + " is not installed, but "
+                    + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+        }
+        if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
+            if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { throw new IllegalStateException(
+                    "WAL Compression is only supported with " + codecClass
+                            + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY); }
+        } else {
+            throw new IllegalStateException(codecClass + " is not installed, but "
+                    + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+        }
+
+    }
+
+    public static ValueGetter createGetterFromKeyValues(Collection<KeyValue> pendingUpdates) {
+        final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
+                .size());
+        for (KeyValue kv : pendingUpdates) {
+            // create new pointers to each part of the kv
+            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
+            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getBuffer(), kv.getQualifierOffset(),
+                    kv.getQualifierLength());
+            ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+            valueMap.put(new ReferencingColumn(family, qual), value);
+        }
+        return new ValueGetter() {
+            @Override
+            public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+                return valueMap.get(ReferencingColumn.wrap(ref));
+            }
+        };
+    }
+
+    private static class ReferencingColumn {
+        ImmutableBytesPtr family;
+        ImmutableBytesPtr qual;
+
+        static ReferencingColumn wrap(ColumnReference ref) {
+            ImmutableBytesPtr family = new ImmutableBytesPtr(ref.getFamily());
+            ImmutableBytesPtr qual = new ImmutableBytesPtr(ref.getQualifier());
+            return new ReferencingColumn(family, qual);
+        }
+
+        public ReferencingColumn(ImmutableBytesPtr family, ImmutableBytesPtr qual) {
+            this.family = family;
+            this.qual = qual;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((family == null) ? 0 : family.hashCode());
+            result = prime * result + ((qual == null) ? 0 : qual.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            ReferencingColumn other = (ReferencingColumn)obj;
+            if (family == null) {
+                if (other.family != null) return false;
+            } else if (!family.equals(other.family)) return false;
+            if (qual == null) {
+                if (other.qual != null) return false;
+            } else if (!qual.equals(other.qual)) return false;
+            return true;
+        }
+    }
+
+    public static ValueGetter createGetterFromScanner(Scanner scanner, byte[] currentRow) {
+        return new LazyValueGetter(scanner, currentRow);
+    }
+
+    /**
+     * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful for an index
+     * codec to determine if a given update should even be indexed. This assumes that for any index, there is going to
+     * be a small number of columns, versus the number of kvs in any one batch.
+     */
+    public static boolean updateMatchesColumns(Collection<KeyValue> update, List<ColumnReference> columns) {
+        // check to see if the kvs in the new update even match any of the columns requested
+        // assuming that for any index, there is going to be a small number of columns, versus the number of
+        // kvs in any one batch.
+        boolean matches = false;
+        outer: for (KeyValue kv : update) {
+            for (ColumnReference ref : columns) {
+                if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+                    matches = true;
+                    // if a single column matches a single kv, we need to build a whole scanner
+                    break outer;
+                }
+            }
+        }
+        return matches;
+    }
+
+    /**
+     * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful for an index
+     * codec to determine if a given update should even be indexed. This assumes that for any index, there is going to
+     * be a small number of kvs, versus the number of columns in any one batch.
+     * <p>
+     * This employs the same logic as {@link #updateMatchesColumns(Collection, List)}, but flips the iteration logic
+     * to search columns before kvs.
+     */
+    public static boolean columnMatchesUpdate(List<ColumnReference> columns, Collection<KeyValue> update) {
+        boolean matches = false;
+        outer: for (ColumnReference ref : columns) {
+            for (KeyValue kv : update) {
+                if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+                    matches = true;
+                    // if a single column matches a single kv, we need to build a whole scanner
+                    break outer;
+                }
+            }
+        }
+        return matches;
+    }
+
+    public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) {
+        Scan s = new Scan();
+        s.setRaw(true);
+        // add the necessary columns to the scan
+        for (Iterable<? extends ColumnReference> refs : refsArray) {
+            for (ColumnReference ref : refs) {
+                s.addFamily(ref.getFamily());
+            }
+        }
+        s.setMaxVersions();
+        return s;
+    }
+
+    /**
+     * Propagate the given failure as a generic {@link IOException}, if it isn't one already.
+     * 
+     * @param e
+     *            reason indexing failed. If <tt>null</tt>, throws a {@link NullPointerException}, which should unload
+     *            the coprocessor.
+     */
+    public static void rethrowIndexingException(Throwable e) throws IOException {
+        try {
+            throw e;
+        } catch (IOException e1) {
+            LOG.info("Rethrowing " + e);
+            throw e1;
+        } catch (Throwable e1) {
+            LOG.info("Rethrowing " + e1 + " as a " + IndexBuildingFailureException.class.getSimpleName());
+            throw new IndexBuildingFailureException("Failed to build index for unexpected reason!", e1);
+        }
+    }
+
+    public static void setIfNotSet(Configuration conf, String key, int value) {
+        if (conf.get(key) == null) {
+            conf.setInt(key, value);
+        }
+    }
+}
\ No newline at end of file
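
A sketch (not part of this commit) of the preferred WAL configuration check described above. It assumes the IndexedWALEditCodec class that ships with Phoenix is on the classpath; the two configuration constants are the public ones defined in IndexManagementUtil.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.phoenix.hbase.index.util.IndexManagementUtil;

public class WalCodecConfigSketch {

  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // preferred setup: register the indexed WALEdit codec (works with and without WAL compression)
    conf.set(IndexManagementUtil.WAL_EDIT_CODEC_CLASS_KEY,
        IndexManagementUtil.INDEX_WAL_EDIT_CODEC_CLASS_NAME);
    // throws IllegalStateException if neither the codec nor the indexed HLog reader is usable
    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
  }
}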

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
new file mode 100644
index 0000000..253fd0d
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/IndexedKeyValue.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.wal;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+public class IndexedKeyValue extends KeyValue {
+    private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutation) {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + indexTableName.hashCode();
+        result = prime * result + Arrays.hashCode(mutation.getRow());
+        return result;
+    }
+
+    private ImmutableBytesPtr indexTableName;
+    private Mutation mutation;
+    // optimization check to ensure that batches don't get replayed to the index more than once
+    private boolean batchFinished = false;
+    private int hashCode;
+
+    public IndexedKeyValue() {}
+
+    public IndexedKeyValue(byte[] bs, Mutation mutation) {
+        this.indexTableName = new ImmutableBytesPtr(bs);
+        this.mutation = mutation;
+        this.hashCode = calcHashCode(indexTableName, mutation);
+    }
+
+    public byte[] getIndexTable() {
+        return this.indexTableName.get();
+    }
+
+    public Mutation getMutation() {
+        return mutation;
+    }
+
+    /**
+     * This is a KeyValue that shouldn't actually be replayed, so we always report it as belonging to the
+     * {@link HLog#METAFAMILY} so it isn't replayed via the normal replay mechanism
+     */
+    @Override
+    public boolean matchingFamily(final byte[] family) {
+        return Bytes.equals(family, HLog.METAFAMILY);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation;
+    }
+
+    /**
+     * This is a very heavy-weight operation and should only be done when absolutely necessary - it does a full
+     * serialization of the underlying mutation to compare the underlying data.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if(obj == null) return false;
+        if (this == obj) return true;
+        if (getClass() != obj.getClass()) return false;
+        IndexedKeyValue other = (IndexedKeyValue)obj;
+        if (hashCode() != other.hashCode()) return false;
+        if (!other.indexTableName.equals(this.indexTableName)) return false;
+        byte[] current = this.getMutationBytes();
+        byte[] otherMutation = other.getMutationBytes();
+        return Bytes.equals(current, otherMutation);
+    }
+
+    private byte[] getMutationBytes() {
+        ByteArrayOutputStream bos = null;
+        try {
+            bos = new ByteArrayOutputStream();
+            this.mutation.write(new DataOutputStream(bos));
+            bos.flush();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+        } finally {
+            if (bos != null) {
+                try {
+                    bos.close();
+                } catch (IOException e) {
+                    throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        KeyValueCodec.write(out, this);
+    }
+
+    /**
+     * Internal method to write the underlying data for the entry - this does not do any special prefixing. Writing should be done
+     * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
+     * {@link IndexedKeyValue}s.
+     * 
+     * @param out
+     *            to write data to. Does not close or flush the passed object.
+     * @throws IOException
+     *             if there is a problem writing the underlying data
+     */
+    void writeData(DataOutput out) throws IOException {
+        Bytes.writeByteArray(out, this.indexTableName.get());
+        out.writeUTF(this.mutation.getClass().getName());
+        this.mutation.write(out);
+    }
+
+    /**
+     * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. It's the
+     * complement to {@link #writeData(DataOutput)}.
+     */
+    @SuppressWarnings("javadoc")
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
+        Class<? extends Mutation> clazz;
+        try {
+            clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class);
+            this.mutation = clazz.newInstance();
+            this.mutation.readFields(in);
+            this.hashCode = calcHashCode(indexTableName, mutation);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        } catch (InstantiationException e) {
+            throw new IOException(e);
+        } catch (IllegalAccessException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public boolean getBatchFinished() {
+        return this.batchFinished;
+    }
+
+    public void markBatchFinished() {
+        this.batchFinished = true;
+    }
+}
\ No newline at end of file
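
For orientation, a sketch (not part of this commit) of how a pending index write gets wrapped for the WAL: the destination index table name plus the Mutation to apply there, serialized through KeyValueCodec so the -1 marker distinguishes it from regular KeyValues on read. Table, family, and qualifier names below are made up.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.wal.IndexedKeyValue;
import org.apache.phoenix.hbase.index.wal.KeyValueCodec;

public class IndexedKeyValueSketch {

  public static void main(String[] args) throws IOException {
    Put indexUpdate = new Put(Bytes.toBytes("index-row"));
    indexUpdate.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));

    // wrap the pending index write together with the table it is destined for
    IndexedKeyValue ikv = new IndexedKeyValue(Bytes.toBytes("MY_INDEX_TABLE"), indexUpdate);

    // serialize through the codec so the -1 length marker identifies it on read
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    KeyValueCodec.write(out, ikv);
    out.flush();
    System.out.println("encoded " + bytes.size() + " bytes");
  }
}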

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
new file mode 100644
index 0000000..3815937
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/wal/KeyValueCodec.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.hbase.index.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}
+ */
+public class KeyValueCodec {
+
+  /**
+   * KeyValue length marker specifying that it's actually an {@link IndexedKeyValue} rather than a
+   * regular {@link KeyValue}.
+   */
+  public static final int INDEX_TYPE_LENGTH_MARKER = -1;
+
+  /**
+   * Read a {@link List} of {@link KeyValue} from the input stream - may contain regular
+   * {@link KeyValue}s or {@link IndexedKeyValue}s.
+   * @param in to read from
+   * @return the next {@link KeyValue}s
+   * @throws IOException if the next {@link KeyValue} cannot be read
+   */
+  public static List<KeyValue> readKeyValues(DataInput in) throws IOException {
+    int size = in.readInt();
+    if (size == 0) {
+      return Collections.<KeyValue>emptyList();
+    }
+    List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+    for (int i = 0; i < size; i++) {
+      kvs.add(readKeyValue(in));
+    }
+    return kvs;
+  }
+
+  /**
+   * Read a single {@link KeyValue} from the input stream - may either be a regular {@link KeyValue}
+   * or an {@link IndexedKeyValue}.
+   * @param in to read from
+   * @return the next {@link KeyValue}, if one is available
+   * @throws IOException if the next {@link KeyValue} cannot be read
+   */
+  public static KeyValue readKeyValue(DataInput in) throws IOException {
+    int length = in.readInt();
+    KeyValue kv;
+    // it's a special IndexedKeyValue
+    if (length == INDEX_TYPE_LENGTH_MARKER) {
+      kv = new IndexedKeyValue();
+      kv.readFields(in);
+    } else {
+      kv = new KeyValue();
+      kv.readFields(length, in);
+    }
+    return kv;
+  }
+
+  /**
+   * Write a {@link KeyValue} or an {@link IndexedKeyValue} to the output stream. These can be read
+   * back via {@link #readKeyValue(DataInput)} or {@link #readKeyValues(DataInput)}.
+   * @param out stream to write to
+   * @param kv {@link KeyValue} to write
+   * @throws IOException if there is an error writing
+   */
+  public static void write(DataOutput out, KeyValue kv) throws IOException {
+    if (kv instanceof IndexedKeyValue) {
+      out.writeInt(INDEX_TYPE_LENGTH_MARKER);
+      ((IndexedKeyValue) kv).writeData(out);
+    } else {
+      kv.write(out);
+    }
+  }
+}
\ No newline at end of file
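
For illustration (this sketch is not part of the patch), a minimal round trip through the codec: readKeyValues(DataInput) expects the entry count up front, so a caller writes the count first and then each KeyValue via write(DataOutput, KeyValue). Class and variable names here are hypothetical.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.phoenix.hbase.index.wal.KeyValueCodec;

public class KeyValueCodecRoundTrip {
  public static void main(String[] args) throws Exception {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
        Bytes.toBytes("qual"), Bytes.toBytes("value"));

    // encode: count first, then each entry
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(1);
    KeyValueCodec.write(out, kv);  // plain KeyValue, so the non-indexed path is taken
    out.flush();

    // decode: readKeyValues reads the count and then each KeyValue/IndexedKeyValue
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    List<KeyValue> decoded = KeyValueCodec.readKeyValues(in);
    System.out.println("decoded " + decoded.size() + " KeyValue(s)");
  }
}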

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
new file mode 100644
index 0000000..d7fef5e
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexCommitter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.phoenix.hbase.index.exception.IndexWriteException;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write the index updates to the index tables
+ */
+public interface IndexCommitter extends Stoppable {
+
+  void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
+
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws IndexWriteException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
new file mode 100644
index 0000000..5964647
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexFailurePolicy.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Handle failures to write to the index tables.
+ */
+public interface IndexFailurePolicy extends Stoppable {
+
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
+
+  /**
+   * Handle the failure of the attempted index updates
+   * @param attempted map of index table -> mutations that were attempted
+   * @param cause reason why there was a failure
+   * @throws IOException if the failure cannot be handled cleanly
+   */
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+      throws IOException;
+}
\ No newline at end of file
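
To make the contract concrete, a hypothetical alternative implementation (not part of this patch) could simply log and swallow failures; note that doing so leaves the index out of sync with the data table, which is exactly what the default KillServerOnFailurePolicy below avoids by aborting the server.

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

import com.google.common.collect.Multimap;

import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;

public class LoggingOnlyFailurePolicy implements IndexFailurePolicy {

  private static final Log LOG = LogFactory.getLog(LoggingOnlyFailurePolicy.class);
  private volatile boolean stopped = false;

  @Override
  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
    // nothing to hold onto in this sketch
  }

  @Override
  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
      throws IOException {
    // log and move on: the attempted index updates are lost until the index is rebuilt
    LOG.error("Failed to write " + attempted.size() + " index update(s)", cause);
  }

  @Override
  public void stop(String why) {
    this.stopped = true;
  }

  @Override
  public boolean isStopped() {
    return this.stopped;
  }
}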

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
new file mode 100644
index 0000000..30797b2
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriter.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.phoenix.hbase.index.exception.IndexWriteException;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Do the actual work of writing to the index tables. Ensures that if we fail to write to an index
+ * table, we cleanly kill the region/server so that the region's WAL gets replayed.
+ * <p>
+ * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
+ * threads, so they will not block the region from shutting down.
+ */
+public class IndexWriter implements Stoppable {
+
+  private static final Log LOG = LogFactory.getLog(IndexWriter.class);
+  private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
+  public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private IndexCommitter writer;
+  private IndexFailurePolicy failurePolicy;
+
+  /**
+   * @throws IOException if the {@link IndexCommitter} or {@link IndexFailurePolicy} cannot be
+   *           instantiated
+   */
+  public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
+    this(getCommitter(env), getFailurePolicy(env), env, name);
+  }
+
+  public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
+    Configuration conf = env.getConfiguration();
+    try {
+      IndexCommitter committer =
+          conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
+            IndexCommitter.class).newInstance();
+      return committer;
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static IndexFailurePolicy getFailurePolicy(RegionCoprocessorEnvironment env)
+      throws IOException {
+    Configuration conf = env.getConfiguration();
+    try {
+      IndexFailurePolicy policy =
+          conf.getClass(INDEX_FAILURE_POLICY_CONF_KEY, KillServerOnFailurePolicy.class,
+            IndexFailurePolicy.class).newInstance();
+      return policy;
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}; both are set up
+   * against this writer and the given environment before use.
+   * @param committer to write the index updates
+   * @param policy to handle any failed index writes
+   * @param env region coprocessor environment used to set up the committer and policy
+   * @param name identifier passed to the committer's setup (e.g. to name its thread pool)
+   */
+  public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
+      RegionCoprocessorEnvironment env, String name) {
+    this(committer, policy);
+    this.writer.setup(this, env, name);
+    this.failurePolicy.setup(this, env);
+  }
+
+  /**
+   * Create an {@link IndexWriter} with an already setup {@link IndexCommitter} and
+   * {@link IndexFailurePolicy}.
+   * @param committer to write updates
+   * @param policy to handle failures
+   */
+  IndexWriter(IndexCommitter committer, IndexFailurePolicy policy) {
+    this.writer = committer;
+    this.failurePolicy = policy;
+  }
+  
+  /**
+   * Write the mutations to their respective table.
+   * <p>
+   * This method is blocking and could potentially cause the writer to block for a long time as we
+   * write the index updates. When we return is determined by the specified {@link IndexCommitter}.
+   * <p>
+   * If an update fails, we pass the failure along to the installed {@link IndexFailurePolicy},
+   * which then decides how to handle it. By default, we use a {@link KillServerOnFailurePolicy},
+   * which kills the server when an index write fails so that the region's WAL gets replayed and no
+   * index edits are lost.
+   * @param indexUpdates Updates to write
+   * @throws IOException if the failure policy throws one while handling a failed write
+   */
+  public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException  {
+    // resolve the table names into HTableInterfaceReferences we can talk to, and group by table
+    Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
+    writeAndKillYourselfOnFailure(toWrite);
+  }
+
+  /**
+   * See {@link #writeAndKillYourselfOnFailure(Collection)}.
+   * @param toWrite index updates, already grouped by target table
+   * @throws IOException if the failure policy throws one while handling a failed write
+   */
+  public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
+    try {
+      write(toWrite);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Done writing all index updates!\n\t" + toWrite);
+      }
+    } catch (Exception e) {
+      this.failurePolicy.handleFailure(toWrite, e);
+    }
+  }
+
+  /**
+   * Write the mutations to their respective table.
+   * <p>
+   * This method is blocking and could potentially cause the writer to block for a long time as we
+   * write the index updates. We only return when either:
+   * <ol>
+   * <li>All index writes have returned, OR</li>
+   * <li>Any single index write has failed</li>
+   * </ol>
+   * We attempt to quickly determine if any write has failed and not write to the remaining indexes
+   * to ensure a timely recovery of the failed index writes.
+   * @param toWrite Updates to write
+   * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
+   *           stop early depends on the {@link IndexCommitter}.
+   */
+  public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
+    write(resolveTableReferences(toWrite));
+  }
+
+  /**
+   * See {@link #write(Collection)}
+   * @param toWrite index updates, already grouped by target table
+   * @throws IndexWriteException
+   */
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws IndexWriteException {
+    this.writer.write(toWrite);
+  }
+
+
+  /**
+   * Convert the passed index updates to {@link HTableInterfaceReference}s.
+   * @param indexUpdates from the index builder
+   * @return index updates grouped by target table, ready to be written by an {@link IndexWriter}.
+   */
+  public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+      Collection<Pair<Mutation, byte[]>> indexUpdates) {
+    Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
+        .<HTableInterfaceReference, Mutation> create();
+    // simple map to make lookups easy while we build up the set of table references
+    Map<ImmutableBytesPtr, HTableInterfaceReference> tables =
+        new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(indexUpdates.size());
+    for (Pair<Mutation, byte[]> entry : indexUpdates) {
+      byte[] tableName = entry.getSecond();
+      ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
+      HTableInterfaceReference table = tables.get(ptr);
+      if (table == null) {
+        table = new HTableInterfaceReference(ptr);
+        tables.put(ptr, table);
+      }
+      updates.put(table, entry.getFirst());
+    }
+
+    return updates;
+  }
+
+  @Override
+  public void stop(String why) {
+    if (!this.stopped.compareAndSet(false, true)) {
+      // already stopped
+      return;
+    }
+    LOG.debug("Stopping because " + why);
+    this.writer.stop(why);
+    this.failurePolicy.stop(why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.get();
+  }
+}
\ No newline at end of file
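
A small configuration sketch (illustrative, not part of this patch) of how getCommitter() and getFailurePolicy() above select their implementations; the defaults are set explicitly here, but any class implementing the respective interface could be plugged in via the same keys.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.phoenix.hbase.index.write.IndexCommitter;
import org.apache.phoenix.hbase.index.write.IndexFailurePolicy;
import org.apache.phoenix.hbase.index.write.KillServerOnFailurePolicy;
import org.apache.phoenix.hbase.index.write.ParallelWriterIndexCommitter;

public class IndexWriterConfigSketch {
  public static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    // same key as IndexWriter.INDEX_COMMITTER_CONF_KEY (spelling as in the code)
    conf.setClass("index.writer.commiter.class",
        ParallelWriterIndexCommitter.class, IndexCommitter.class);
    // same key as IndexWriter.INDEX_FAILURE_POLICY_CONF_KEY
    conf.setClass("index.writer.failurepolicy.class",
        KillServerOnFailurePolicy.class, IndexFailurePolicy.class);
    return conf;
  }
}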

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
new file mode 100644
index 0000000..db95970
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/IndexWriterUtils.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+
+import org.apache.phoenix.hbase.index.table.CoprocessorHTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+public class IndexWriterUtils {
+
+  private static final Log LOG = LogFactory.getLog(IndexWriterUtils.class);
+
+  /**
+   * Maximum number of threads to allow per-table when writing. Each writer thread (from
+   * {@link ParallelWriterIndexCommitter#NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) has a single
+   * HTable. However, each table is backed by a threadpool to manage the updates to that table. This
+   * specifies the number of threads to allow in each of those pools. Generally, you shouldn't need
+   * to change this, unless you have a small number of indexes to which most of the writes go.
+   * Defaults to: {@value #DEFAULT_NUM_PER_TABLE_THREADS}.
+   * <p>
+   * For tables that receive relatively few writes, the thread pool will automatically decrease
+   * the number of threads to one (though it can burst up to the specified max for any given table),
+   * so increasing this to meet the max case is reasonable.
+   * <p>
+   * Setting this value too small can cause <b>catastrophic cluster failure</b>. The way HTable's
+   * underlying pool works is such that it does direct hand-off of tasks to threads. This works fine
+   * because HTables are assumed to work in a single-threaded context, so we never get more threads
+   * than regionservers. In a multi-threaded context, we can easily grow to more than that number of
+   * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
+   * coprocessor hooks, so we can't modify this behavior.
+   */
+  private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
+      "index.writer.threads.pertable.max";
+  private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
+
+  /** Configuration key that HBase uses to set the max number of threads for an HTable */
+  public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
+  private IndexWriterUtils() {
+    // private ctor for utilities
+  }
+
+  public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
+    // create a simple delegate factory, setup the way we need
+    Configuration conf = env.getConfiguration();
+    // set the number of threads allowed per table.
+    int htableThreads =
+        conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+    LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
+    IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+    return new CoprocessorHTableFactory(env);
+  }
+}
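
For reference, a short sketch (illustrative, not part of this patch) of overriding the per-table thread cap discussed above; 32 is an arbitrary example value, and the warning above about setting it too small still applies.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PerTableThreadsSketch {
  public static Configuration configure() {
    Configuration conf = HBaseConfiguration.create();
    // same key as IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY; the default is
    // Integer.MAX_VALUE, relying on the pool to shrink back down when a table is idle
    conf.setInt("index.writer.threads.pertable.max", 32);
    return conf;
  }
}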

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
new file mode 100644
index 0000000..0b84cdf
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/KillServerOnFailurePolicy.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Naive failure policy - kills the server on which it resides
+ */
+public class KillServerOnFailurePolicy implements IndexFailurePolicy {
+
+  private static final Log LOG = LogFactory.getLog(KillServerOnFailurePolicy.class);
+  private Abortable abortable;
+  private Stoppable stoppable;
+
+  @Override
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    setup(parent, env.getRegionServerServices());
+  }
+
+  public void setup(Stoppable parent, Abortable abort) {
+    this.stoppable = parent;
+    this.abortable = abort;
+  }
+
+  @Override
+  public void stop(String why) {
+    // noop
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stoppable.isStopped();
+  }
+
+  @Override
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause)
+      throws IOException {
+    // cleanup resources
+    this.stop("Killing ourselves because of an error: " + cause);
+    // notify the regionserver of the failure
+    String msg =
+        "Could not update the index table, killing the server because we couldn't write to an index table";
+    LOG.error(msg, cause);
+    try {
+      this.abortable.abort(msg, cause);
+    } catch (Exception e) {
+      LOG.fatal("Couldn't abort this server to preserve index writes, "
+          + "attempting to hard kill the server", e);
+      System.exit(1);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/bbacf6e0/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..55695ff
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.phoenix.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.phoenix.hbase.index.parallel.EarlyExitFailure;
+import org.apache.phoenix.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.phoenix.hbase.index.parallel.Task;
+import org.apache.phoenix.hbase.index.parallel.TaskBatch;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager;
+import org.apache.phoenix.hbase.index.table.CachingHTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableFactory;
+import org.apache.phoenix.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write index updates to the index tables in parallel. We attempt to exit early from the writes if
+ * any of the index updates fails. Completion is determined by the following criteria:
+ * <ol>
+ * <li>All index writes have returned, OR</li>
+ * <li>Any single index write has failed</li>
+ * </ol>
+ * We attempt to quickly determine if any write has failed and not write to the remaining indexes to
+ * ensure a timely recovery of the failed index writes.
+ */
+public class ParallelWriterIndexCommitter implements IndexCommitter {
+
+  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.writer.threads.keepalivetime";
+  private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+
+  private HTableFactory factory;
+  private Stoppable stopped;
+  private QuickFailingTaskRunner pool;
+
+  @Override
+  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    Configuration conf = env.getConfiguration();
+    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+      ThreadPoolManager.getExecutor(
+        new ThreadPoolBuilder(name, conf).
+          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+  }
+
+  /**
+   * Setup <tt>this</tt>.
+   * <p>
+   * Exposed for TESTING
+   */
+  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+      int cacheSize) {
+    this.factory = new CachingHTableFactory(factory, cacheSize);
+    this.pool = new QuickFailingTaskRunner(pool);
+    this.stopped = stop;
+  }
+
+  @Override
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws SingleIndexWriteFailureException {
+    /*
+     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
+     * writes in parallel to each index table, so each table gets its own task and is submitted to
+     * the pool. Where it gets tricky is that we want to block the calling thread until one of two
+     * things happens: (1) all index tables get successfully updated, or (2) any one of the index
+     * table writes fails; in either case, we should return as quickly as possible. It gets a little
+     * more complicated when we do get a single failure but some of the index writes haven't been
+     * started yet (they've been queued up, but not submitted to a thread): we want those tasks to
+     * fail immediately, as we know the writes are wasted and will need to be replayed anyway.
+     */
+
+    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+    TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table. We leak the implementation here a little bit to save
+      // doing a complete copy of all the index updates for each table.
+      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
+      final HTableInterfaceReference tableReference = entry.getKey();
+      /*
+       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+       * two mechanisms: (1) setting aborted or stopped on the IndexWriter, or (2) interrupting the
+       * running thread. The former will only work if we are not in the midst of writing the current
+       * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter, interrupting the thread, will work in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't elaborate on whether
+       * it supports an interrupt).
+       */
+      tasks.add(new Task<Void>() {
+
+        /**
+         * Do the actual write to the index table. We don't need to worry about closing the table
+         * because that is handled by the {@link CachingHTableFactory}.
+         */
+        @Override
+        public Void call() throws Exception {
+          // this may have been queued, so another task in front of us may have failed; if that's
+          // the case, we should exit early
+          throwFailureIfDone();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing index update:" + mutations + " to table: " + tableReference);
+          }
+          try {
+            HTableInterface table = factory.getTable(tableReference.get());
+            throwFailureIfDone();
+            table.batch(mutations);
+          } catch (SingleIndexWriteFailureException e) {
+            throw e;
+          } catch (IOException e) {
+            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+          } catch (InterruptedException e) {
+            // reset the interrupt status on the thread
+            Thread.currentThread().interrupt();
+            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+          }
+          return null;
+        }
+
+        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+          if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
+            throw new SingleIndexWriteFailureException(
+                "Pool closed, not attempting to write to the index!", null);
+          }
+
+        }
+      });
+    }
+
+    // actually submit the tasks to the pool and wait for them to finish/fail
+    try {
+      pool.submitUninterruptible(tasks);
+    } catch (EarlyExitFailure e) {
+      propagateFailure(e);
+    } catch (ExecutionException e) {
+      LOG.error("Found a failed index update!");
+      propagateFailure(e.getCause());
+    }
+
+  }
+
+  private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+    try {
+      throw throwable;
+    } catch (SingleIndexWriteFailureException e1) {
+      throw e1;
+    } catch (Throwable e1) {
+      throw new SingleIndexWriteFailureException(
+          "Got an abort notification while writing to the index!", e1);
+    }
+
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
+   * by the external {@link Stoppable}. This call does not delegate the stop down to the
+   * {@link Stoppable} passed in during setup.
+   * @param why the reason for stopping
+   */
+  @Override
+  public void stop(String why) {
+    LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+    this.pool.stop(why);
+    this.factory.shutdown();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.isStopped();
+  }
+}
\ No newline at end of file
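
To make the early-exit behavior described above concrete, here is a simplified stand-in using plain java.util.concurrent (it is not the actual QuickFailingTaskRunner/TaskBatch machinery): every table's write is submitted as a task, the first failure surfaces as soon as it happens, and everything still queued or running is cancelled.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class EarlyExitSketch {
  public static void runAll(List<Callable<Void>> writes) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    ExecutorCompletionService<Void> completion = new ExecutorCompletionService<Void>(pool);
    List<Future<Void>> futures = new ArrayList<Future<Void>>();
    for (Callable<Void> write : writes) {
      futures.add(completion.submit(write));
    }
    try {
      // take() hands back tasks as they complete, so the first failure is seen immediately
      for (int i = 0; i < writes.size(); i++) {
        completion.take().get();
      }
    } catch (ExecutionException e) {
      // one write failed: cancel everything still queued or running and give up
      for (Future<Void> f : futures) {
        f.cancel(true);
      }
      throw e;
    } finally {
      pool.shutdown();
    }
  }
}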