Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:44 UTC

[43/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java b/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java
new file mode 100644
index 0000000..acb78be
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/CachingHTableFactory.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.table;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * A simple cache that closes unused {@link HTableInterface}s as they are evicted from an
+ * underlying LRU map. When requesting an {@link HTableInterface} via {@link #getTable}, you may
+ * get the same table as last time, or it may be a new table.
+ * <p>
+ * You <b>should not call {@link HTableInterface#close()}</b>; that is handled when the table is
+ * evicted from the cache. Along the same lines, you must ensure not to keep a reference to the
+ * table for longer than necessary - the cache may close an evicted table out from under you.
+ */
+public class CachingHTableFactory implements HTableFactory {
+
+  /**
+   * LRUMap that closes the {@link HTableInterface} when the table is evicted
+   */
+  @SuppressWarnings("serial")
+  public class HTableInterfaceLRUMap extends LRUMap {
+
+    public HTableInterfaceLRUMap(int cacheSize) {
+      super(cacheSize);
+    }
+
+    @Override
+    protected boolean removeLRU(LinkEntry entry) {
+      HTableInterface table = (HTableInterface) entry.getValue();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Closing connection to table: " + Bytes.toString(table.getTableName())
+            + " because it was evicted from the cache.");
+      }
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.info("Failed to correctly close HTable: " + Bytes.toString(table.getTableName())
+            + " ignoring since being removed from queue.");
+      }
+      return true;
+    }
+  }
+
+  public static int getCacheSize(Configuration conf) {
+    return conf.getInt(CACHE_SIZE_KEY, DEFAULT_CACHE_SIZE);
+  }
+
+  private static final Log LOG = LogFactory.getLog(CachingHTableFactory.class);
+  private static final String CACHE_SIZE_KEY = "index.tablefactory.cache.size";
+  private static final int DEFAULT_CACHE_SIZE = 10;
+
+  private HTableFactory delegate;
+
+  @SuppressWarnings("rawtypes")
+  Map openTables;
+
+  public CachingHTableFactory(HTableFactory tableFactory, Configuration conf) {
+    this(tableFactory, getCacheSize(conf));
+  }
+
+  public CachingHTableFactory(HTableFactory factory, int cacheSize) {
+    this.delegate = factory;
+    openTables = new HTableInterfaceLRUMap(cacheSize);
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    ImmutableBytesPtr tableBytes = new ImmutableBytesPtr(tablename);
+    synchronized (openTables) {
+      HTableInterface table = (HTableInterface) openTables.get(tableBytes);
+      if (table == null) {
+        table = delegate.getTable(tablename);
+        openTables.put(tableBytes, table);
+      }
+      return table;
+    }
+  }
+
+  @Override
+  public void shutdown() {
+    this.delegate.shutdown();
+  }
+}
\ No newline at end of file

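For illustration, a minimal usage sketch of the caching factory above - not part of this commit; the table name is made up and the delegate is assumed to be an already-constructed HTableFactory:

import java.io.IOException;

import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hbase.index.table.CachingHTableFactory;
import org.apache.hbase.index.table.HTableFactory;
import org.apache.hbase.index.util.ImmutableBytesPtr;

public class CachingHTableFactoryExample {
  public static void writeOneRow(HTableFactory delegate) throws IOException {
    // cache up to 10 open tables (DEFAULT_CACHE_SIZE above); a table is closed
    // automatically when it is evicted from the underlying LRU map
    HTableFactory cached = new CachingHTableFactory(delegate, 10);
    HTableInterface table = cached.getTable(new ImmutableBytesPtr(Bytes.toBytes("my_index")));
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    table.put(p);
    // note: do NOT close the table here - the cache owns its lifecycle
  }
}
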
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java b/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java
new file mode 100644
index 0000000..e773105
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/CoprocessorHTableFactory.java
@@ -0,0 +1,50 @@
+package org.apache.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hbase.index.util.IndexManagementUtil;
+
+public class CoprocessorHTableFactory implements HTableFactory {
+
+  /** Number of milliseconds to wait between zookeeper retry attempts */
+  private static final String ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL = "zookeeper.recovery.retry.intervalmill";
+  /** Number of retries for zookeeper */
+  private static final String ZOOKEEPER_RECOVERY_RETRY_KEY = "zookeeper.recovery.retry";
+  private static final Log LOG = LogFactory.getLog(CoprocessorHTableFactory.class);
+  private CoprocessorEnvironment e;
+
+  public CoprocessorHTableFactory(CoprocessorEnvironment e) {
+    this.e = e;
+  }
+
+  @Override
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException {
+    Configuration conf = e.getConfiguration();
+    // make sure writers fail fast
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_PAUSE, 1000);
+    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_KEY, 3);
+    IndexManagementUtil.setIfNotSet(conf, ZOOKEEPER_RECOVERY_RETRY_INTERVALMILL, 100);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.ZK_SESSION_TIMEOUT, 30000);
+    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_RPC_TIMEOUT_KEY, 5000);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating new HTable: " + Bytes.toString(tablename.copyBytesIfNecessary()));
+    }
+    return this.e.getTable(tablename.copyBytesIfNecessary());
+  }
+
+  @Override
+  public void shutdown() {
+    // noop
+  }
+}
\ No newline at end of file

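The setIfNotSet calls above only install fail-fast defaults for keys the user has not configured; the helper itself is defined in IndexManagementUtil later in this commit. A small illustrative snippet of that behavior (using a bare Configuration so no default resources interfere):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;

import org.apache.hbase.index.util.IndexManagementUtil;

public class SetIfNotSetExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false); // no default resources, for demonstration
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 10); // explicit user setting
    IndexManagementUtil.setIfNotSet(conf, HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
    // prints 10: an explicit setting wins, only unset keys receive a default
    System.out.println(conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, -1));
    IndexManagementUtil.setIfNotSet(conf, "zookeeper.recovery.retry", 3);
    // prints 3: this key was unset, so the default was applied
    System.out.println(conf.getInt("zookeeper.recovery.retry", -1));
  }
}
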
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/HTableFactory.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/HTableFactory.java b/src/main/java/org/apache/hbase/index/table/HTableFactory.java
new file mode 100644
index 0000000..f9e524f
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/HTableFactory.java
@@ -0,0 +1,14 @@
+package org.apache.hbase.index.table;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HTableInterface;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+public interface HTableFactory {
+
+  public HTableInterface getTable(ImmutableBytesPtr tablename) throws IOException;
+
+  public void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java b/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java
new file mode 100644
index 0000000..6854003
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/table/HTableInterfaceReference.java
@@ -0,0 +1,46 @@
+package org.apache.hbase.index.table;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Reference to an HTableInterface. Currently, it's pretty simple in that it is just a wrapper
+ * around the table name.
+ */
+public class HTableInterfaceReference {
+
+  private ImmutableBytesPtr tableName;
+
+
+  public HTableInterfaceReference(ImmutableBytesPtr tableName) {
+    this.tableName = tableName;
+  }
+
+  public ImmutableBytesPtr get() {
+    return this.tableName;
+  }
+
+  public String getTableName() {
+    return Bytes.toString(this.tableName.get(), this.tableName.getOffset(), this.tableName.getLength());
+  }
+
+  @Override
+  public int hashCode() {
+      return tableName.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+      if (this == obj) return true;
+      if (obj == null) return false;
+      if (getClass() != obj.getClass()) return false;
+      HTableInterfaceReference other = (HTableInterfaceReference)obj;
+      return tableName.equals(other.tableName);
+  }
+
+  @Override
+  public String toString() {
+    return getTableName(); // use the offset/length-aware conversion
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java b/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java
new file mode 100644
index 0000000..3cb2609
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/util/ImmutableBytesPtr.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.util;
+
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class ImmutableBytesPtr extends ImmutableBytesWritable {
+    private int hashCode;
+    
+    public ImmutableBytesPtr() {
+    }
+
+    public ImmutableBytesPtr(byte[] bytes) {
+        super(bytes);
+        hashCode = super.hashCode();
+    }
+
+    public ImmutableBytesPtr(ImmutableBytesWritable ibw) {
+        super(ibw);
+        hashCode = super.hashCode();
+    }
+
+    public ImmutableBytesPtr(byte[] bytes, int offset, int length) {
+        super(bytes, offset, length);
+        hashCode = super.hashCode();
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) return true;
+        if (obj == null) return false;
+        if (getClass() != obj.getClass()) return false;
+        ImmutableBytesPtr that = (ImmutableBytesPtr)obj;
+        if (this.hashCode != that.hashCode) return false;
+        if (Bytes.compareTo(this.get(), this.getOffset(), this.getLength(), that.get(), that.getOffset(), that.getLength()) != 0) return false;
+        return true;
+    }
+
+    public void set(ImmutableBytesWritable ptr) {
+        set(ptr.get(), ptr.getOffset(), ptr.getLength());
+    }
+
+    /**
+     * @param b Use passed bytes as backing array for this instance.
+     */
+    @Override
+    public void set(final byte [] b) {
+      super.set(b);
+      hashCode = super.hashCode();
+    }
+
+    /**
+     * @param b Use passed bytes as backing array for this instance.
+     * @param offset offset into <tt>b</tt> at which the pointer starts
+     * @param length number of valid bytes, starting at <tt>offset</tt>
+     */
+    @Override
+    public void set(final byte [] b, final int offset, final int length) {
+        super.set(b, offset, length);
+        hashCode = super.hashCode();
+    }
+
+    /**
+     * @return the backing byte array, copying only if necessary
+     */
+    public byte[] copyBytesIfNecessary() {
+        if (this.getOffset() == 0 && this.getLength() == this.get().length) {
+            return this.get();
+        }
+        return this.copyBytes();
+    }
+
+}

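A short sketch of the pointer semantics above - not part of this commit: equality and hashing are computed over the referenced byte range, and copyBytesIfNecessary only copies when the pointer does not span its entire backing array:

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hbase.index.util.ImmutableBytesPtr;

public class ImmutableBytesPtrExample {
  public static void main(String[] args) {
    byte[] backing = Bytes.toBytes("prefix:tablename");
    // point at the "tablename" portion (offset 7, length 9) without copying
    ImmutableBytesPtr slice = new ImmutableBytesPtr(backing, 7, 9);
    // prints true: equality compares the referenced bytes, not the backing arrays
    System.out.println(slice.equals(new ImmutableBytesPtr(Bytes.toBytes("tablename"))));
    // prints true: a copy is made because the slice does not cover the full array
    System.out.println(slice.copyBytesIfNecessary() != backing);
    // prints false: a pointer spanning the whole array returns it uncopied
    ImmutableBytesPtr whole = new ImmutableBytesPtr(backing);
    System.out.println(whole.copyBytesIfNecessary() != backing);
  }
}
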
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java b/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java
new file mode 100644
index 0000000..f4acb6f
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/util/IndexManagementUtil.java
@@ -0,0 +1,246 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.util;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
+
+import com.google.common.collect.Maps;
+import org.apache.hbase.index.ValueGetter;
+import org.apache.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.hbase.index.covered.data.LazyValueGetter;
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.scanner.Scanner;
+
+/**
+ * Utility class to help manage indexes
+ */
+public class IndexManagementUtil {
+
+    private IndexManagementUtil() {
+        // private ctor for util classes
+    }
+
+    // Don't rely on statically defined classes constants from classes that may not exist
+    // in earlier HBase versions
+    public static final String INDEX_WAL_EDIT_CODEC_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec";
+    public static final String HLOG_READER_IMPL_KEY = "hbase.regionserver.hlog.reader.impl";
+    public static final String WAL_EDIT_CODEC_CLASS_KEY = "hbase.regionserver.wal.codec";
+
+    private static final String INDEX_HLOG_READER_CLASS_NAME = "org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader";
+    private static final Log LOG = LogFactory.getLog(IndexManagementUtil.class);
+
+    public static boolean isWALEditCodecSet(Configuration conf) {
+        // check to see if the WALEditCodec is installed
+        try {
+            // Use reflection to load the IndexedWALEditCodec, since it may not load with an older version
+            // of HBase
+            Class.forName(INDEX_WAL_EDIT_CODEC_CLASS_NAME);
+        } catch (Throwable t) {
+            return false;
+        }
+        if (INDEX_WAL_EDIT_CODEC_CLASS_NAME.equals(conf.get(WAL_EDIT_CODEC_CLASS_KEY, null))) {
+            // it's installed, and it can handle both compression and non-compression cases
+            return true;
+        }
+        return false;
+    }
+
+    public static void ensureMutableIndexingCorrectlyConfigured(Configuration conf) throws IllegalStateException {
+
+        // check to see if the WALEditCodec is installed
+        if (isWALEditCodecSet(conf)) { return; }
+
+        // otherwise, we have to install the indexedhlogreader, but it cannot have compression
+        String codecClass = INDEX_WAL_EDIT_CODEC_CLASS_NAME;
+        String indexLogReaderName = INDEX_HLOG_READER_CLASS_NAME;
+        try {
+            // Use reflection to load the IndexedHLogReader, since it may not load with an older version
+            // of HBase
+            Class.forName(indexLogReaderName);
+        } catch (ClassNotFoundException e) {
+            throw new IllegalStateException(codecClass + " is not installed and "
+                    + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+        }
+        if (indexLogReaderName.equals(conf.get(HLOG_READER_IMPL_KEY, indexLogReaderName))) {
+            if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { throw new IllegalStateException(
+                    "WAL Compression is only supported with " + codecClass
+                            + ". You can install in hbase-site.xml, under " + WAL_EDIT_CODEC_CLASS_KEY); }
+        } else {
+            throw new IllegalStateException(codecClass + " is not installed and "
+                    + indexLogReaderName + " hasn't been installed in hbase-site.xml under " + HLOG_READER_IMPL_KEY);
+        }
+
+    }
+
+    public static ValueGetter createGetterFromKeyValues(Collection<KeyValue> pendingUpdates) {
+        final Map<ReferencingColumn, ImmutableBytesPtr> valueMap = Maps.newHashMapWithExpectedSize(pendingUpdates
+                .size());
+        for (KeyValue kv : pendingUpdates) {
+            // create new pointers to each part of the kv
+            ImmutableBytesPtr family = new ImmutableBytesPtr(kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength());
+            ImmutableBytesPtr qual = new ImmutableBytesPtr(kv.getBuffer(), kv.getQualifierOffset(),
+                    kv.getQualifierLength());
+            ImmutableBytesPtr value = new ImmutableBytesPtr(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength());
+            valueMap.put(new ReferencingColumn(family, qual), value);
+        }
+        return new ValueGetter() {
+            @Override
+            public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
+                return valueMap.get(ReferencingColumn.wrap(ref));
+            }
+        };
+    }
+
+    private static class ReferencingColumn {
+        ImmutableBytesPtr family;
+        ImmutableBytesPtr qual;
+
+        static ReferencingColumn wrap(ColumnReference ref) {
+            ImmutableBytesPtr family = new ImmutableBytesPtr(ref.getFamily());
+            ImmutableBytesPtr qual = new ImmutableBytesPtr(ref.getQualifier());
+            return new ReferencingColumn(family, qual);
+        }
+
+        public ReferencingColumn(ImmutableBytesPtr family, ImmutableBytesPtr qual) {
+            this.family = family;
+            this.qual = qual;
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = 1;
+            result = prime * result + ((family == null) ? 0 : family.hashCode());
+            result = prime * result + ((qual == null) ? 0 : qual.hashCode());
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) return true;
+            if (obj == null) return false;
+            if (getClass() != obj.getClass()) return false;
+            ReferencingColumn other = (ReferencingColumn)obj;
+            if (family == null) {
+                if (other.family != null) return false;
+            } else if (!family.equals(other.family)) return false;
+            if (qual == null) {
+                if (other.qual != null) return false;
+            } else if (!qual.equals(other.qual)) return false;
+            return true;
+        }
+    }
+
+    public static ValueGetter createGetterFromScanner(Scanner scanner, byte[] currentRow) {
+        return new LazyValueGetter(scanner, currentRow);
+    }
+
+    /**
+     * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful for an index
+     * codec to determine if a given update should even be indexed. This assumes that for any index, there is going
+     * to be a small number of columns compared to the number of kvs in any one batch.
+     */
+    public static boolean updateMatchesColumns(Collection<KeyValue> update, List<ColumnReference> columns) {
+        // check to see if the kvs in the new update even match any of the columns requested
+        // assuming that for any index, there is going to be a small number of columns compared to
+        // the number of kvs in any one batch.
+        boolean matches = false;
+        outer: for (KeyValue kv : update) {
+            for (ColumnReference ref : columns) {
+                if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+                    matches = true;
+                    // if a single column matches a single kv, we need to build a whole scanner
+                    break outer;
+                }
+            }
+        }
+        return matches;
+    }
+
+    /**
+     * Check to see if the kvs in the update match any of the passed columns. Generally, this is useful for an index
+     * codec to determine if a given update should even be indexed. This assumes that for any index, there is going
+     * to be a small number of kvs compared to the number of columns in any one batch.
+     * <p>
+     * This employs the same logic as {@link #updateMatchesColumns(Collection, List)}, but flips the iteration logic
+     * to search columns before kvs.
+     */
+    public static boolean columnMatchesUpdate(List<ColumnReference> columns, Collection<KeyValue> update) {
+        boolean matches = false;
+        outer: for (ColumnReference ref : columns) {
+            for (KeyValue kv : update) {
+                if (ref.matchesFamily(kv.getFamily()) && ref.matchesQualifier(kv.getQualifier())) {
+                    matches = true;
+                    // if a single column matches a single kv, we need to build a whole scanner
+                    break outer;
+                }
+            }
+        }
+        return matches;
+    }
+
+    public static Scan newLocalStateScan(List<? extends Iterable<? extends ColumnReference>> refsArray) {
+        Scan s = new Scan();
+        s.setRaw(true);
+        // add the necessary columns to the scan
+        for (Iterable<? extends ColumnReference> refs : refsArray) {
+            for (ColumnReference ref : refs) {
+                s.addFamily(ref.getFamily());
+            }
+        }
+        s.setMaxVersions();
+        return s;
+    }
+
+    /**
+     * Propagate the given failure as a generic {@link IOException}, if it isn't already one.
+     * 
+     * @param e
+     *            reason indexing failed. If <tt>null</tt>, throws a {@link NullPointerException}, which should unload
+     *            the coprocessor.
+     */
+    public static void rethrowIndexingException(Throwable e) throws IOException {
+        try {
+            throw e;
+        } catch (IOException e1) {
+            LOG.info("Rethrowing " + e);
+            throw e1;
+        } catch (Throwable e1) {
+            LOG.info("Rethrowing " + e1 + " as a " + IndexBuildingFailureException.class.getSimpleName());
+            throw new IndexBuildingFailureException("Failed to build index for unexpected reason!", e1);
+        }
+    }
+
+    public static void setIfNotSet(Configuration conf, String key, int value) {
+        if (conf.get(key) == null) {
+            conf.setInt(key, value);
+        }
+    }
+}

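To make the matching helpers concrete, a minimal sketch - not part of this commit, and assuming ColumnReference exposes a (family, qualifier) constructor:

import java.util.Arrays;
import java.util.Collections;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hbase.index.covered.update.ColumnReference;
import org.apache.hbase.index.util.IndexManagementUtil;

public class UpdateMatchingExample {
  public static void main(String[] args) {
    KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("fam"),
        Bytes.toBytes("qual"), Bytes.toBytes("value"));
    ColumnReference indexed = new ColumnReference(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
    // prints true: the update touches an indexed column, so the codec should index it
    System.out.println(IndexManagementUtil.updateMatchesColumns(
        Arrays.asList(kv), Collections.singletonList(indexed)));
  }
}
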
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java b/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java
new file mode 100644
index 0000000..ca46fce
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/wal/IndexedKeyValue.java
@@ -0,0 +1,155 @@
+package org.apache.hbase.index.wal;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+public class IndexedKeyValue extends KeyValue {
+    private static int calcHashCode(ImmutableBytesPtr indexTableName, Mutation mutation) {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + indexTableName.hashCode();
+        result = prime * result + Arrays.hashCode(mutation.getRow());
+        return result;
+    }
+
+    private ImmutableBytesPtr indexTableName;
+    private Mutation mutation;
+    // optimization check to ensure that batches don't get replayed to the index more than once
+    private boolean batchFinished = false;
+    private int hashCode;
+
+    public IndexedKeyValue() {}
+
+    public IndexedKeyValue(byte[] bs, Mutation mutation) {
+        this.indexTableName = new ImmutableBytesPtr(bs);
+        this.mutation = mutation;
+        this.hashCode = calcHashCode(indexTableName, mutation);
+    }
+
+    public byte[] getIndexTable() {
+        return this.indexTableName.get();
+    }
+
+    public Mutation getMutation() {
+        return mutation;
+    }
+
+    /**
+     * This is a KeyValue that shouldn't actually be replayed, so we always claim to match the
+     * {@link HLog#METAFAMILY} so it isn't replayed via the normal replay mechanism.
+     */
+    @Override
+    public boolean matchingFamily(final byte[] family) {
+        return Bytes.equals(family, HLog.METAFAMILY);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexWrite:\n\ttable: " + indexTableName + "\n\tmutation:" + mutation;
+    }
+
+    /**
+     * This is a very heavy-weight operation and should only be done when absolutely necessary - it does a full
+     * serialization of the underlying mutation to compare the underlying data.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if(obj == null) return false;
+        if (this == obj) return true;
+        if (getClass() != obj.getClass()) return false;
+        IndexedKeyValue other = (IndexedKeyValue)obj;
+        if (hashCode() != other.hashCode()) return false;
+        if (!other.indexTableName.equals(this.indexTableName)) return false;
+        byte[] current = this.getMutationBytes();
+        byte[] otherMutation = other.getMutationBytes();
+        return Bytes.equals(current, otherMutation);
+    }
+
+    private byte[] getMutationBytes() {
+        ByteArrayOutputStream bos = null;
+        try {
+            bos = new ByteArrayOutputStream();
+            this.mutation.write(new DataOutputStream(bos));
+            bos.flush();
+            return bos.toByteArray();
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+        } finally {
+            if (bos != null) {
+                try {
+                    bos.close();
+                } catch (IOException e) {
+                    throw new IllegalArgumentException("Failed to get bytes for mutation!", e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public int hashCode() {
+        return hashCode;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        KeyValueCodec.write(out, this);
+    }
+
+    /**
+     * Internal method to write the underlying data for the entry - this does not do any special prefixing. Writing should be done
+     * via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure consistent reading/writing of
+     * {@link IndexedKeyValue}s.
+     * 
+     * @param out
+     *            to write data to. Does not close or flush the passed object.
+     * @throws IOException
+     *             if there is a problem writing the underlying data
+     */
+    void writeData(DataOutput out) throws IOException {
+        Bytes.writeByteArray(out, this.indexTableName.get());
+        out.writeUTF(this.mutation.getClass().getName());
+        this.mutation.write(out);
+    }
+
+    /**
+     * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} instead. It's the
+     * complement to {@link #writeData(DataOutput)}.
+     */
+    @SuppressWarnings("javadoc")
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.indexTableName = new ImmutableBytesPtr(Bytes.readByteArray(in));
+        Class<? extends Mutation> clazz;
+        try {
+            clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class);
+            this.mutation = clazz.newInstance();
+            this.mutation.readFields(in);
+            this.hashCode = calcHashCode(indexTableName, mutation);
+        } catch (ClassNotFoundException e) {
+            throw new IOException(e);
+        } catch (InstantiationException e) {
+            throw new IOException(e);
+        } catch (IllegalAccessException e) {
+            throw new IOException(e);
+        }
+    }
+
+    public boolean getBatchFinished() {
+        return this.batchFinished;
+    }
+
+    public void markBatchFinished() {
+        this.batchFinished = true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java b/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java
new file mode 100644
index 0000000..2cdb181
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/wal/KeyValueCodec.java
@@ -0,0 +1,79 @@
+package org.apache.hbase.index.wal;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+/**
+ * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit}
+ */
+public class KeyValueCodec {
+
+  /**
+   * KeyValue length marker specifying that it's actually an {@link IndexedKeyValue} rather than a
+   * regular {@link KeyValue}.
+   */
+  public static final int INDEX_TYPE_LENGTH_MARKER = -1;
+
+  /**
+   * Read a {@link List} of {@link KeyValue} from the input stream - may contain regular
+   * {@link KeyValue}s or {@link IndexedKeyValue}s.
+   * @param in to read from
+   * @return the list of deserialized {@link KeyValue}s
+   * @throws IOException if the next {@link KeyValue} cannot be read
+   */
+  public static List<KeyValue> readKeyValues(DataInput in) throws IOException {
+    int size = in.readInt();
+    if (size == 0) {
+      return Collections.<KeyValue>emptyList();
+    }
+    List<KeyValue> kvs = new ArrayList<KeyValue>(size);
+    for (int i = 0; i < size; i++) {
+      kvs.add(readKeyValue(in));
+    }
+    return kvs;
+  }
+
+  /**
+   * Read a single {@link KeyValue} from the input stream - may either be a regular {@link KeyValue}
+   * or an {@link IndexedKeyValue}.
+   * @param in to read from
+   * @return the next {@link KeyValue}, if one is available
+   * @throws IOException if the next {@link KeyValue} cannot be read
+   */
+  public static KeyValue readKeyValue(DataInput in) throws IOException {
+    int length = in.readInt();
+    KeyValue kv;
+    // it's a special IndexedKeyValue
+    if (length == INDEX_TYPE_LENGTH_MARKER) {
+      kv = new IndexedKeyValue();
+      kv.readFields(in);
+    } else {
+      kv = new KeyValue();
+      kv.readFields(length, in);
+    }
+    return kv;
+  }
+
+  /**
+   * Write a {@link KeyValue} or an {@link IndexedKeyValue} to the output stream. These can be read
+   * back via {@link #readKeyValue(DataInput)} or {@link #readKeyValues(DataInput)}.
+   * @param out to write to
+   * @param kv {@link KeyValue} to write
+   * @throws IOException if there is an error writing
+   */
+  public static void write(DataOutput out, KeyValue kv) throws IOException {
+    if (kv instanceof IndexedKeyValue) {
+      out.writeInt(INDEX_TYPE_LENGTH_MARKER);
+      ((IndexedKeyValue) kv).writeData(out);
+    } else {
+      kv.write(out);
+    }
+  }
+}
\ No newline at end of file

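A round-trip sketch of the framing above - not part of this commit. A regular KeyValue is written with its real length prefix, while an IndexedKeyValue is written behind the -1 marker; either way readKeyValue dispatches on that leading int:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hbase.index.wal.KeyValueCodec;

public class KeyValueCodecExample {
  public static void main(String[] args) throws IOException {
    KeyValue kv = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("fam"),
        Bytes.toBytes("qual"), Bytes.toBytes("value"));
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    KeyValueCodec.write(new DataOutputStream(bytes), kv);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    KeyValue back = KeyValueCodec.readKeyValue(in);
    // prints true: the KeyValue survives the encode/decode round trip
    System.out.println(Bytes.equals(kv.getRow(), back.getRow()));
  }
}
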
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexCommitter.java b/src/main/java/org/apache/hbase/index/write/IndexCommitter.java
new file mode 100644
index 0000000..4f2386e
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexCommitter.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.IndexWriteException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write the index updates to the index tables
+ */
+public interface IndexCommitter extends Stoppable {
+
+  void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name);
+
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws IndexWriteException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java b/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java
new file mode 100644
index 0000000..c2d4b2c
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexFailurePolicy.java
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Handle failures to write to the index tables.
+ */
+public interface IndexFailurePolicy extends Stoppable {
+
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env);
+
+  /**
+   * Handle the failure of the attempted index updates
+   * @param attempted map of index table -> mutations to apply
+   * @param cause reason why there was a failure
+   * @throws IOException if the failure cannot be handled cleanly
+   */
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
+      Exception cause) throws IOException;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexWriter.java b/src/main/java/org/apache/hbase/index/write/IndexWriter.java
new file mode 100644
index 0000000..53a9fd9
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexWriter.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.IndexWriteException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+/**
+ * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the
+ * index table, we cleanly kill the region/server to ensure that the region's WAL gets replayed.
+ * <p>
+ * We attempt to do the index updates in parallel using a backing threadpool. All threads are daemon
+ * threads, so they will not block the region from shutting down.
+ */
+public class IndexWriter implements Stoppable {
+
+  private static final Log LOG = LogFactory.getLog(IndexWriter.class);
+  private static final String INDEX_COMMITTER_CONF_KEY = "index.writer.commiter.class";
+  public static final String INDEX_FAILURE_POLICY_CONF_KEY = "index.writer.failurepolicy.class";
+  private AtomicBoolean stopped = new AtomicBoolean(false);
+  private IndexCommitter writer;
+  private IndexFailurePolicy failurePolicy;
+
+  /**
+   * @throws IOException if the {@link IndexWriter} or {@link IndexFailurePolicy} cannot be
+   *           instantiated
+   */
+  public IndexWriter(RegionCoprocessorEnvironment env, String name) throws IOException {
+    this(getCommitter(env), getFailurePolicy(env), env, name);
+  }
+
+  public static IndexCommitter getCommitter(RegionCoprocessorEnvironment env) throws IOException {
+    Configuration conf = env.getConfiguration();
+    try {
+      IndexCommitter committer =
+          conf.getClass(INDEX_COMMITTER_CONF_KEY, ParallelWriterIndexCommitter.class,
+            IndexCommitter.class).newInstance();
+      return committer;
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public static IndexFailurePolicy getFailurePolicy(RegionCoprocessorEnvironment env)
+      throws IOException {
+    Configuration conf = env.getConfiguration();
+    try {
+      IndexFailurePolicy committer =
+          conf.getClass(INDEX_FAILURE_POLICY_CONF_KEY, KillServerOnFailurePolicy.class,
+            IndexFailurePolicy.class).newInstance();
+      return committer;
+    } catch (InstantiationException e) {
+      throw new IOException(e);
+    } catch (IllegalAccessException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Directly specify the {@link IndexCommitter} and {@link IndexFailurePolicy}; both are set up
+   * against this writer and the given environment before any writes are attempted.
+   * @param committer to write updates
+   * @param policy to handle failures
+   * @param env region coprocessor environment
+   * @param name used to name the writer's helper threads
+   */
+  public IndexWriter(IndexCommitter committer, IndexFailurePolicy policy,
+      RegionCoprocessorEnvironment env, String name) {
+    this(committer, policy);
+    this.writer.setup(this, env, name);
+    this.failurePolicy.setup(this, env);
+  }
+
+  /**
+   * Create an {@link IndexWriter} with an already setup {@link IndexCommitter} and
+   * {@link IndexFailurePolicy}.
+   * @param committer to write updates
+   * @param policy to handle failures
+   */
+  IndexWriter(IndexCommitter committer, IndexFailurePolicy policy) {
+    this.writer = committer;
+    this.failurePolicy = policy;
+  }
+  
+  /**
+   * Write the mutations to their respective table.
+   * <p>
+   * This method is blocking and could potentially cause the writer to block for a long time as we
+   * write the index updates. When we return depends on the specified {@link IndexCommitter}.
+   * <p>
+   * If the update fails, we pass along the failure to the installed {@link IndexFailurePolicy},
+   * which then decides how to handle the failure. By default, we use a
+   * {@link KillServerOnFailurePolicy}, which ensures that the server crashes when an index write
+   * fails, ensuring that we get WAL replay of the index edits.
+   * @param indexUpdates Updates to write
+   * @throws IOException if the write fails and the installed failure policy propagates the error
+   */
+  public void writeAndKillYourselfOnFailure(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException  {
+    // convert the strings to htableinterfaces to which we can talk and group by TABLE
+    Multimap<HTableInterfaceReference, Mutation> toWrite = resolveTableReferences(indexUpdates);
+    writeAndKillYourselfOnFailure(toWrite);
+  }
+
+  /**
+   * See {@link #writeAndKillYourselfOnFailure(Collection)}.
+   * @param toWrite mutations, grouped by index table
+   * @throws IOException if the write fails and the installed failure policy propagates the error
+   */
+  public void writeAndKillYourselfOnFailure(Multimap<HTableInterfaceReference, Mutation> toWrite) throws IOException {
+    try {
+      write(toWrite);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Done writing all index updates!\n\t" + toWrite);
+      }
+    } catch (Exception e) {
+      this.failurePolicy.handleFailure(toWrite, e);
+    }
+  }
+
+  /**
+   * Write the mutations to their respective table.
+   * <p>
+   * This method is blocking and could potentially cause the writer to block for a long time as we
+   * write the index updates. We only return when either:
+   * <ol>
+   * <li>All index writes have returned, OR</li>
+   * <li>Any single index write has failed</li>
+   * </ol>
+   * We attempt to quickly determine if any write has failed and not write to the remaining indexes
+   * to ensure a timely recovery of the failed index writes.
+   * @param toWrite Updates to write
+   * @throws IndexWriteException if we cannot successfully write to the index. Whether or not we
+   *           stop early depends on the {@link IndexCommitter}.
+   */
+  public void write(Collection<Pair<Mutation, byte[]>> toWrite) throws IndexWriteException {
+    write(resolveTableReferences(toWrite));
+  }
+
+  /**
+   * See {@link #write(Collection)}.
+   * @param toWrite mutations, grouped by index table
+   * @throws IndexWriteException if we cannot successfully write to the index
+   */
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws IndexWriteException {
+    this.writer.write(toWrite);
+  }
+
+
+  /**
+   * Convert the passed index updates to {@link HTableInterfaceReference}s.
+   * @param indexUpdates from the index builder
+   * @return pairs that can then be written by an {@link IndexWriter}.
+   */
+  public static Multimap<HTableInterfaceReference, Mutation> resolveTableReferences(
+      Collection<Pair<Mutation, byte[]>> indexUpdates) {
+    Multimap<HTableInterfaceReference, Mutation> updates = ArrayListMultimap
+        .<HTableInterfaceReference, Mutation> create();
+    // simple map to make lookups easy while we build the map of tables to create
+    Map<ImmutableBytesPtr, HTableInterfaceReference> tables =
+        new HashMap<ImmutableBytesPtr, HTableInterfaceReference>(indexUpdates.size());
+    for (Pair<Mutation, byte[]> entry : indexUpdates) {
+      byte[] tableName = entry.getSecond();
+      ImmutableBytesPtr ptr = new ImmutableBytesPtr(tableName);
+      HTableInterfaceReference table = tables.get(ptr);
+      if (table == null) {
+        table = new HTableInterfaceReference(ptr);
+        tables.put(ptr, table);
+      }
+      updates.put(table, entry.getFirst());
+    }
+
+    return updates;
+  }
+
+  @Override
+  public void stop(String why) {
+    if (!this.stopped.compareAndSet(false, true)) {
+      // already stopped
+      return;
+    }
+    LOG.debug("Stopping because " + why);
+    this.writer.stop(why);
+    this.failurePolicy.stop(why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.get();
+  }
+}
\ No newline at end of file

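To illustrate resolveTableReferences above, a small sketch - not part of this commit - grouping two mutations bound for the same (made-up) index table under a single reference:

import java.util.ArrayList;
import java.util.Collection;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

import com.google.common.collect.Multimap;
import org.apache.hbase.index.table.HTableInterfaceReference;
import org.apache.hbase.index.write.IndexWriter;

public class ResolveTableReferencesExample {
  public static void main(String[] args) {
    Collection<Pair<Mutation, byte[]>> updates = new ArrayList<Pair<Mutation, byte[]>>();
    byte[] indexTable = Bytes.toBytes("my_index");
    updates.add(new Pair<Mutation, byte[]>(new Put(Bytes.toBytes("row1")), indexTable));
    updates.add(new Pair<Mutation, byte[]>(new Put(Bytes.toBytes("row2")), indexTable));
    Multimap<HTableInterfaceReference, Mutation> grouped =
        IndexWriter.resolveTableReferences(updates);
    // prints 1 then 2: both mutations are grouped under one table reference
    System.out.println(grouped.keySet().size());
    System.out.println(grouped.values().size());
  }
}
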
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java b/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java
new file mode 100644
index 0000000..5ec62db
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/IndexWriterUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+
+import org.apache.hbase.index.table.CoprocessorHTableFactory;
+import org.apache.hbase.index.table.HTableFactory;
+import org.apache.hbase.index.util.IndexManagementUtil;
+
+public class IndexWriterUtils {
+
+  private static final Log LOG = LogFactory.getLog(IndexWriterUtils.class);
+
+  /**
+   * Maximum number of threads to allow per-table when writing. Each writer thread (from
+   * {@link IndexWriterUtils#NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY}) has a single HTable.
+   * However, each table is backed by a threadpool to manage the updates to that table. This
+   * specifies the number of threads to allow in each of those pools. Generally, you shouldn't need
+   * to change this, unless you have a small number of indexes to which most of the writes go.
+   * Defaults to: {@value #DEFAULT_NUM_PER_TABLE_THREADS}.
+   * <p>
+   * For tables to which there are not a lot of writes, the thread pool automatically will decrease
+   * the number of threads to one (though it can burst up to the specified max for any given table),
+   * so increasing this to meet the max case is reasonable.
+   * <p>
+   * Setting this value too small can cause <b>catastrophic cluster failure</b>. The way HTable's
+   * underlying pool works is such that it does direct hand-off of tasks to threads. This works fine
+   * because HTables are assumed to work in a single-threaded context, so we never get more threads
+   * than regionservers. In a multi-threaded context, we can easily grow to more than that number of
+   * threads. Currently, HBase doesn't support a custom thread-pool to back the HTable via the
+   * coprocessor hooks, so we can't modify this behavior.
+   */
+  private static final String INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY =
+      "index.writer.threads.pertable.max";
+  private static final int DEFAULT_NUM_PER_TABLE_THREADS = Integer.MAX_VALUE;
+
+  /** Configuration key that HBase uses to set the max number of threads for an HTable */
+  public static final String HTABLE_THREAD_KEY = "hbase.htable.threads.max";
+  private IndexWriterUtils() {
+    // private ctor for utilities
+  }
+
+  public static HTableFactory getDefaultDelegateHTableFactory(CoprocessorEnvironment env) {
+    // create a simple delegate factory, setup the way we need
+    Configuration conf = env.getConfiguration();
+    // set the number of threads allowed per table.
+    int htableThreads =
+        conf.getInt(IndexWriterUtils.INDEX_WRITER_PER_TABLE_THREADS_CONF_KEY, IndexWriterUtils.DEFAULT_NUM_PER_TABLE_THREADS);
+    LOG.trace("Creating HTableFactory with " + htableThreads + " threads for each HTable.");
+    IndexManagementUtil.setIfNotSet(conf, HTABLE_THREAD_KEY, htableThreads);
+    return new CoprocessorHTableFactory(env);
+  }
+}

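Given the warning above about the unbounded per-table default, an operator could cap the pool by setting the key before the factory is created (normally in hbase-site.xml); the value here is purely illustrative:

import org.apache.hadoop.conf.Configuration;

public class PerTableThreadCapExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    // cap per-table writer threads rather than relying on the Integer.MAX_VALUE
    // default; 32 is an arbitrary example value, not a recommendation
    conf.setInt("index.writer.threads.pertable.max", 32);
    System.out.println(conf.getInt("index.writer.threads.pertable.max", -1)); // prints 32
  }
}
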
http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java b/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java
new file mode 100644
index 0000000..ad66f17
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/KillServerOnFailurePolicy.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Naive failure policy - kills the server on which it resides
+ */
+public class KillServerOnFailurePolicy implements IndexFailurePolicy {
+
+  private static final Log LOG = LogFactory.getLog(KillServerOnFailurePolicy.class);
+  private Abortable abortable;
+  private Stoppable stoppable;
+
+  @Override
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    setup(parent, env.getRegionServerServices());
+  }
+
+  public void setup(Stoppable parent, Abortable abort) {
+    this.stoppable = parent;
+    this.abortable = abort;
+  }
+
+  @Override
+  public void stop(String why) {
+    // noop
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stoppable.isStopped();
+  }
+
+  @Override
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted,
+      Exception cause) throws IOException {
+    // cleanup resources
+    this.stop("Killing ourselves because of an error:" + cause);
+    // notify the regionserver of the failure
+    String msg =
+        "Could not update the index table, killing server region because couldn't write to an index table";
+    LOG.error(msg, cause);
+    try {
+      this.abortable.abort(msg, cause);
+    } catch (Exception e) {
+      LOG.fatal("Couldn't abort this server to preserve index writes, "
+          + "attempting to hard kill the server");
+      System.exit(1);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java b/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..b85aa94
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/ParallelWriterIndexCommitter.java
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.hbase.index.parallel.EarlyExitFailure;
+import org.apache.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.hbase.index.parallel.Task;
+import org.apache.hbase.index.parallel.TaskBatch;
+import org.apache.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hbase.index.parallel.ThreadPoolManager;
+import org.apache.hbase.index.table.CachingHTableFactory;
+import org.apache.hbase.index.table.HTableFactory;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+/**
+ * Write index updates to the index tables in parallel. We attempt to exit early from the writes if
+ * any of the index updates fails. Completion is determined by the following criteria:
+ * <ol>
+ * <li>All index writes have returned, OR</li>
+ * <li>Any single index write has failed</li>
+ * </ol>
+ * We attempt to quickly determine if any write has failed, and if so, skip the remaining index
+ * writes to ensure a timely recovery of the failed index updates.
+ */
+public class ParallelWriterIndexCommitter implements IndexCommitter {
+
+  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.writer.threads.max";
+  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.writer.threads.keepalivetime";
+  private static final Log LOG = LogFactory.getLog(ParallelWriterIndexCommitter.class);
+
+  private HTableFactory factory;
+  private Stoppable stopped;
+  private QuickFailingTaskRunner pool;
+
+  @Override
+  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    Configuration conf = env.getConfiguration();
+    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+      ThreadPoolManager.getExecutor(
+        new ThreadPoolBuilder(name, conf).
+          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+  }
+
+  /**
+   * Setup <tt>this</tt>.
+   * <p>
+   * Exposed for TESTING
+   */
+  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+      int cacheSize) {
+    this.factory = new CachingHTableFactory(factory, cacheSize);
+    this.pool = new QuickFailingTaskRunner(pool);
+    this.stopped = stop;
+  }
+
+  @Override
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws SingleIndexWriteFailureException {
+    /*
+     * This bit here is a little odd, so let's explain what's going on. Basically, we want to do the
+     * writes in parallel to each index table, so each table gets its own task and is submitted to
+     * the pool. Where it gets tricky is that we want to block the calling thread until one of two
+     * things happens: (1) all index tables get successfully updated, or (2) any one of the index
+     * table writes fail; in either case, we should return as quickly as possible. We get a little
+     * more complicated in that if we do get a single failure, but any of the index writes hasn't
+     * been started yet (it's been queued up, but not submitted to a thread), we want that task to
+     * fail immediately, as we know the write is a waste and will need to be replayed anyway.
+     */
+
+    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+    TaskBatch<Void> tasks = new TaskBatch<Void>(entries.size());
+    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table. We leak the implementation here a little bit to avoid
+      // making a complete copy of all the index updates for each table.
+      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
+      final HTableInterfaceReference tableReference = entry.getKey();
+      /*
+       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+       * running thread. The former will only work if we are not in the midst of writing the current
+       * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter, interrupting the thread, works in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't document when it
+       * supports an interrupt).
+       */
+      tasks.add(new Task<Void>() {
+
+        /**
+         * Do the actual write to the index table. We don't need to worry about closing the table
+         * because that is handled by the {@link CachingHTableFactory}.
+         */
+        @Override
+        public Void call() throws Exception {
+          // this may have been queued, so another task in front of us may have failed, so we
+          // should exit early, if that's the case
+          throwFailureIfDone();
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Writing index update: " + mutations + " to table: " + tableReference);
+          }
+          try {
+            HTableInterface table = factory.getTable(tableReference.get());
+            throwFailureIfDone();
+            table.batch(mutations);
+          } catch (SingleIndexWriteFailureException e) {
+            throw e;
+          } catch (IOException e) {
+            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+          } catch (InterruptedException e) {
+            // reset the interrupt status on the thread
+            Thread.currentThread().interrupt();
+            throw new SingleIndexWriteFailureException(tableReference.toString(), mutations, e);
+          }
+          return null;
+        }
+
+        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+          if (this.isBatchFailed() || Thread.currentThread().isInterrupted()) {
+            throw new SingleIndexWriteFailureException(
+                "Pool closed, not attempting to write to the index!", null);
+          }
+
+        }
+      });
+    }
+
+    // actually submit the tasks to the pool and wait for them to finish/fail
+    try {
+      pool.submitUninterruptible(tasks);
+    } catch (EarlyExitFailure e) {
+      propagateFailure(e);
+    } catch (ExecutionException e) {
+      LOG.error("Found a failed index update!");
+      propagateFailure(e.getCause());
+    }
+
+  }
+
+  private void propagateFailure(Throwable throwable) throws SingleIndexWriteFailureException {
+    try {
+      throw throwable;
+    } catch (SingleIndexWriteFailureException e1) {
+      throw e1;
+    } catch (Throwable e1) {
+      throw new SingleIndexWriteFailureException(
+          "Got an abort notification while writing to the index!", e1);
+    }
+
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * This method should only be called <b>once</b>. Stopped state ({@link #isStopped()}) is managed
+   * by the external {@link Stoppable}. This call does not delegate the stop down to the
+   * {@link Stoppable} passed in via {@link #setup}.
+   * @param why the reason for stopping
+   */
+  @Override
+  public void stop(String why) {
+    LOG.info("Shutting down " + this.getClass().getSimpleName() + " because " + why);
+    this.pool.stop(why);
+    this.factory.shutdown();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.isStopped();
+  }
+}
\ No newline at end of file
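
The early-exit contract described in the class comment can be illustrated with
plain java.util.concurrent primitives, independent of the QuickFailingTaskRunner:
block until every write returns or any single write fails, then cancel whatever
is still queued or running. A self-contained sketch of that pattern (the table
count and the failing task are made up for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class EarlyExitExample {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CompletionService<Void> done = new ExecutorCompletionService<Void>(pool);
        List<Future<Void>> futures = new ArrayList<Future<Void>>();
        final int tables = 5;
        for (int i = 0; i < tables; i++) {
          final int id = i;
          futures.add(done.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
              // stand-in for table.batch(mutations); table 3 fails
              if (id == 3) throw new Exception("write to index table " + id + " failed");
              return null;
            }
          }));
        }
        try {
          // block until all writes return, or any single write fails
          for (int i = 0; i < tables; i++) {
            done.take().get();
          }
          System.out.println("all index writes succeeded");
        } catch (ExecutionException e) {
          // one write failed: interrupt everything still queued or running
          for (Future<Void> f : futures) {
            f.cancel(true);
          }
          System.out.println("early exit: " + e.getCause().getMessage());
        } finally {
          pool.shutdown();
        }
      }
    }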

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java b/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java
new file mode 100644
index 0000000..99be157
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/recovery/PerRegionIndexWriteCache.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write.recovery;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+
+
+public class PerRegionIndexWriteCache {
+
+  private Map<HRegion, Multimap<HTableInterfaceReference, Mutation>> cache =
+      new HashMap<HRegion, Multimap<HTableInterfaceReference, Mutation>>();
+
+
+  /**
+   * Get the edits for the current region. Removes the edits from the cache. To add them back, call
+   * {@link #addEdits(HRegion, HTableInterfaceReference, Collection)}.
+   * @param region the region whose pending edits should be claimed
+   * @return the edits for the given region, or <tt>null</tt> if there are no pending edits
+   *         for the region
+   */
+  public Multimap<HTableInterfaceReference, Mutation> getEdits(HRegion region) {
+    return cache.remove(region);
+  }
+
+  /**
+   * Add the given edits to the set of pending edits for the region.
+   * @param region the region that generated the edits
+   * @param table the index table to which the edits should be applied
+   * @param collection the edits to store for later replay
+   */
+  public void addEdits(HRegion region, HTableInterfaceReference table,
+      Collection<Mutation> collection) {
+    Multimap<HTableInterfaceReference, Mutation> edits = cache.get(region);
+    if (edits == null) {
+      edits = ArrayListMultimap.<HTableInterfaceReference, Mutation> create();
+      cache.put(region, edits);
+    }
+    edits.putAll(table, collection);
+  }
+}
\ No newline at end of file
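
Note the claim-on-read contract: getEdits removes what it returns, so a region's
failed edits are handed out for replay at most once. A small sketch of the same
contract, with a plain String key standing in for HRegion (illustrative only):

    import java.util.HashMap;
    import java.util.Map;

    import com.google.common.collect.ArrayListMultimap;
    import com.google.common.collect.Multimap;

    public class ClaimOnReadExample {
      private final Map<String, Multimap<String, String>> cache =
          new HashMap<String, Multimap<String, String>>();

      public void addEdits(String region, String table, String edit) {
        Multimap<String, String> edits = cache.get(region);
        if (edits == null) {
          edits = ArrayListMultimap.<String, String> create();
          cache.put(region, edits);
        }
        edits.put(table, edit);
      }

      // claims the pending edits: a second call returns null until more are added
      public Multimap<String, String> getEdits(String region) {
        return cache.remove(region);
      }

      public static void main(String[] args) {
        ClaimOnReadExample c = new ClaimOnReadExample();
        c.addEdits("region-1", "idx_table", "put-1");
        System.out.println(c.getEdits("region-1")); // {idx_table=[put-1]}
        System.out.println(c.getEdits("region-1")); // null - already claimed
      }
    }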

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java b/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
new file mode 100644
index 0000000..642ff37
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/recovery/StoreFailuresInCachePolicy.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write.recovery;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.write.IndexFailurePolicy;
+import org.apache.hbase.index.write.KillServerOnFailurePolicy;
+
+/**
+ * Tracks any failed writes in the {@link PerRegionIndexWriteCache}, given a
+ * {@link MultiIndexWriteFailureException} (which is thrown from the
+ * {@link TrackingParallelWriterIndexCommitter}). Any other exception failure causes a server
+ * abort via the usual {@link KillServerOnFailurePolicy}.
+ */
+public class StoreFailuresInCachePolicy implements IndexFailurePolicy {
+
+  private KillServerOnFailurePolicy delegate;
+  private PerRegionIndexWriteCache cache;
+  private HRegion region;
+
+  /**
+   * @param failedIndexEdits cache to update when we find a failure
+   */
+  public StoreFailuresInCachePolicy(PerRegionIndexWriteCache failedIndexEdits) {
+    this.cache = failedIndexEdits;
+  }
+
+  @Override
+  public void setup(Stoppable parent, RegionCoprocessorEnvironment env) {
+    this.region = env.getRegion();
+    this.delegate = new KillServerOnFailurePolicy();
+    this.delegate.setup(parent, env);
+
+  }
+
+  @Override
+  public void handleFailure(Multimap<HTableInterfaceReference, Mutation> attempted, Exception cause) throws IOException {
+    // if it's not an exception we can handle, let the delegate take care of it
+    if (!(cause instanceof MultiIndexWriteFailureException)) {
+      delegate.handleFailure(attempted, cause);
+      // without this return, the cast below would fail with a ClassCastException
+      return;
+    }
+    List<HTableInterfaceReference> failedTables =
+        ((MultiIndexWriteFailureException) cause).getFailedTables();
+    for (HTableInterfaceReference table : failedTables) {
+      cache.addEdits(this.region, table, attempted.get(table));
+    }
+  }
+
+
+  @Override
+  public void stop(String why) {
+    this.delegate.stop(why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.delegate.isStopped();
+  }
+}
\ No newline at end of file
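
The dispatch in handleFailure is the crux: only a MultiIndexWriteFailureException
names the tables that failed, so only it can be cached for replay; everything
else falls through to the kill-server delegate. A standalone sketch of that
decision (the exception class below is an illustrative stand-in, not the real
MultiIndexWriteFailureException):

    import java.io.IOException;

    public class FailureDispatchExample {
      // illustrative stand-in for MultiIndexWriteFailureException
      static class MultiIndexWriteFailure extends IOException {
        MultiIndexWriteFailure(String failedTables) { super(failedTables); }
      }

      static void handleFailure(Exception cause) {
        // only a multi-index failure carries the failed tables, so only it is
        // recoverable; everything else is fatal
        if (!(cause instanceof MultiIndexWriteFailure)) {
          System.out.println("delegating to kill-server policy: " + cause.getMessage());
          return; // without this return, the cast below would throw
        }
        MultiIndexWriteFailure multi = (MultiIndexWriteFailure) cause;
        System.out.println("caching edits for replay: " + multi.getMessage());
      }

      public static void main(String[] args) {
        handleFailure(new IOException("region server connection lost"));
        handleFailure(new MultiIndexWriteFailure("idx_table_a, idx_table_b"));
      }
    }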

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java b/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
new file mode 100644
index 0000000..1d4f02d
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/write/recovery/TrackingParallelWriterIndexCommitter.java
@@ -0,0 +1,228 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.write.recovery;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.CapturingAbortable;
+import org.apache.hbase.index.exception.MultiIndexWriteFailureException;
+import org.apache.hbase.index.exception.SingleIndexWriteFailureException;
+import org.apache.hbase.index.parallel.EarlyExitFailure;
+import org.apache.hbase.index.parallel.Task;
+import org.apache.hbase.index.parallel.TaskBatch;
+import org.apache.hbase.index.parallel.TaskRunner;
+import org.apache.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hbase.index.parallel.ThreadPoolManager;
+import org.apache.hbase.index.parallel.WaitForCompletionTaskRunner;
+import org.apache.hbase.index.table.CachingHTableFactory;
+import org.apache.hbase.index.table.HTableFactory;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.write.IndexCommitter;
+import org.apache.hbase.index.write.IndexWriter;
+import org.apache.hbase.index.write.IndexWriterUtils;
+import org.apache.hbase.index.write.ParallelWriterIndexCommitter;
+
+/**
+ * Like the {@link ParallelWriterIndexCommitter}, but blocks until all writes have been attempted,
+ * to allow the caller to retrieve the failed and succeeded index updates. Therefore, this class
+ * will be a lot slower, in the face of failures, when compared to the
+ * {@link ParallelWriterIndexCommitter} (though just as fast for successful writes), so it should
+ * be used only when you need to at least attempt all writes and know their result; for instance,
+ * this is fine for doing WAL recovery - it's not a performance-intensive situation and we want to
+ * limit the edits we need to retry.
+ * <p>
+ * On failure to {@link #write(Multimap)}, we return a {@link MultiIndexWriteFailureException} that
+ * contains the list of {@link HTableInterfaceReference} that didn't complete successfully.
+ * <p>
+ * Failures to write to the index can happen several different ways:
+ * <ol>
+ * <li><tt>this</tt> is {@link #stop(String) stopped} or aborted (via the passed {@link Abortable}).
+ * This causes any pending tasks to fail whatever they are doing as fast as possible. Any writes
+ * that have not begun are not even attempted and are marked as failures.</li>
+ * <li>A batch write fails. This is the generic HBase write failure - it may occur because the index
+ * table is not available, .META. or -ROOT- is unavailable, or any other (of many) possible HBase
+ * exceptions.</li>
+ * </ol>
+ * Regardless of how the write fails, we still wait for all writes to complete before passing the
+ * failure back to the client.
+ */
+public class TrackingParallelWriterIndexCommitter implements IndexCommitter {
+  private static final Log LOG = LogFactory.getLog(TrackingParallelWriterIndexCommitter.class);
+
+  public static final String NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY = "index.trackingwriter.threads.max";
+  private static final int DEFAULT_CONCURRENT_INDEX_WRITER_THREADS = 10;
+  private static final String INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.trackingwriter.threads.keepalivetime";
+  
+  private TaskRunner pool;
+  private HTableFactory factory;
+  private CapturingAbortable abortable;
+  private Stoppable stopped;
+
+  @Override
+  public void setup(IndexWriter parent, RegionCoprocessorEnvironment env, String name) {
+    Configuration conf = env.getConfiguration();
+    setup(IndexWriterUtils.getDefaultDelegateHTableFactory(env),
+      ThreadPoolManager.getExecutor(
+        new ThreadPoolBuilder(name, conf).
+          setMaxThread(NUM_CONCURRENT_INDEX_WRITER_THREADS_CONF_KEY,
+            DEFAULT_CONCURRENT_INDEX_WRITER_THREADS).
+          setCoreTimeout(INDEX_WRITER_KEEP_ALIVE_TIME_CONF_KEY), env),
+      env.getRegionServerServices(), parent, CachingHTableFactory.getCacheSize(conf));
+  }
+
+  /**
+   * Setup <tt>this</tt>.
+   * <p>
+   * Exposed for TESTING
+   */
+  void setup(HTableFactory factory, ExecutorService pool, Abortable abortable, Stoppable stop,
+      int cacheSize) {
+    this.pool = new WaitForCompletionTaskRunner(pool);
+    this.factory = new CachingHTableFactory(factory, cacheSize);
+    this.abortable = new CapturingAbortable(abortable);
+    this.stopped = stop;
+  }
+
+  @Override
+  public void write(Multimap<HTableInterfaceReference, Mutation> toWrite)
+      throws MultiIndexWriteFailureException {
+    Set<Entry<HTableInterfaceReference, Collection<Mutation>>> entries = toWrite.asMap().entrySet();
+    TaskBatch<Boolean> tasks = new TaskBatch<Boolean>(entries.size());
+    List<HTableInterfaceReference> tables = new ArrayList<HTableInterfaceReference>(entries.size());
+    for (Entry<HTableInterfaceReference, Collection<Mutation>> entry : entries) {
+      // get the mutations for each table. We leak the implementation here a little bit to avoid
+      // making a complete copy of all the index updates for each table.
+      final List<Mutation> mutations = (List<Mutation>) entry.getValue();
+      // track each reference so we can get at it easily later, when determining failures
+      final HTableInterfaceReference tableReference = entry.getKey();
+      tables.add(tableReference);
+
+      /*
+       * Write a batch of index updates to an index table. This operation stops (is cancelable) via
+       * two mechanisms: (1) setting aborted or stopped on the IndexWriter or, (2) interrupting the
+       * running thread. The former will only work if we are not in the midst of writing the current
+       * batch to the table, though we do check these status variables before starting and before
+       * writing the batch. The latter, interrupting the thread, works in the previous
+       * situations as well as at some points while writing the batch, depending on the underlying
+       * writer implementation (HTableInterface#batch is blocking, but doesn't document when it
+       * supports an interrupt).
+       */
+      tasks.add(new Task<Boolean>() {
+
+        /**
+         * Do the actual write to the index table. We don't need to worry about closing the table
+         * because that is handled by the {@link CachingHTableFactory}.
+         */
+        @Override
+        public Boolean call() throws Exception {
+          try {
+            // this may have been queued, but there was an abort/stop, so we try to exit early
+            throwFailureIfDone();
+
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Writing index update: " + mutations + " to table: " + tableReference);
+            }
+            HTableInterface table = factory.getTable(tableReference.get());
+            throwFailureIfDone();
+            table.batch(mutations);
+          } catch (InterruptedException e) {
+            // reset the interrupt status on the thread
+            Thread.currentThread().interrupt();
+            throw e;
+          }
+          return Boolean.TRUE;
+        }
+
+        private void throwFailureIfDone() throws SingleIndexWriteFailureException {
+          if (stopped.isStopped() || abortable.isAborted()
+              || Thread.currentThread().isInterrupted()) {
+            throw new SingleIndexWriteFailureException(
+                "Pool closed, not attempting to write to the index!", null);
+          }
+
+        }
+      });
+    }
+
+    List<Boolean> results = null;
+    try {
+      LOG.debug("Waiting on index update tasks to complete...");
+      results = this.pool.submitUninterruptible(tasks);
+    } catch (ExecutionException e) {
+      throw new RuntimeException(
+          "Should not fail on the results while using a WaitForCompletionTaskRunner", e);
+    } catch (EarlyExitFailure e) {
+      throw new RuntimeException("Stopped while waiting for batch, quiting!", e);
+    }
+    
+    // track the failures. We only ever access this on return from our calls, so no extra
+    // synchronization is needed. We could update all the failures as we find them, but that adds a
+    // lot of locking overhead, and just doing the copy later is about as efficient.
+    List<HTableInterfaceReference> failures = new ArrayList<HTableInterfaceReference>();
+    int index = 0;
+    for (Boolean result : results) {
+      // there was a failure
+      if (result == null) {
+        // we know which table failed by the index of the result
+        failures.add(tables.get(index));
+      }
+      index++;
+    }
+
+    // if any of the tasks failed, then we need to propagate the failure
+    if (failures.size() > 0) {
+      // make the list unmodifiable to avoid any more synchronization concerns
+      throw new MultiIndexWriteFailureException(Collections.unmodifiableList(failures));
+    }
+  }
+
+  @Override
+  public void stop(String why) {
+    LOG.info("Shutting down " + this.getClass().getSimpleName());
+    this.pool.stop(why);
+    this.factory.shutdown();
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped.isStopped();
+  }
+}
\ No newline at end of file
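
The collect-all-results pattern above - wait on every task, then map each failed
slot back to its table by index - can be shown with plain invokeAll. A minimal
sketch of the pattern, not the WaitForCompletionTaskRunner itself (the table
names and the failing write are made up):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class TrackAllResultsExample {
      public static void main(String[] args) throws InterruptedException {
        final List<String> tables = Arrays.asList("idx_a", "idx_b", "idx_c");
        ExecutorService pool = Executors.newFixedThreadPool(tables.size());
        List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
        for (final String table : tables) {
          tasks.add(new Callable<Boolean>() {
            @Override
            public Boolean call() throws Exception {
              // stand-in for table.batch(mutations); idx_b fails
              if (table.equals("idx_b")) throw new Exception("write failed");
              return Boolean.TRUE;
            }
          });
        }
        // invokeAll waits for every task, successful or not
        List<Future<Boolean>> results = pool.invokeAll(tasks);
        List<String> failures = new ArrayList<String>();
        for (int i = 0; i < results.size(); i++) {
          try {
            results.get(i).get();
          } catch (ExecutionException e) {
            // the result index maps back to the table that failed
            failures.add(tables.get(i));
          }
        }
        System.out.println("failed tables: " + failures); // [idx_b]
        pool.shutdown();
      }
    }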

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/phoenix/cache/GlobalCache.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/phoenix/cache/GlobalCache.java b/src/main/java/org/apache/phoenix/cache/GlobalCache.java
new file mode 100644
index 0000000..50ae9d9
--- /dev/null
+++ b/src/main/java/org/apache/phoenix/cache/GlobalCache.java
@@ -0,0 +1,114 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.cache;
+
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_PERC_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_MEMORY_WAIT_MS_ATTRIB;
+import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTRIB;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.memory.ChildMemoryManager;
+import org.apache.phoenix.memory.GlobalMemoryManager;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.schema.PTable;
+
+
+/**
+ * 
+ * Global root cache for the server. Each tenant is managed as a child tenant cache of this one. Queries
+ * not associated with a particular tenant use this as their tenant cache.
+ *
+ * @author jtaylor
+ * @since 0.1
+ */
+public class GlobalCache extends TenantCacheImpl {
+    private static GlobalCache INSTANCE; 
+    
+    private final Configuration config;
+    // TODO: Use Guava cache with auto removal after lack of access 
+    private final ConcurrentMap<ImmutableBytesWritable,TenantCache> perTenantCacheMap = new ConcurrentHashMap<ImmutableBytesWritable,TenantCache>();
+    // Cache for the latest PTable for a given Phoenix table
+    private final ConcurrentHashMap<ImmutableBytesPtr,PTable> metaDataCacheMap = new ConcurrentHashMap<ImmutableBytesPtr,PTable>();
+    
+    public static synchronized GlobalCache getInstance(RegionCoprocessorEnvironment env) {
+        // See http://www.cs.umd.edu/~pugh/java/memoryModel/DoubleCheckedLocking.html
+        // for an explanation of why double-checked locking doesn't work.
+        if (INSTANCE == null) {
+            INSTANCE = new GlobalCache(env.getConfiguration());
+        }
+        return INSTANCE;
+    }
+    
+    public ConcurrentHashMap<ImmutableBytesPtr,PTable> getMetaDataCache() {
+        return metaDataCacheMap;
+    }
+    
+    /**
+     * Get the tenant cache associated with the tenantId. If tenantId is not applicable, null may be
+     * used, in which case the global tenant cache is returned.
+     * @param env the region coprocessor environment
+     * @param tenantId the tenant ID, or null if not applicable
+     * @return the tenant cache
+     */
+    public static TenantCache getTenantCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId) {
+        GlobalCache globalCache = GlobalCache.getInstance(env);
+        TenantCache tenantCache = tenantId == null ? globalCache : globalCache.getChildTenantCache(tenantId);      
+        return tenantCache;
+    }
+    
+    private GlobalCache(Configuration config) {
+        super(new GlobalMemoryManager(Runtime.getRuntime().totalMemory() * 
+                                          config.getInt(MAX_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_PERC) / 100,
+                                      config.getInt(MAX_MEMORY_WAIT_MS_ATTRIB, QueryServicesOptions.DEFAULT_MAX_MEMORY_WAIT_MS)),
+              config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS));
+        this.config = config;
+    }
+    
+    public Configuration getConfig() {
+        return config;
+    }
+    
+    /**
+     * Retrieve the tenant cache given a tenantId.
+     * @param tenantId the ID that identifies the tenant
+     * @return the existing or newly created TenantCache
+     */
+    public TenantCache getChildTenantCache(ImmutableBytesWritable tenantId) {
+        TenantCache tenantCache = perTenantCacheMap.get(tenantId);
+        if (tenantCache == null) {
+            int maxTenantMemoryPerc = config.getInt(MAX_TENANT_MEMORY_PERC_ATTRIB, QueryServicesOptions.DEFAULT_MAX_TENANT_MEMORY_PERC);
+            int maxServerCacheTimeToLive = config.getInt(QueryServices.MAX_SERVER_CACHE_TIME_TO_LIVE_MS, QueryServicesOptions.DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS);
+            TenantCacheImpl newTenantCache = new TenantCacheImpl(new ChildMemoryManager(getMemoryManager(), maxTenantMemoryPerc), maxServerCacheTimeToLive);
+            tenantCache = perTenantCacheMap.putIfAbsent(tenantId, newTenantCache);
+            if (tenantCache == null) {
+                tenantCache = newTenantCache;
+            }
+        }
+        return tenantCache;
+    }
+}
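
getChildTenantCache relies on the standard ConcurrentMap.putIfAbsent idiom:
racing threads may each build a cache, but only the first one published wins,
and every caller ends up holding the same instance. The idiom in isolation,
with StringBuilder standing in for the tenant cache (illustrative only):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class PutIfAbsentExample {
      private final ConcurrentMap<String, StringBuilder> caches =
          new ConcurrentHashMap<String, StringBuilder>();

      public StringBuilder getOrCreate(String tenantId) {
        StringBuilder cache = caches.get(tenantId);
        if (cache == null) {
          StringBuilder fresh = new StringBuilder(tenantId);
          // putIfAbsent returns the previously mapped value, or null if we won
          cache = caches.putIfAbsent(tenantId, fresh);
          if (cache == null) {
            cache = fresh; // we won the race; our instance is the shared one
          }
        }
        return cache;
      }

      public static void main(String[] args) {
        PutIfAbsentExample e = new PutIfAbsentExample();
        // the same instance is returned on every call, regardless of interleaving
        System.out.println(e.getOrCreate("t1") == e.getOrCreate("t1")); // true
      }
    }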