Posted to commits@phoenix.apache.org by ja...@apache.org on 2014/01/27 20:23:47 UTC

[46/51] [partial] Initial commit

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/Indexer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/Indexer.java b/src/main/java/org/apache/hbase/index/Indexer.java
new file mode 100644
index 0000000..6f6ba6a
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/Indexer.java
@@ -0,0 +1,704 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index;
+
+import static org.apache.hbase.index.util.IndexManagementUtil.rethrowIndexingException;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Coprocessor;
+import org.apache.hadoop.hbase.CoprocessorEnvironment;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Multimap;
+import org.apache.hbase.index.builder.IndexBuildManager;
+import org.apache.hbase.index.builder.IndexBuilder;
+import org.apache.hbase.index.builder.IndexBuildingFailureException;
+import org.apache.hbase.index.table.HTableInterfaceReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+import org.apache.hbase.index.util.IndexManagementUtil;
+import org.apache.hbase.index.wal.IndexedKeyValue;
+import org.apache.hbase.index.write.IndexFailurePolicy;
+import org.apache.hbase.index.write.IndexWriter;
+import org.apache.hbase.index.write.recovery.PerRegionIndexWriteCache;
+import org.apache.hbase.index.write.recovery.StoreFailuresInCachePolicy;
+import org.apache.hbase.index.write.recovery.TrackingParallelWriterIndexCommitter;
+
+/**
+ * Do all the work of managing index updates from a single coprocessor. All Puts/Deletes are passed
+ * to an {@link IndexBuilder} to determine the actual updates to make.
+ * <p>
+ * If the WAL is enabled, these updates are then added to the WALEdit and attempted to be written to
+ * the index tables after the WALEdit has been saved. If any of the index updates fail, this server is
+ * immediately terminated and we rely on WAL replay to attempt the index updates again (see
+ * {@link #preWALRestore(ObserverContext, HRegionInfo, HLogKey, WALEdit)}).
+ * <p>
+ * If the WAL is disabled, the updates are attempted immediately. No consistency guarantees are made
+ * if the WAL is disabled - some or none of the index updates may be successful. All updates in a
+ * single batch must have the same durability level - either everything gets written to the WAL or
+ * nothing does. Currently, we do not support mixed-durability updates within a single batch; if you
+ * want different durability levels, you need to split the updates into two different batches.
+ */
+public class Indexer extends BaseRegionObserver {
+
+  private static final Log LOG = LogFactory.getLog(Indexer.class);
+
+  /** WAL on this server */
+  private HLog log;
+  protected IndexWriter writer;
+  protected IndexBuildManager builder;
+
+  /** Configuration key for the {@link IndexBuilder} to use */
+  public static final String INDEX_BUILDER_CONF_KEY = "index.builder";
+
+  // Set up our locking on the index edits/WAL so we can be sure that we don't roll the WAL
+  // before an edit is applied to the index tables
+  private static final ReentrantReadWriteLock INDEX_READ_WRITE_LOCK = new ReentrantReadWriteLock(
+      true);
+  public static final ReadLock INDEX_UPDATE_LOCK = INDEX_READ_WRITE_LOCK.readLock();
+
+  /**
+   * Configuration key for whether the indexer should check the version of HBase that is running.
+   * Generally, you only want to skip this check for testing or for custom versions of HBase.
+   */
+  public static final String CHECK_VERSION_CONF_KEY = "com.saleforce.hbase.index.checkversion";
+
+  private static final String INDEX_RECOVERY_FAILURE_POLICY_KEY = "org.apache.hbase.index.recovery.failurepolicy";
+
+  /**
+   * Marker {@link KeyValue} to indicate that we are doing a batch operation. Needed because the
+   * coprocessor framework throws away the WALEdit from the prePut/preDelete hooks when checking a
+   * batch if there were no {@link KeyValue}s attached to the {@link WALEdit}. When you get down to
+   * the preBatch hook, there won't be any WALEdits to which to add the index updates.
+   */
+  private static KeyValue BATCH_MARKER = new KeyValue();
+
+  /**
+   * Cache the failed updates to the various regions. Used for making the WAL recovery mechanisms
+   * more robust in the face of recovering index regions that were on the same server as the
+   * primary table region
+   */
+  private PerRegionIndexWriteCache failedIndexEdits = new PerRegionIndexWriteCache();
+
+  /**
+   * IndexWriter for writing the recovered index edits. Separate from the main indexer since we need
+   * different write/failure policies
+   */
+  private IndexWriter recoveryWriter;
+
+  private boolean stopped;
+  private boolean disabled;
+
+  public static final String RecoveryFailurePolicyKeyForTesting = INDEX_RECOVERY_FAILURE_POLICY_KEY;
+
+  @Override
+  public void start(CoprocessorEnvironment e) throws IOException {
+      try {
+        final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e;
+        String serverName = env.getRegionServerServices().getServerName().getServerName();
+        if (env.getConfiguration().getBoolean(CHECK_VERSION_CONF_KEY, true)) {
+          // make sure the right HBase version <-> configuration combinations are allowed.
+          String errormsg = Indexer.validateVersion(env.getHBaseVersion(), env.getConfiguration());
+          if (errormsg != null) {
+            IOException ioe = new IOException(errormsg);
+            env.getRegionServerServices().abort(errormsg, ioe);
+            throw ioe;
+          }
+        }
+    
+        this.builder = new IndexBuildManager(env);
+    
+        // get a reference to the WAL
+        log = env.getRegionServerServices().getWAL();
+        // add a synchronizer so we don't archive a WAL that we need
+        log.registerWALActionsListener(new IndexLogRollSynchronizer(INDEX_READ_WRITE_LOCK.writeLock()));
+    
+        // setup the actual index writer
+        this.writer = new IndexWriter(env, serverName + "-index-writer");
+    
+        // set up the recovery writer that retries the failed edits
+        TrackingParallelWriterIndexCommitter recoveryCommmiter =
+            new TrackingParallelWriterIndexCommitter();
+    
+        try {
+          // get the specified failure policy. We only ever override it in tests, but we need to do it
+          // here
+          Class<? extends IndexFailurePolicy> policyClass =
+              env.getConfiguration().getClass(INDEX_RECOVERY_FAILURE_POLICY_KEY,
+                StoreFailuresInCachePolicy.class, IndexFailurePolicy.class);
+          IndexFailurePolicy policy =
+              policyClass.getConstructor(PerRegionIndexWriteCache.class).newInstance(failedIndexEdits);
+          LOG.debug("Setting up recovery writter with committer: " + recoveryCommmiter.getClass()
+              + " and failure policy: " + policy.getClass());
+          recoveryWriter =
+              new IndexWriter(recoveryCommmiter, policy, env, serverName + "-recovery-writer");
+        } catch (Exception ex) {
+          throw new IOException("Could not instantiate recovery failure policy!", ex);
+        }
+      } catch (NoSuchMethodError ex) {
+          disabled = true;
+          super.start(e);
+          LOG.error("Must be too early a version of HBase. Disabled coprocessor ", ex);
+      }
+  }
+
+  @Override
+  public void stop(CoprocessorEnvironment e) throws IOException {
+    if (this.stopped) {
+      return;
+    }
+    if (this.disabled) {
+        super.stop(e);
+        return;
+      }
+    this.stopped = true;
+    String msg = "Indexer is being stopped";
+    this.builder.stop(msg);
+    this.writer.stop(msg);
+    this.recoveryWriter.stop(msg);
+  }
+
+  @Override
+  public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
+      final WALEdit edit, final boolean writeToWAL) throws IOException {
+      if (this.disabled) {
+          super.prePut(c, put, edit, writeToWAL);
+          return;
+        }
+    // just have to add a batch marker to the WALEdit so we get the edit again in the batch
+    // processing step. We let it throw an exception here because something terrible has happened.
+    edit.add(BATCH_MARKER);
+  }
+
+  @Override
+  public void preDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
+      WALEdit edit, boolean writeToWAL) throws IOException {
+      if (this.disabled) {
+          super.preDelete(e, delete, edit, writeToWAL);
+          return;
+        }
+    try {
+      preDeleteWithExceptions(e, delete, edit, writeToWAL);
+      return;
+    } catch (Throwable t) {
+      rethrowIndexingException(t);
+    }
+    throw new RuntimeException(
+        "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+  }
+
+  public void preDeleteWithExceptions(ObserverContext<RegionCoprocessorEnvironment> e,
+      Delete delete, WALEdit edit, boolean writeToWAL) throws Exception {
+    // if we are making the update as part of a batch, we need to add in a batch marker so the WAL
+    // is retained
+    if (this.builder.getBatchId(delete) != null) {
+      edit.add(BATCH_MARKER);
+      return;
+    }
+
+    // get the mapping for index column -> target index table
+    Collection<Pair<Mutation, byte[]>> indexUpdates = this.builder.getIndexUpdate(delete);
+
+    if (doPre(indexUpdates, edit, writeToWAL)) {
+      takeUpdateLock("delete");
+    }
+  }
+
+  @Override
+  public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      if (this.disabled) {
+          super.preBatchMutate(c, miniBatchOp);
+          return;
+        }
+    try {
+      preBatchMutateWithExceptions(c, miniBatchOp);
+      return;
+    } catch (Throwable t) {
+      rethrowIndexingException(t);
+    }
+    throw new RuntimeException(
+        "Somehow didn't return an index update but also didn't propagate the failure to the client!");
+  }
+
+  @SuppressWarnings("deprecation")
+  public void preBatchMutateWithExceptions(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws Throwable {
+
+    // first group all the updates for a single row into a single update to be processed
+    Map<ImmutableBytesPtr, MultiMutation> mutations =
+        new HashMap<ImmutableBytesPtr, MultiMutation>();
+    boolean durable = false;
+    for (int i = 0; i < miniBatchOp.size(); i++) {
+      // remove the batch keyvalue marker - it's added for all puts
+      WALEdit edit = miniBatchOp.getWalEdit(i);
+      // we don't have a WALEdit for immutable index cases, which still see this path
+      // we could check if indexing is enabled for the mutation in prePut and then just skip this
+      // removal here, but doing it this way saves us checking twice.
+      if (edit != null) {
+        KeyValue kv = edit.getKeyValues().remove(0);
+        assert kv == BATCH_MARKER : "Expected batch marker from the WALEdit, but got: " + kv;
+      }
+      Pair<Mutation, Integer> op = miniBatchOp.getOperation(i);
+      Mutation m = op.getFirst();
+      // skip this mutation if we aren't enabling indexing
+      // unfortunately, we really should ask if the raw mutation (rather than the combined mutation)
+      // should be indexed, which means we need to expose another method on the builder. Such is the
+      // way optimizations go, though.
+      if (!this.builder.isEnabled(m)) {
+        continue;
+      }
+      
+      // figure out if this batch is durable or not
+      if(!durable){
+        durable = m.getDurability() != Durability.SKIP_WAL;
+      }
+
+      // add the mutation to the batch set
+      ImmutableBytesPtr row = new ImmutableBytesPtr(m.getRow());
+      MultiMutation stored = mutations.get(row);
+      // we haven't seen this row before, so add it
+      if (stored == null) {
+        stored = new MultiMutation(row, m.getWriteToWAL());
+        mutations.put(row, stored);
+      }
+      stored.addAll(m);
+    }
+    
+    // early exit if it turns out we don't have any edits
+    if (mutations.entrySet().size() == 0) {
+      return;
+    }
+
+    // dump all the index updates into a single WALEdit. They will get combined in the end anyways, so
+    // don't worry which one we get
+    WALEdit edit = miniBatchOp.getWalEdit(0);
+
+    // get the index updates for all elements in this batch
+    Collection<Pair<Mutation, byte[]>> indexUpdates =
+        this.builder.getIndexUpdate(miniBatchOp, mutations.values());
+    // write them
+    if (doPre(indexUpdates, edit, durable)) {
+      takeUpdateLock("batch mutation");
+    }
+  }
+
+  private void takeUpdateLock(String opDesc) throws IndexBuildingFailureException {
+    boolean interrupted = false;
+    // lock the log, so we are sure that index write gets atomically committed
+    LOG.debug("Taking INDEX_UPDATE readlock for " + opDesc);
+    // wait for the update lock
+    while (!this.stopped) {
+      try {
+        INDEX_UPDATE_LOCK.lockInterruptibly();
+        LOG.debug("Got the INDEX_UPDATE readlock for " + opDesc);
+        // unlock the lock so the server can shut down, if we find that we have stopped since getting
+        // the lock
+        if (this.stopped) {
+          INDEX_UPDATE_LOCK.unlock();
+          throw new IndexBuildingFailureException(
+              "Found server stop after obtaining the update lock, killing update attempt");
+        }
+        break;
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while waiting for update lock. Ignoring unless stopped");
+        interrupted = true;
+      }
+    }
+    if (interrupted) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  private class MultiMutation extends Mutation {
+
+    private ImmutableBytesPtr rowKey;
+
+    public MultiMutation(ImmutableBytesPtr rowkey, boolean writeToWal) {
+      this.rowKey = rowkey;
+      this.writeToWAL = writeToWal;
+    }
+
+    /**
+     * @param stored mutation whose KeyValues and attributes should be merged into this one
+     */
+    @SuppressWarnings("deprecation")
+    public void addAll(Mutation stored) {
+      // add all the kvs
+      for (Entry<byte[], List<KeyValue>> kvs : stored.getFamilyMap().entrySet()) {
+        byte[] family = kvs.getKey();
+        List<KeyValue> list = getKeyValueList(family, kvs.getValue().size());
+        list.addAll(kvs.getValue());
+        familyMap.put(family, list);
+      }
+
+      // add all the attributes, not overriding already stored ones
+      for (Entry<String, byte[]> attrib : stored.getAttributesMap().entrySet()) {
+        if (this.getAttribute(attrib.getKey()) == null) {
+          this.setAttribute(attrib.getKey(), attrib.getValue());
+        }
+      }
+      if (stored.getWriteToWAL()) {
+        this.writeToWAL = true;
+      }
+    }
+
+    private List<KeyValue> getKeyValueList(byte[] family, int hint) {
+      List<KeyValue> list = familyMap.get(family);
+      if (list == null) {
+        list = new ArrayList<KeyValue>(hint);
+      }
+      return list;
+    }
+
+    @Override
+    public byte[] getRow(){
+      return this.rowKey.copyBytesIfNecessary();
+    }
+
+    @Override
+    public int hashCode() {
+      return this.rowKey.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      // compare by row key rather than by hash code to avoid accidental collisions
+      return (o instanceof MultiMutation) && this.rowKey.equals(((MultiMutation) o).rowKey);
+    }
+
+    @Override
+    public void readFields(DataInput arg0) throws IOException {
+      throw new UnsupportedOperationException("MultiMutations cannot be read/written");
+    }
+
+    @Override
+    public void write(DataOutput arg0) throws IOException {
+      throw new UnsupportedOperationException("MultiMutations cannot be read/written");
+    }
+  }
+
+  /**
+   * Add the index updates to the WAL, or write to the index table, if the WAL has been disabled
+   * @return <tt>true</tt> if the WAL has been updated.
+   * @throws IOException
+   */
+  private boolean doPre(Collection<Pair<Mutation, byte[]>> indexUpdates, final WALEdit edit,
+      final boolean writeToWAL) throws IOException {
+    // no index updates, so we are done
+    if (indexUpdates == null || indexUpdates.size() == 0) {
+      return false;
+    }
+
+    // if writing to the WAL is disabled, we never see the WALEdit updates later on, so do the index
+    // update right away
+    if (!writeToWAL) {
+      try {
+        this.writer.write(indexUpdates);
+        return false;
+      } catch (Throwable e) {
+        LOG.error("Failed to update index with entries:" + indexUpdates, e);
+        IndexManagementUtil.rethrowIndexingException(e);
+      }
+    }
+
+    // we have WAL durability, so we just add the index updates to the WAL entry and move on
+    for (Pair<Mutation, byte[]> entry : indexUpdates) {
+      edit.add(new IndexedKeyValue(entry.getSecond(), entry.getFirst()));
+    }
+
+    return true;
+  }
+
+  @Override
+  public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+      boolean writeToWAL) throws IOException {
+      if (this.disabled) {
+          super.postPut(e, put, edit, writeToWAL);
+          return;
+        }
+    doPost(edit, put, writeToWAL);
+  }
+
+  @Override
+  public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete,
+      WALEdit edit, boolean writeToWAL) throws IOException {
+      if (this.disabled) {
+          super.postDelete(e, delete, edit, writeToWAL);
+          return;
+        }
+    doPost(edit,delete, writeToWAL);
+  }
+
+  @Override
+  public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+      if (this.disabled) {
+          super.postBatchMutate(c, miniBatchOp);
+          return;
+        }
+    this.builder.batchCompleted(miniBatchOp);
+    // noop for the rest of the indexer - it's handled by the first call to put/delete
+  }
+
+  private void doPost(WALEdit edit, Mutation m, boolean writeToWAL) throws IOException {
+    try {
+      doPostWithExceptions(edit, m, writeToWAL);
+      return;
+    } catch (Throwable e) {
+      rethrowIndexingException(e);
+    }
+    throw new RuntimeException(
+        "Somehow didn't complete the index update, but didn't return succesfully either!");
+  }
+
+  private void doPostWithExceptions(WALEdit edit, Mutation m, boolean writeToWAL) throws Exception {
+    //short circuit, if we don't need to do any work
+    if (!writeToWAL || !this.builder.isEnabled(m)) {
+      // already did the index update in prePut, so we are done
+      return;
+    }
+
+    // there is a little bit of excess here- we iterate all the non-indexed kvs for this check first
+    // and then do it again later when getting out the index updates. This should be pretty minor
+    // though, compared to the rest of the runtime
+    IndexedKeyValue ikv = getFirstIndexedKeyValue(edit);
+    /*
+     * early exit - we have nothing to write, so we don't need to do anything else. NOTE: we don't
+     * release the WAL Rolling lock (INDEX_UPDATE_LOCK) since we never take it in doPre if there are
+     * no index updates.
+     */
+    if (ikv == null) {
+      return;
+    }
+
+    /*
+     * only write the update if we haven't already seen this batch. We only want to write the batch
+     * once (this hook gets called with the same WALEdit for each Put/Delete in a batch, which can
+     * lead to writing all the index updates for each Put/Delete).
+     */
+    if (!ikv.getBatchFinished()) {
+      Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(edit);
+
+      // the WAL edit is kept in memory and we already specified the factory when we created the
+      // references originally - therefore, we just pass in a null factory here and use the ones
+      // already specified on each reference
+      try {
+          writer.writeAndKillYourselfOnFailure(indexUpdates);
+      } finally {
+        // With a custom kill policy, we may throw instead of kill the server.
+        // Without doing this in a finally block (at least with the mini cluster),
+        // the region server never goes down.
+
+        // mark the batch as having been written. In the single-update case, this never gets checked
+        // again, but in the batch case, we will check it again (see above).
+        ikv.markBatchFinished();
+      
+        // release the lock on the index, we wrote everything properly
+        // we took the lock for each Put/Delete, so we have to release it a matching number of times
+        // batch cases only take the lock once, so we need to make sure we don't over-release the
+        // lock.
+        LOG.debug("Releasing INDEX_UPDATE readlock");
+        INDEX_UPDATE_LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * Search the {@link WALEdit} for the first {@link IndexedKeyValue} present
+   * @param edit {@link WALEdit}
+   * @return the first {@link IndexedKeyValue} in the {@link WALEdit} or <tt>null</tt> if not
+   *         present
+   */
+  private IndexedKeyValue getFirstIndexedKeyValue(WALEdit edit) {
+    for (KeyValue kv : edit.getKeyValues()) {
+      if (kv instanceof IndexedKeyValue) {
+        return (IndexedKeyValue) kv;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Extract the index updates from the WAL Edit
+   * @param edit to search for index updates
+   * @return the mutations to apply to the index tables
+   */
+  private Collection<Pair<Mutation, byte[]>> extractIndexUpdate(WALEdit edit) {
+    Collection<Pair<Mutation, byte[]>> indexUpdates = new ArrayList<Pair<Mutation, byte[]>>();
+    for (KeyValue kv : edit.getKeyValues()) {
+      if (kv instanceof IndexedKeyValue) {
+        IndexedKeyValue ikv = (IndexedKeyValue) kv;
+        indexUpdates.add(new Pair<Mutation, byte[]>(ikv.getMutation(), ikv.getIndexTable()));
+      }
+    }
+
+    return indexUpdates;
+  }
+
+  @Override
+  public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c) {
+    Multimap<HTableInterfaceReference, Mutation> updates = failedIndexEdits.getEdits(c.getEnvironment().getRegion());
+    
+    if (this.disabled) {
+        super.postOpen(c);
+        return;
+      }
+    LOG.info("Found some outstanding index updates that didn't succeed during"
+        + " WAL replay - attempting to replay now.");
+    //if we have no pending edits to complete, then we are done
+    if (updates == null || updates.size() == 0) {
+      return;
+    }
+    
+    // do the usual writer stuff, killing the server again, if we can't manage to make the index
+    // writes succeed again
+    try {
+        writer.writeAndKillYourselfOnFailure(updates);
+    } catch (IOException e) {
+        LOG.error("Exception thrown instead of killing server during index writing", e);
+    }
+  }
+
+  @Override
+  public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env, HRegionInfo info,
+      HLogKey logKey, WALEdit logEdit) throws IOException {
+      if (this.disabled) {
+          super.preWALRestore(env, info, logKey, logEdit);
+          return;
+        }
+    // TODO check the regions in transition. If the server on which the region lives is this one,
+    // then we should retry that write later in postOpen.
+    // we might be able to get even smarter here and pre-split the edits that are server-local
+    // into their own recovered.edits file. This then lets us do a straightforward recovery of each
+    // region (and more efficiently as we aren't writing quite as hectically from this one place).
+
+    /*
+     * Basically, we let the index regions recover for a little while longer before retrying in the
+     * hopes they come up before the primary table finishes.
+     */
+    Collection<Pair<Mutation, byte[]>> indexUpdates = extractIndexUpdate(logEdit);
+    recoveryWriter.writeAndKillYourselfOnFailure(indexUpdates);
+  }
+
+  /**
+   * Create a custom {@link InternalScanner} for a compaction that tracks the versions of rows that
+   * are removed so we can clean them up from the index table(s).
+   * <p>
+   * This is not yet implemented - it's not clear if we should even mess around with the Index table
+   * for these rows as those points still existed. TODO: v2 of indexing
+   */
+  @Override
+  public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+      InternalScanner s) throws IOException {
+    return super.preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
+  }
+
+  /**
+   * Exposed for testing!
+   * @return the currently instantiated index builder
+   */
+  public IndexBuilder getBuilderForTesting() {
+    return this.builder.getBuilderForTesting();
+  }
+
+  /**
+   * Validate that the version and configuration parameters are supported
+   * @param hBaseVersion current version of HBase on which <tt>this</tt> coprocessor is installed
+   * @param conf configuration to check for allowed parameters (e.g. WAL Compression only if >=
+   *          0.94.9)
+   */
+  public static String validateVersion(String hBaseVersion, Configuration conf) {
+    String[] versions = hBaseVersion.split("[.]");
+    if (versions.length < 3) {
+      return "HBase version could not be read, expected three parts, but found: "
+          + Arrays.toString(versions);
+    }
+  
+    if (versions[1].equals("94")) {
+      String pointVersion = versions[2];
+      //remove -SNAPSHOT if applicable
+      int snapshot = pointVersion.indexOf("-");
+      if(snapshot > 0){
+        pointVersion = pointVersion.substring(0, snapshot);
+      }
+      // less than 0.94.9, so we need to check if WAL Compression is enabled
+      if (Integer.parseInt(pointVersion) < 9) {
+        if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) {
+          return
+                "Indexing not supported with WAL Compression for versions of HBase older than 0.94.9 - found version:"
+              + Arrays.toString(versions);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Enable indexing on the given table
+   * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled
+   * @param builder class to use when building the index for this table
+   * @param properties map of custom configuration options to make available to your
+   *          {@link IndexBuilder} on the server-side
+   * @throws IOException the Indexer coprocessor cannot be added
+   */
+  public static void enableIndexing(HTableDescriptor desc, Class<? extends IndexBuilder> builder,
+      Map<String, String> properties) throws IOException {
+    if (properties == null) {
+      properties = new HashMap<String, String>();
+    }
+    properties.put(Indexer.INDEX_BUILDER_CONF_KEY, builder.getName());
+    desc.addCoprocessor(Indexer.class.getName(), null, Coprocessor.PRIORITY_USER, properties);
+  }
+}
\ No newline at end of file
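
As a usage sketch of the enableIndexing helper above (not part of this commit): the table name, column family, and the builder property key are made up for illustration, and ExampleIndexBuilder stands in for any concrete IndexBuilder implementation (a skeleton of one is sketched after BaseIndexBuilder.java below).

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.util.Bytes;

    import org.apache.hbase.index.Indexer;

    public class EnableIndexingExample {
      public static HTableDescriptor buildIndexedTableDescriptor() throws Exception {
        // describe the primary table and its single column family
        HTableDescriptor desc = new HTableDescriptor("example_table");
        desc.addFamily(new HColumnDescriptor(Bytes.toBytes("fam")));

        // builder-specific options, made available server-side through the coprocessor properties
        Map<String, String> properties = new HashMap<String, String>();
        properties.put("example.indexed.family", "fam"); // hypothetical key read by the builder

        // registers the Indexer coprocessor on the table, using ExampleIndexBuilder as its builder
        Indexer.enableIndexing(desc, ExampleIndexBuilder.class, properties);
        return desc;
      }
    }
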

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/ValueGetter.java b/src/main/java/org/apache/hbase/index/ValueGetter.java
new file mode 100644
index 0000000..2600bf8
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/ValueGetter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index;
+
+import java.io.IOException;
+
+import org.apache.hbase.index.covered.update.ColumnReference;
+import org.apache.hbase.index.util.ImmutableBytesPtr;
+
+public interface ValueGetter {
+
+  /**
+   * Get the most recent (largest timestamp) for the given column reference
+   * @param ref to match against an underlying key value. Uses the passed object to match the
+   *          keyValue via {@link ColumnReference#matches}
+   * @return the stored value for the given {@link ColumnReference}, or <tt>null</tt> if no value is
+   *         present.
+   * @throws IOException if there is an error accessing the underlying data storage
+   */
+  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+}
\ No newline at end of file
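
A minimal, hypothetical implementation of the ValueGetter interface above, backed by a plain in-memory map (e.g. for tests); how the map gets populated is left to the caller.

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hbase.index.ValueGetter;
    import org.apache.hbase.index.covered.update.ColumnReference;
    import org.apache.hbase.index.util.ImmutableBytesPtr;

    public class MapValueGetter implements ValueGetter {

      private final Map<ColumnReference, ImmutableBytesPtr> values;

      public MapValueGetter(Map<ColumnReference, ImmutableBytesPtr> values) {
        this.values = values;
      }

      @Override
      public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException {
        // return the stored value for the referenced column, or null if none is present
        return values.get(ref);
      }
    }
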

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java b/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java
new file mode 100644
index 0000000..d0a8624
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/BaseIndexBuilder.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.covered.CoveredColumnsIndexBuilder;
+
+/**
+ * Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing.
+ * <p>
+ * You should extend this class, rather than implementing IndexBuilder directly, to maintain
+ * compatibility going forward.
+ * <p>
+ * Generally, you should consider using one of the implemented IndexBuilders (e.g.
+ * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table
+ * up-to-date.
+ */
+public abstract class BaseIndexBuilder implements IndexBuilder {
+
+  private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
+  protected boolean stopped;
+
+  @Override
+  public void extendBaseIndexBuilderInstead() { }
+  
+  @Override
+  public void setup(RegionCoprocessorEnvironment conf) throws IOException {
+    // noop
+  }
+
+  @Override
+  public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException {
+    // noop
+  }
+
+  @Override
+  public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+    // noop
+  }
+  
+  /**
+   * By default, we always attempt to index the mutation. Commonly this can be slow (because the
+   * framework spends the time to do the indexing, only to realize that you don't need it) or not
+   * ideal (if you want to turn on/off indexing on a table without completely reloading it).
+   * @throws IOException
+   */
+  @Override
+  public boolean isEnabled(Mutation m) throws IOException {
+    return true; 
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each
+   * mutation always applies to different rows, even if they are in the same batch, or are
+   * independent updates.
+   */
+  @Override
+  public byte[] getBatchId(Mutation m) {
+    return null;
+  }
+
+  @Override
+  public void stop(String why) {
+    LOG.debug("Stopping because: " + why);
+    this.stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+}
\ No newline at end of file
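
A skeleton of the extension point described above: a hypothetical ExampleIndexBuilder that extends BaseIndexBuilder and produces no index updates. The three overridden methods are the ones BaseIndexBuilder leaves abstract (their signatures appear in IndexBuilder.java later in this patch); a real builder would return index mutations paired with their target table names.

    import java.io.IOException;
    import java.util.Collection;
    import java.util.Collections;

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.util.Pair;

    import org.apache.hbase.index.builder.BaseIndexBuilder;

    public class ExampleIndexBuilder extends BaseIndexBuilder {

      @Override
      public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
        // a real builder would compute Puts/Deletes against its index tables here
        return Collections.<Pair<Mutation, byte[]>> emptyList();
      }

      @Override
      public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
        // single client-issued deletes take this path instead of the batch path above
        return Collections.<Pair<Mutation, byte[]>> emptyList();
      }

      @Override
      public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
          Collection<KeyValue> filtered) throws IOException {
        // cleanup on flush/compaction is not implemented in this sketch
        return Collections.<Pair<Mutation, byte[]>> emptyList();
      }
    }
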

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java b/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java
new file mode 100644
index 0000000..c5ebafe
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/IndexBuildManager.java
@@ -0,0 +1,216 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.Indexer;
+import org.apache.hbase.index.parallel.QuickFailingTaskRunner;
+import org.apache.hbase.index.parallel.Task;
+import org.apache.hbase.index.parallel.TaskBatch;
+import org.apache.hbase.index.parallel.ThreadPoolBuilder;
+import org.apache.hbase.index.parallel.ThreadPoolManager;
+
+/**
+ * Manage the building of index updates from primary table updates.
+ * <p>
+ * Internally, parallelizes updates through a thread-pool to a delegate index builder. Underlying
+ * {@link IndexBuilder} <b>must be thread safe</b> for each index update.
+ */
+public class IndexBuildManager implements Stoppable {
+
+  private static final Log LOG = LogFactory.getLog(IndexBuildManager.class);
+  private final IndexBuilder delegate;
+  private QuickFailingTaskRunner pool;
+  private boolean stopped;
+
+  /**
+   * Set the number of threads with which we can concurrently build index updates. Unused threads
+   * will be released, but setting the number of threads too high could cause frequent swapping and
+   * resource contention on the server - <i>tune with care</i>. However, if you are spending a lot
+   * of time building index updates, it could be worthwhile to spend the time to tune this parameter
+   * as it could lead to dramatic increases in speed.
+   */
+  public static final String NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY = "index.builder.threads.max";
+  /** Default number of concurrent index builder threads */
+  private static final int DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS = 10;
+  /**
+   * Amount of time to keep idle threads in the pool. After this time (seconds) we expire the
+   * threads and will re-create them as needed, up to the configured max
+   */
+  private static final String INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY =
+      "index.builder.threads.keepalivetime";
+
+  /**
+   * @param env environment in which <tt>this</tt> is running. Used to set up the
+   *          {@link IndexBuilder} and executor
+   * @throws IOException if an {@link IndexBuilder} cannot be correctly set up
+   */
+  public IndexBuildManager(RegionCoprocessorEnvironment env) throws IOException {
+    this(getIndexBuilder(env), new QuickFailingTaskRunner(ThreadPoolManager.getExecutor(
+      getPoolBuilder(env), env)));
+  }
+
+  private static IndexBuilder getIndexBuilder(RegionCoprocessorEnvironment e) throws IOException {
+    Configuration conf = e.getConfiguration();
+    Class<? extends IndexBuilder> builderClass =
+        conf.getClass(Indexer.INDEX_BUILDER_CONF_KEY, null, IndexBuilder.class);
+    try {
+      IndexBuilder builder = builderClass.newInstance();
+      builder.setup(e);
+      return builder;
+    } catch (InstantiationException e1) {
+      throw new IOException("Couldn't instantiate index builder:" + builderClass
+          + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+    } catch (IllegalAccessException e1) {
+      throw new IOException("Couldn't instantiate index builder:" + builderClass
+          + ", disabling indexing on table " + e.getRegion().getTableDesc().getNameAsString());
+    }
+  }
+
+  private static ThreadPoolBuilder getPoolBuilder(RegionCoprocessorEnvironment env) {
+    String serverName = env.getRegionServerServices().getServerName().getServerName();
+    return new ThreadPoolBuilder(serverName + "-index-builder", env.getConfiguration()).
+        setCoreTimeout(INDEX_BUILDER_KEEP_ALIVE_TIME_CONF_KEY).
+        setMaxThread(NUM_CONCURRENT_INDEX_BUILDER_THREADS_CONF_KEY,
+          DEFAULT_CONCURRENT_INDEX_BUILDER_THREADS);
+  }
+
+  public IndexBuildManager(IndexBuilder builder, QuickFailingTaskRunner pool) {
+    this.delegate = builder;
+    this.pool = pool;
+  }
+
+
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(
+      MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp,
+      Collection<? extends Mutation> mutations) throws Throwable {
+    // notify the delegate that we have started processing a batch
+    this.delegate.batchStarted(miniBatchOp);
+
+    // parallelize each mutation into its own task
+    // each task is cancelable via two mechanisms: (1) underlying HRegion is closing (which would
+    // fail lookups/scanning) and (2) by stopping this via the #stop method. Interrupts will only be
+    // acknowledged on each thread before doing the actual lookup, but after that it is up to the
+    // underlying builder to check for the closed flag.
+    TaskBatch<Collection<Pair<Mutation, byte[]>>> tasks =
+        new TaskBatch<Collection<Pair<Mutation, byte[]>>>(mutations.size());
+    for (final Mutation m : mutations) {
+      tasks.add(new Task<Collection<Pair<Mutation, byte[]>>>() {
+
+        @Override
+        public Collection<Pair<Mutation, byte[]>> call() throws IOException {
+          return delegate.getIndexUpdate(m);
+        }
+
+      });
+    }
+    List<Collection<Pair<Mutation, byte[]>>> allResults = null;
+    try {
+      allResults = pool.submitUninterruptible(tasks);
+    } catch (CancellationException e) {
+      throw e;
+    } catch (ExecutionException e) {
+      LOG.error("Found a failed index update!");
+      throw e.getCause();
+    }
+
+    // we can only get here if we get successes from each of the tasks, so each of these must have a
+    // correct result
+    Collection<Pair<Mutation, byte[]>> results = new ArrayList<Pair<Mutation, byte[]>>();
+    for (Collection<Pair<Mutation, byte[]>> result : allResults) {
+      assert result != null : "Found an unsuccessful result, but didn't propagate a failure earlier";
+      results.addAll(result);
+    }
+
+    return results;
+  }
+
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
+    // all we get is a single update, so it would probably just go slower if we needed to queue it
+    // up. It will increase underlying resource contention a little bit, but the mutation case is
+    // far more common, so let's not worry about it for now.
+    // short circuit so we don't waste time.
+    if (!this.delegate.isEnabled(delete)) {
+      return null;
+    }
+
+    return delegate.getIndexUpdate(delete);
+
+  }
+
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+      Collection<KeyValue> filtered) throws IOException {
+    // this is run async, so we can take our time here
+    return delegate.getIndexUpdateForFilteredRows(filtered);
+  }
+
+  public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) {
+    delegate.batchCompleted(miniBatchOp);
+  }
+
+  public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp)
+      throws IOException {
+    delegate.batchStarted(miniBatchOp);
+  }
+
+  public boolean isEnabled(Mutation m) throws IOException {
+    return delegate.isEnabled(m);
+  }
+
+  public byte[] getBatchId(Mutation m) {
+    return delegate.getBatchId(m);
+  }
+
+  @Override
+  public void stop(String why) {
+    if (stopped) {
+      return;
+    }
+    this.stopped = true;
+    this.delegate.stop(why);
+    this.pool.stop(why);
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  public IndexBuilder getBuilderForTesting() {
+    return this.delegate;
+  }
+}
\ No newline at end of file
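
The builder thread pool above is controlled purely through Configuration keys; a small sketch of tuning them (the values are illustrative, not recommendations).

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class IndexBuilderPoolConfig {
      public static Configuration tunedConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        // cap the number of concurrent index-builder threads (the shipped default is 10)
        conf.setInt("index.builder.threads.max", 4);
        // let idle builder threads expire after 60 seconds
        conf.setLong("index.builder.threads.keepalivetime", 60);
        return conf;
      }
    }
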

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java b/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java
new file mode 100644
index 0000000..b5ad1db
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/IndexBuilder.java
@@ -0,0 +1,144 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Pair;
+
+import org.apache.hbase.index.Indexer;
+
+/**
+ * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table
+ * updates.
+ * <p>
+ * Either all the index updates will be applied to all tables or the primary table will kill itself
+ * and will attempt to replay the index edits through the WAL replay mechanism.
+ */
+public interface IndexBuilder extends Stoppable {
+
+  /** Helper method signature to ensure people don't attempt to implement this interface directly */
+  public void extendBaseIndexBuilderInstead();
+
+  /**
+   * This is always called exactly once on install of {@link Indexer}, before any calls to
+   * {@link #getIndexUpdate}.
+   * @param env in which the builder is running
+   * @throws IOException on failure to setup the builder
+   */
+  public void setup(RegionCoprocessorEnvironment env) throws IOException;
+
+  /**
+   * Your opportunity to update any/all index tables based on the update of the primary table row.
+   * It's up to your implementation to ensure that timestamps match between the primary and index
+   * tables.
+   * <p>
+   * The mutation is a generic mutation (not a {@link Put} or a {@link Delete}), as it actually
+   * corresponds to a batch update. It's important to note that {@link Put}s always go through the
+   * batch update code path, so a single {@link Put} will come through here and update the primary
+   * table as the only update in the mutation.
+   * <p>
+   * Implementers must ensure that this method is thread-safe - it could (and probably will) be
+   * called concurrently for different mutations, which may or may not be part of the same batch.
+   * @param mutation update to the primary table to be indexed.
+   * @return the index updates to make, each paired with the name of its target index table
+   * @throws IOException on failure
+   */
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
+
+  /**
+   * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all
+   * index tables based on the delete of the primary table row. This is only called for cases where
+   * the client sends a single delete ({@link HTable#delete}). We separate this method from
+   * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has
+   * subtly different semantics for updating the families/timestamps from the generic batch path.
+   * <p>
+   * It's up to your implementation to ensure that timestamps match between the primary and index
+   * tables.
+   * <p>
+   * Implementers must ensure that this method is thread-safe - it could (and probably will) be
+   * called concurrently for different mutations, which may or may not be part of the same batch.
+   * @param delete {@link Delete} to the primary table that may be indexed
+   * @return the index updates to make, each paired with the name of its target index table
+   * @throws IOException on failure
+   */
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException;
+
+  /**
+   * Build an index update to clean up the index when we remove {@link KeyValue}s via the normal
+   * flush or compaction mechanisms.
+   * @param filtered {@link KeyValue}s that previously existed, but won't be included in further
+   *          output from HBase.
+   * @return the index updates to make, each paired with the name of its target index table
+   * @throws IOException on failure
+   */
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+      Collection<KeyValue> filtered)
+      throws IOException;
+
+  /**
+   * Notification that a batch of updates has successfully been written.
+   * @param miniBatchOp the full batch operation that was written
+   */
+  public void batchCompleted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp);
+
+  /**
+   * Notification that a batch has been started.
+   * <p>
+   * Unfortunately, the way HBase has the coprocessor hooks set up, this is actually called
+   * <i>after</i> the {@link #getIndexUpdate} methods. Therefore, you will likely need an attribute
+   * on your {@link Put}/{@link Delete} to indicate it is a batch operation.
+   * @param miniBatchOp the full batch operation to be written
+   * @throws IOException
+   */
+  public void batchStarted(MiniBatchOperationInProgress<Pair<Mutation, Integer>> miniBatchOp) throws IOException;
+
+  /**
+   * This allows the codec to dynamically change whether or not indexing should take place for a
+   * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
+   * it dynamic, we avoid having to offline and then online a table just to turn indexing on.
+   * <p>
+   * We can also be smart about even indexing a given update here too - if the update doesn't
+   * contain any columns that we care about indexing, we can save the effort of analyzing the put
+   * any further.
+   * @param m mutation that should be indexed.
+   * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
+   *         basis, as each codec is instantiated per-region.
+   * @throws IOException
+   */
+  public boolean isEnabled(Mutation m) throws IOException;
+
+  /**
+   * @param m mutation that has been received by the indexer and is waiting to be indexed
+   * @return the ID of batch to which the Mutation belongs, or <tt>null</tt> if the mutation is not
+   *         part of a batch.
+   */
+  public byte[] getBatchId(Mutation m);
+}
\ No newline at end of file
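
Because batchStarted runs after getIndexUpdate (see the note above), a builder that cares about batches typically tags each mutation on the client and reads the tag back in getBatchId. A minimal sketch under that assumption; the attribute name "example.batch.id" is hypothetical, not something this patch defines.

    import java.util.UUID;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchIdExample {

      // hypothetical attribute used to correlate mutations from the same client batch
      private static final String BATCH_ID_ATTRIB = "example.batch.id";

      /** Client side: tag every Put in a batch with the same id before submitting it. */
      public static void tagBatch(Iterable<Put> batch) {
        byte[] batchId = Bytes.toBytes(UUID.randomUUID().toString());
        for (Put p : batch) {
          p.setAttribute(BATCH_ID_ATTRIB, batchId);
        }
      }

      /** Server side: what an IndexBuilder#getBatchId implementation could return. */
      public static byte[] getBatchId(Mutation m) {
        // null means "not part of a batch", matching the contract documented above
        return m.getAttribute(BATCH_ID_ATTRIB);
      }
    }
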

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java b/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java
new file mode 100644
index 0000000..5fd40a8
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/builder/IndexBuildingFailureException.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.builder;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+
+/**
+ * Unexpected failure while building index updates that wasn't caused by an {@link IOException}.
+ * This should be used if there is some basic issue with indexing - and no matter of retries will
+ * fix it.
+ */
+@SuppressWarnings("serial")
+public class IndexBuildingFailureException extends DoNotRetryIOException {
+
+  /**
+   * Constructor for over the wire propagation. Generally, shouldn't be used since index failure
+   * should have an underlying cause to propagate.
+   * @param msg reason for the failure
+   */
+  public IndexBuildingFailureException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * @param msg reason
+   * @param cause underlying cause for the failure
+   */
+  public IndexBuildingFailureException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/Batch.java b/src/main/java/org/apache/hbase/index/covered/Batch.java
new file mode 100644
index 0000000..a89fc49
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/Batch.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * A timestamped collection of {@link KeyValue KeyValues} applied to the primary table
+ */
+public class Batch {
+
+  private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
+  private final long timestamp;
+  private List<KeyValue> batch = new ArrayList<KeyValue>();
+  private boolean allPointDeletes = true;
+
+  /**
+   * @param ts timestamp shared by all {@link KeyValue}s added to this batch
+   */
+  public Batch(long ts) {
+    this.timestamp = ts;
+  }
+
+  public void add(KeyValue kv) {
+    if (pointDeleteCode != kv.getType()) {
+      allPointDeletes = false;
+    }
+    batch.add(kv);
+  }
+
+  public boolean isAllPointDeletes() {
+    return allPointDeletes;
+  }
+
+  public long getTimestamp() {
+    return this.timestamp;
+  }
+
+  public List<KeyValue> getKvs() {
+    return this.batch;
+  }
+}
\ No newline at end of file
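
A usage sketch of how Batch is meant to be filled - one Batch per distinct timestamp - mirroring
CoveredColumnsIndexBuilder#createTimestampBatchesFromKeyValues further down in this patch (the kvs
collection here is assumed to come from a Mutation's family map):

    Map<Long, Batch> batches = new HashMap<Long, Batch>();
    for (KeyValue kv : kvs) {
      long ts = kv.getTimestamp();
      Batch batch = batches.get(ts);
      if (batch == null) {
        // first key-value seen for this timestamp - start a new batch
        batch = new Batch(ts);
        batches.put(ts, batch);
      }
      batch.add(kv);
    }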

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java b/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java
new file mode 100644
index 0000000..4c33391
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/CoveredColumns.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hbase.index.covered.update.ColumnReference;
+
+/**
+ * Manage a set of {@link ColumnReference}s for the {@link LocalTableState}.
+ */
+public class CoveredColumns {
+
+  Set<ColumnReference> columns = new HashSet<ColumnReference>();
+
+  public Collection<? extends ColumnReference> findNonCoveredColumns(
+      Collection<? extends ColumnReference> requested) {
+    List<ColumnReference> uncovered = new ArrayList<ColumnReference>();
+    for (ColumnReference column : requested) {
+      if (!columns.contains(column)) {
+        uncovered.add(column);
+      }
+    }
+    return uncovered;
+  }
+
+  public void addColumn(ColumnReference column) {
+    this.columns.add(column);
+  }
+}
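
A short sketch of the intended usage (the two-argument ColumnReference constructor shown here is
assumed): look up only the columns that haven't already been fetched for the row, then mark them
covered so repeated requests become no-ops.

    CoveredColumns covered = new CoveredColumns();
    ColumnReference col = new ColumnReference(Bytes.toBytes("fam"), Bytes.toBytes("qual"));
    // nothing covered yet, so 'col' comes back as non-covered
    Collection<? extends ColumnReference> missing =
        covered.findNonCoveredColumns(Collections.singletonList(col));
    // after reading it from the primary table, mark it covered
    covered.addColumn(col);
    // a second lookup for the same column now returns an empty collection
    missing = covered.findNonCoveredColumns(Collections.singletonList(col));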

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java b/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java
new file mode 100644
index 0000000..9a3e4d3
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/CoveredColumnsIndexBuilder.java
@@ -0,0 +1,490 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Longs;
+import org.apache.hbase.index.builder.BaseIndexBuilder;
+import org.apache.hbase.index.covered.data.LocalHBaseState;
+import org.apache.hbase.index.covered.data.LocalTable;
+import org.apache.hbase.index.covered.update.ColumnTracker;
+import org.apache.hbase.index.covered.update.IndexUpdateManager;
+import org.apache.hbase.index.covered.update.IndexedColumnGroup;
+
+/**
+ * Build covered indexes for phoenix updates.
+ * <p>
+ * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't
+ * need to do any extra synchronization in the IndexBuilder.
+ * <p>
+ * NOTE: This implementation doesn't clean up the index when we remove a key-value on compaction or
+ * flush, leading to a bloated index that needs to be cleaned up by a background process.
+ */
+public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
+
+  private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class);
+  public static final String CODEC_CLASS_NAME_KEY = "org.apache.hbase.index.codec.class";
+
+  protected RegionCoprocessorEnvironment env;
+  protected IndexCodec codec;
+  protected LocalHBaseState localTable;
+
+  @Override
+  public void setup(RegionCoprocessorEnvironment env) throws IOException {
+    this.env = env;
+    // setup the phoenix codec. Generally, this will just be the standard one, but abstracting here
+    // so we can use it later when generalizing covered indexes
+    Configuration conf = env.getConfiguration();
+    Class<? extends IndexCodec> codecClass =
+        conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
+    try {
+      Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
+      meth.setAccessible(true);
+      this.codec = meth.newInstance();
+      this.codec.initialize(env);
+    } catch (IOException e) {
+      throw e;
+    } catch (Exception e) {
+      throw new IOException(e);
+    }
+    
+    this.localTable = new LocalTable(env);
+  }
+
+  @Override
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
+    // build the index updates for each group
+    IndexUpdateManager updateMap = new IndexUpdateManager();
+
+    batchMutationAndAddUpdates(updateMap, mutation);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
+    }
+
+    return updateMap.toMap();
+  }
+
+  /**
+   * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each
+   * key-value in the update to see if it matches the others. Generally, this will be the case, but
+   * you can add kvs to a mutation that don't all have the same timestamp, so we need to manage
+   * everything in batches based on timestamp.
+   * <p>
+   * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
+   * @param manager index updates into which to add new updates. Modified as a side-effect.
+   * @param m mutation to batch
+   * @throws IOException 
+   */
+  private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
+    // split the mutation into timestamp-based batches
+    Collection<Batch> batches = createTimestampBatchesFromMutation(m);
+
+    // create a state manager, so we can manage each batch
+    LocalTableState state = new LocalTableState(env, localTable, m);
+
+    // go through each batch of keyvalues and build separate index entries for each
+    boolean cleanupCurrentState = true;
+    for (Batch batch : batches) {
+      /*
+       * We have to split the work between the cleanup and the update for each group because when we
+       * update the current state of the row for the current batch (appending the mutations for the
+       * current batch) the next group will see that as the current state, which can cause a
+       * delete and a put to be created for the next group.
+       */
+      if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
+        cleanupCurrentState = false;
+      }
+    }
+  }
+
+  /**
+   * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any
+   * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
+   * the time the method is called.
+   * @param m {@link Mutation} from which to extract the {@link KeyValue}s
+   * @return the mutation, broken into batches and sorted in ascending order (smallest first)
+   */
+  protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
+    Map<Long, Batch> batches = new HashMap<Long, Batch>();
+    for (List<KeyValue> family : m.getFamilyMap().values()) {
+      createTimestampBatchesFromKeyValues(family, batches);
+    }
+    // sort the batches
+    List<Batch> sorted = new ArrayList<Batch>(batches.values());
+    Collections.sort(sorted, new Comparator<Batch>() {
+      @Override
+      public int compare(Batch o1, Batch o2) {
+        return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
+      }
+    });
+    return sorted;
+  }
+
+  /**
+   * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
+   * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
+   * the time the method is called.
+   * @param kvs {@link KeyValue}s to break into batches
+   * @param batches to update with the given kvs
+   */
+  protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
+      Map<Long, Batch> batches) {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    byte[] nowBytes = Bytes.toBytes(now);
+
+    // batch kvs by timestamp
+    for (KeyValue kv : kvs) {
+      long ts = kv.getTimestamp();
+      // override the timestamp to the current time, so the index and primary tables match
+      // all the keys with LATEST_TIMESTAMP will then be put into the same batch
+      if (kv.updateLatestStamp(nowBytes)) {
+        ts = now;
+      }
+      Batch batch = batches.get(ts);
+      if (batch == null) {
+        batch = new Batch(ts);
+        batches.put(ts, batch);
+      }
+      batch.add(kv);
+    }
+  }
+
+  /**
+   * For a single batch, get all the index updates and add them to the updateMap
+   * <p>
+   * This method manages cleaning up the entire history of the row from the given timestamp forward
+   * for out-of-order (e.g. 'back in time') updates.
+   * <p>
+   * If things arrive out of order (client is using custom timestamps) we should still see the index
+   * in the correct order (assuming we scan after the out-of-order update is finished). Therefore,
+   * when we aren't the most recent update to the index, we need to delete the state at the
+   * current timestamp (similar to above), but also issue a delete for the added index updates at
+   * the next newest timestamp of any of the columns in the update; we need to clean up the insert so
+   * it looks like it was also deleted at that next newest timestamp. However, it's not enough to
+   * just update the one in front of us - that column will likely be applied to index entries up the
+   * entire history in front of us, which also needs to be fixed up.
+   * <p>
+   * However, the current update usually will be the most recent thing to be added. In that case,
+   * all we need to do is issue a delete for the previous index row (the state of the row, without the
+   * update applied) at the current timestamp. This gets rid of anything currently in the index for
+   * the current state of the row (at the timestamp). Then we can just follow that by applying the
+   * pending update and building the index update based on the new row state.
+   * @param updateMap map to update with new index elements
+   * @param batch timestamp-based batch of edits
+   * @param state local state to update and pass to the codec
+   * @param requireCurrentStateCleanup <tt>true</tt> if we should attempt to clean up the
+   *          current state of the table, in the event of a 'back in time' batch. <tt>false</tt>
+   *          indicates we should not attempt the cleanup, e.g. an earlier batch already did the
+   *          cleanup.
+   * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put),
+   *         <tt>false</tt> otherwise
+   * @throws IOException 
+   */
+  private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch,
+      LocalTableState state, boolean requireCurrentStateCleanup) throws IOException {
+
+    // need a temporary manager for the current batch. It should resolve any conflicts for the
+    // current batch. Essentially, we can get the case where a batch doesn't change the current
+    // state of the index (all Puts are covered by deletes), in which case we don't want to add
+    // anything
+    // A. Get the correct values for the pending state in the batch
+    // A.1 start by cleaning up the current state - as long as there are key-values in the batch
+    // that are indexed, we need to change the current state of the index. It's up to the codec to
+    // determine if we need to make any cleanup given the pending update.
+    long batchTs = batch.getTimestamp();
+    state.setPendingUpdates(batch.getKvs());
+    addCleanupForCurrentBatch(updateMap, batchTs, state);
+
+    // A.2 do a single pass first for the updates to the current state
+    state.applyPendingUpdates();
+    long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
+    // if all the updates are the latest thing in the index, we are done - don't go and fix history
+    if (ColumnTracker.isNewestTime(minTs)) {
+      return false;
+    }
+
+    // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
+    // index. After this, we have the correct view of the index, from the batch timestamp up to the
+    // most recent index state
+    while (!ColumnTracker.isNewestTime(minTs)) {
+      minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
+    }
+
+    // B. only clean up the current state if we need to - it's a huge waste of effort otherwise.
+    if (requireCurrentStateCleanup) {
+      // roll back the pending update. This is needed so we can remove all the 'old' index entries.
+      // We don't need to do the puts here, but just the deletes at the given timestamps since we
+      // just want to completely hide the incorrect entries.
+      state.rollback(batch.getKvs());
+      // setup state
+      state.setPendingUpdates(batch.getKvs());
+
+      // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
+      // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
+      // because the update may have a different set of columns or value based on the update).
+      cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
+
+      // have to roll the state forward again, so the current state is correct
+      state.applyPendingUpdates();
+      return true;
+    }
+    return false;
+  }
+
+  private long addUpdateForGivenTimestamp(long ts, LocalTableState state,
+      IndexUpdateManager updateMap) throws IOException {
+    state.setCurrentTimestamp(ts);
+    ts = addCurrentStateMutationsForBatch(updateMap, state);
+    return ts;
+  }
+
+  private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs,
+      LocalTableState state) throws IOException {
+    // get the cleanup for the current state
+    state.setCurrentTimestamp(batchTs);
+    addDeleteUpdatesToMap(updateMap, state, batchTs);
+    // ignore any index tracking from the delete
+    state.resetTrackedColumns();
+  }
+  
+  /**
+   * Add the necessary mutations for the pending batch on the local state. Handles rolling up
+   * through history to determine the index changes after applying the batch (for the case where the
+   * batch is back in time).
+   * @param updateMap to update with index mutations
+   * @param state current state of the table
+   * @return the minimum timestamp across all index columns requested. If
+   *         {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
+   *         timestamp, we know that this <i>was not a back-in-time update</i>.
+   * @throws IOException 
+   */
+  private long
+      addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) throws IOException {
+
+    // get the index updates for this current batch
+    Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
+    state.resetTrackedColumns();
+
+    /*
+     * go through all the pending updates. If we are sure that all the entries are the latest
+     * timestamp, we can just add the index updates and move on. However, if there are columns that
+     * we skip past (based on the timestamp of the batch), we need to roll back up the history.
+     * Regardless of whether or not they are the latest timestamp, the entries here are going to be
+     * correct for the current batch timestamp, so we add them to the updates. The only thing we
+     * really care about is whether we need to roll up the history and fix it as we go.
+     */
+    // timestamp of the next update we need to track
+    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+    List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
+    for (IndexUpdate update : upserts) {
+      // this is the one bit where we check the timestamps
+      final ColumnTracker tracker = update.getIndexedColumns();
+      long trackerTs = tracker.getTS();
+      // update the next min TS we need to track
+      if (trackerTs < minTs) {
+        minTs = tracker.getTS();
+      }
+      // track index hints for the next round. Hint if we need an update for that column for the
+      // next timestamp. These columns clearly won't need to update as we go through time as they
+      // already match the most recent possible thing.
+      boolean needsCleanup = false;
+      if (tracker.hasNewerTimestamps()) {
+        columnHints.add(tracker);
+        // this update also needs to be cleaned up at the next timestamp because it is not the latest.
+        needsCleanup = true;
+      }
+
+
+      // only make the put if the index update has been setup
+      if (update.isValid()) {
+        byte[] table = update.getTableName();
+        Mutation mutation = update.getUpdate();
+        updateMap.addIndexUpdate(table, mutation);
+
+        // only make the cleanup if we made a put and need cleanup
+        if (needsCleanup) {
+          // there is a TS for the interested columns that is greater than the one on the columns
+          // in the put. Therefore, we need to issue a delete at that newer timestamp
+          Delete d = new Delete(mutation.getRow());
+          d.setTimestamp(tracker.getTS());
+          updateMap.addIndexUpdate(table, d);
+        }
+      }
+    }
+    return minTs;
+  }
+
+  /**
+   * Clean up the index based on the current state from the given batch. Iterates over each timestamp
+   * (for the indexed rows) for the current state of the table and cleans up all the existing
+   * entries generated by the codec.
+   * <p>
+   * Adds all pending updates to the updateMap
+   * @param updateMap updated with the pending index updates from the codec
+   * @param batchTs timestamp from which we should cleanup
+   * @param state current state of the primary table. Should already be set up to the correct state
+   *          from which we want to clean up.
+   * @throws IOException 
+   */
+  private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap,
+      long batchTs, LocalTableState state) throws IOException {
+    // get the cleanup for the current state
+    state.setCurrentTimestamp(batchTs);
+    addDeleteUpdatesToMap(updateMap, state, batchTs);
+    Set<ColumnTracker> trackers = state.getTrackedColumns();
+    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
+    for (ColumnTracker tracker : trackers) {
+      if (tracker.getTS() < minTs) {
+        minTs = tracker.getTS();
+      }
+    }
+    state.resetTrackedColumns();
+    if (!ColumnTracker.isNewestTime(minTs)) {
+      state.setHints(Lists.newArrayList(trackers));
+      cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
+    }
+  }
+
+
+  /**
+   * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then
+   * add them to the update map.
+   * <p>
+   * Expects the {@link LocalTableState} to already be correctly set up (correct timestamp, updates
+   * applied, etc).
+   * @throws IOException 
+   */
+  protected void
+      addDeleteUpdatesToMap(IndexUpdateManager updateMap,
+      LocalTableState state, long ts) throws IOException {
+    Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
+    if (cleanup != null) {
+      for (IndexUpdate d : cleanup) {
+        if (!d.isValid()) {
+          continue;
+        }
+        // override the timestamps in the delete to match the current batch.
+        Delete remove = (Delete)d.getUpdate();
+        remove.setTimestamp(ts);
+        updateMap.addIndexUpdate(d.getTableName(), remove);
+      }
+    }
+  }
+
+  @Override
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
+    // stores all the return values
+    IndexUpdateManager updateMap = new IndexUpdateManager();
+
+    // We have to figure out which kind of delete it is, since we need to do different things if it's
+    // a general (row) delete, versus a delete of just a single column or family
+    Map<byte[], List<KeyValue>> families = d.getFamilyMap();
+
+    /*
+     * Option 1: it's a row delete marker, so we just need to delete the most recent state for each
+     * group, as of the specified timestamp in the delete. This can happen if we have a single row
+     * update and it is part of a batch mutation (prepare doesn't happen until later... maybe a
+     * bug?). In a single delete, this delete gets all the column families appended, so the family
+     * map won't be empty by the time it gets here.
+     */
+    if (families.size() == 0) {
+      LocalTableState state = new LocalTableState(env, localTable, d);
+      // get a consistent view of 'now'
+      long now = d.getTimeStamp();
+      if (now == HConstants.LATEST_TIMESTAMP) {
+        now = EnvironmentEdgeManager.currentTimeMillis();
+        // update the delete's idea of 'now' to be consistent with the index
+        d.setTimestamp(now);
+      }
+      // get deletes from the codec
+      // we only need to get deletes and not add puts because this delete covers all columns
+      addDeleteUpdatesToMap(updateMap, state, now);
+
+      /*
+       * Update the current state for all the kvs in the delete. Generally, we would just iterate
+       * the family map, but if we get here, the family map is empty! Therefore, we need to fake a
+       * bunch of family deletes (just like how HRegion#prepareDelete works). This is just needed
+       * for current version of HBase that has an issue where the batch update doesn't update the
+       * deletes before calling the hook.
+       */
+      byte[] deleteRow = d.getRow();
+      for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
+        state.addPendingUpdates(new KeyValue(deleteRow, family, null, now,
+            KeyValue.Type.DeleteFamily));
+      }
+    } else {
+      // Option 2: It's actually a bunch of single updates, which can have different timestamps.
+      // Therefore, we need to do something similar to the put case and batch by timestamp
+      batchMutationAndAddUpdates(updateMap, d);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
+    }
+
+    return updateMap.toMap();
+  }
+
+  @Override
+  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
+      Collection<KeyValue> filtered) throws IOException {
+    // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
+    return null;
+  }
+
+  /**
+   * Exposed for testing!
+   * @param codec codec to use for this instance of the builder
+   */
+  public void setIndexCodecForTesting(IndexCodec codec) {
+    this.codec = codec;
+  }
+
+  @Override
+  public boolean isEnabled(Mutation m) throws IOException {
+    // ask the codec to see if we should even attempt indexing
+    return this.codec.isEnabled(m);
+  }
+}
\ No newline at end of file
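
The builder only instantiates whatever IndexCodec class is named under CODEC_CLASS_NAME_KEY, so
wiring in a codec is just a configuration entry. A minimal sketch, assuming a hypothetical
MyIndexCodec class with a no-arg constructor:

    Configuration conf = HBaseConfiguration.create();
    // CoveredColumnsIndexBuilder#setup resolves this class reflectively and calls initialize(env)
    conf.set(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY, MyIndexCodec.class.getName());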

http://git-wip-us.apache.org/repos/asf/incubator-phoenix/blob/c5b80246/src/main/java/org/apache/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hbase/index/covered/IndexCodec.java b/src/main/java/org/apache/hbase/index/covered/IndexCodec.java
new file mode 100644
index 0000000..267ea33
--- /dev/null
+++ b/src/main/java/org/apache/hbase/index/covered/IndexCodec.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hbase.index.covered;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+
+import org.apache.phoenix.index.BaseIndexCodec;
+
+
+/**
+ * Codec for creating index updates from the current state of a table.
+ * <p>
+ * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as
+ * features need to be added to the codec, as well as potentially not having to implement some
+ * methods.
+ */
+public interface IndexCodec {
+
+  /**
+   * Do any codec initialization necessary
+   * @param env environment in which the codec is operating
+   * @throws IOException if the codec cannot be initialized correctly
+   */
+  public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+
+  /**
+   * Get the index cleanup entries. Currently, this must return just single row deletes (where just
+   * the row-key is specified and no columns are returned) mapped to the table name. For instance,
+   * if you have an index 'myIndex' with row:
+   * 
+   * <pre>
+   * v1,v2,v3 | CF:CQ0  | rowkey
+   *          | CF:CQ1  | rowkey
+   * </pre>
+   * 
+   * To then clean up this entry, you would just return 'v1,v2,v3', 'myIndex'.
+   * @param state the current state of the table that needs to be cleaned up. Generally, you only
+   *          care about the latest column values, for each column you are indexing for each index
+   *          table.
+   * @return the pairs of (deletes, index table name) that should be applied.
+   * @throws IOException 
+   */
+  public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+
+  // table state has the pending update already applied, before calling
+  // get the new index entries
+  /**
+   * Get the index updates for the primary table state, for each index table. The returned
+   * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same
+   * key-values multiple times.
+   * <p>
+   * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so
+   * the index entries match the primary table row. This could be managed at a higher level, but
+   * would require iterating all the kvs in the Put again - very inefficient when compared to the
+   * current interface where you must provide a timestamp anyways (so you might as well provide the
+   * right one).
+   * @param state the current state of the table that needs an index update. Generally, you only
+   *          care about the latest column values, for each column you are indexing for each index
+   *          table.
+   * @return the pairs of (updates,index table name) that should be applied.
+   * @throws IOException 
+   */
+  public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+
+  /**
+   * This allows the codec to dynamically change whether or not indexing should take place for a
+   * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
+   * it dynamic, we avoid having to offline and then online a table just to turn indexing on.
+   * <p>
+   * We can also be smart about whether to even index a given update here - if the update doesn't
+   * contain any columns that we care about indexing, we can save the effort of analyzing the put
+   * any further.
+   * @param m mutation that should be indexed.
+   * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
+   *         basis, as each codec is instantiated per-region.
+   * @throws IOException 
+   */
+  public boolean isEnabled(Mutation m) throws IOException;
+
+  /**
+   * Get the batch identifier of the given mutation. Generally, updates to the table will take place
+   * in a batch of updates; if we know that the mutation is part of a batch, we can build the state
+   * much more intelligently.
+   * <p>
+   * <b>If you have batches that have multiple updates to the same row state, you must specify a
+   * batch id for each batch. Otherwise, we cannot guarantee index correctness.</b>
+   * @param m mutation that may or may not be part of the batch
+   * @return the id of the batch, or <tt>null</tt> if the mutation is not part of a batch.
+   */
+  public byte[] getBatchId(Mutation m);
+}
\ No newline at end of file
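
To make the contract concrete, here is a minimal do-nothing implementation sketch of the interface
above (illustrative only, not a codec shipped in this patch); returning empty iterables, false, and
a null batch id is the simplest legal behaviour. It assumes the class lives in the same
org.apache.hbase.index.covered package as IndexCodec, IndexUpdate, and TableState.

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    public class NoOpIndexCodec implements IndexCodec {

      @Override
      public void initialize(RegionCoprocessorEnvironment env) throws IOException {
        // nothing to set up
      }

      @Override
      public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
        // no index tables, so nothing to clean up
        return Collections.<IndexUpdate> emptyList();
      }

      @Override
      public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
        // no index tables, so nothing to write
        return Collections.<IndexUpdate> emptyList();
      }

      @Override
      public boolean isEnabled(Mutation m) throws IOException {
        // never attempt indexing for this table
        return false;
      }

      @Override
      public byte[] getBatchId(Mutation m) {
        // updates are not grouped into batches
        return null;
      }
    }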