Posted to commits@phoenix.apache.org by ja...@apache.org on 2015/03/27 18:35:46 UTC

[3/3] phoenix git commit: Secondary indexing with txns

Secondary indexing with txns


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/5a558e16
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/5a558e16
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/5a558e16

Branch: refs/heads/txn
Commit: 5a558e16cd7b1e882b274683aff6b655f952dac1
Parents: 826ebf5
Author: James Taylor <jt...@salesforce.com>
Authored: Fri Mar 27 10:35:37 2015 -0700
Committer: James Taylor <jt...@salesforce.com>
Committed: Fri Mar 27 10:35:37 2015 -0700

----------------------------------------------------------------------
 .../EndToEndCoveredColumnsIndexBuilderIT.java   |  12 +-
 .../covered/example/FailWithoutRetriesIT.java   | 198 +++---
 .../apache/phoenix/hbase/index/ValueGetter.java |   4 +-
 .../hbase/index/builder/BaseIndexBuilder.java   | 170 +++--
 .../hbase/index/builder/IndexBuildManager.java  |  14 -
 .../hbase/index/builder/IndexBuilder.java       |  47 +-
 .../phoenix/hbase/index/covered/Batch.java      |  11 +-
 .../covered/CoveredColumnsIndexBuilder.java     | 491 ---------------
 .../phoenix/hbase/index/covered/IndexCodec.java | 172 +++--
 .../hbase/index/covered/LocalTableState.java    | 453 +++++++------
 .../hbase/index/covered/NonTxIndexBuilder.java  | 405 ++++++++++++
 .../phoenix/hbase/index/covered/TableState.java |  45 +-
 .../hbase/index/covered/TxIndexBuilder.java     | 247 ++++++++
 .../index/covered/data/LazyValueGetter.java     |  18 +-
 .../example/CoveredColumnIndexCodec.java        | 628 +++++++++----------
 .../CoveredColumnIndexSpecifierBuilder.java     |   4 +-
 .../covered/example/CoveredColumnIndexer.java   |   9 +-
 .../index/covered/update/ColumnTracker.java     |   3 +-
 .../covered/update/IndexUpdateManager.java      |   5 +-
 .../hbase/index/scanner/EmptyScanner.java       |   8 +-
 .../phoenix/hbase/index/scanner/Scanner.java    |   7 +-
 .../hbase/index/scanner/ScannerBuilder.java     |  12 +-
 .../hbase/index/write/IndexFailurePolicy.java   |   4 +-
 .../apache/phoenix/index/IndexMaintainer.java   |  28 +-
 .../phoenix/index/PhoenixIndexBuilder.java      |  90 +--
 .../apache/phoenix/index/PhoenixIndexCodec.java | 145 ++---
 .../index/PhoenixIndexFailurePolicy.java        |   2 +-
 .../phoenix/index/PhoenixTxIndexBuilder.java    |  53 ++
 .../index/PhoenixTxIndexFailurePolicy.java      |  50 ++
 .../query/ConnectionQueryServicesImpl.java      |  10 +-
 .../phoenix/schema/tuple/ValueGetterTuple.java  |  16 +-
 .../java/org/apache/phoenix/util/IndexUtil.java |  64 +-
 .../covered/CoveredIndexCodecForTesting.java    |  93 ++-
 .../index/covered/TestLocalTableState.java      |  20 +-
 .../example/TestCoveredColumnIndexCodec.java    |  16 +-
 .../phoenix/index/IndexMaintainerTest.java      |   2 +-
 36 files changed, 1904 insertions(+), 1652 deletions(-)
----------------------------------------------------------------------

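For orientation, the file list suggests the shape of the change: the old CoveredColumnsIndexBuilder is removed and replaced by a non-transactional and a transactional builder on top of the shared base class. A hypothetical outline (relationships inferred from this mail, not the actual sources):

    // Sketch only - class bodies are abridged and inferred.
    public abstract class BaseIndexBuilder implements IndexBuilder {
        // shared plumbing: coprocessor environment plus an IndexCodec
        // loaded reflectively from configuration (see the diff below)
    }

    public class NonTxIndexBuilder extends BaseIndexBuilder {
        // timestamp-batched maintenance for non-transactional tables;
        // takes over from the removed CoveredColumnsIndexBuilder
    }

    public class TxIndexBuilder extends BaseIndexBuilder {
        // maintenance for transactional tables
    }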

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
index 84c6827..fa85f00 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/EndToEndCoveredColumnsIndexBuilderIT.java
@@ -59,7 +59,7 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 /**
- * End-to-End test of just the {@link CoveredColumnsIndexBuilder}, but with a simple
+ * End-to-End test of just the {@link NonTxIndexBuilder}, but with a simple
  * {@link IndexCodec} and BatchCache implementation.
  */
 @Category(NeedsOwnMiniClusterTest.class)
@@ -151,7 +151,7 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
             ((LocalTableState) state).getIndexedColumnsTableState(Arrays.asList(columns)).getFirst();
 
         int count = 0;
-        KeyValue kv;
+        Cell kv;
         while ((kv = kvs.next()) != null) {
           Cell next = expectedKvs.get(count++);
           assertEquals(
@@ -302,9 +302,9 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     Map<String, String> indexerOpts = new HashMap<String, String>();
     // just need to set the codec - we are going to set it later, but we need something here or the
     // initializer blows up.
-    indexerOpts.put(CoveredColumnsIndexBuilder.CODEC_CLASS_NAME_KEY,
+    indexerOpts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
       CoveredIndexCodecForTesting.class.getName());
-    Indexer.enableIndexing(desc, CoveredColumnsIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
+    Indexer.enableIndexing(desc, NonTxIndexBuilder.class, indexerOpts, Coprocessor.PRIORITY_USER);
 
     // create the table
     HBaseAdmin admin = UTIL.getHBaseAdmin();
@@ -315,8 +315,8 @@ public class EndToEndCoveredColumnsIndexBuilderIT {
     HRegion region = UTIL.getMiniHBaseCluster().getRegions(tableNameBytes).get(0);
     Indexer indexer =
         (Indexer) region.getCoprocessorHost().findCoprocessor(Indexer.class.getName());
-    CoveredColumnsIndexBuilder builder =
-        (CoveredColumnsIndexBuilder) indexer.getBuilderForTesting();
+    NonTxIndexBuilder builder =
+        (NonTxIndexBuilder) indexer.getBuilderForTesting();
     VerifyingIndexCodec codec = new VerifyingIndexCodec();
     builder.setIndexCodecForTesting(codec);
 

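A condensed sketch of the wiring this test exercises, assuming only the calls visible in the hunk above (the codec is the test codec named in the diff; error handling is elided):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.phoenix.hbase.index.Indexer;
    import org.apache.phoenix.hbase.index.covered.CoveredIndexCodecForTesting;
    import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;

    public class IndexingSetupSketch {
        // Attach the non-transactional index builder to a table descriptor.
        public static void enable(HTableDescriptor desc) throws IOException {
            Map<String, String> opts = new HashMap<String, String>();
            // a codec must be configured up front, or the initializer blows up
            opts.put(NonTxIndexBuilder.CODEC_CLASS_NAME_KEY,
                CoveredIndexCodecForTesting.class.getName());
            Indexer.enableIndexing(desc, NonTxIndexBuilder.class, opts,
                Coprocessor.PRIORITY_USER);
        }
    }
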
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
index dbe78e7..281ad63 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/hbase/index/covered/example/FailWithoutRetriesIT.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered.example;
 
@@ -32,6 +24,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -49,105 +42,106 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-
 /**
- * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String}
- * constructor), {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize
- * the exception, and just return <tt>null</tt> to the client, which then just goes and retries.
+ * If {@link DoNotRetryIOException} is not subclassed correctly (with the {@link String} constructor),
+ * {@link MultiResponse#readFields(java.io.DataInput)} will not correctly deserialize the exception, and just return
+ * <tt>null</tt> to the client, which then just goes and retries.
  */
 @Category(NeedsOwnMiniClusterTest.class)
 public class FailWithoutRetriesIT {
 
-  private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
-  @Rule
-  public TableName table = new TableName();
+    private static final Log LOG = LogFactory.getLog(FailWithoutRetriesIT.class);
+    @Rule
+    public TableName table = new TableName();
+
+    private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+    private String getIndexTableName() {
+        return Bytes.toString(table.getTableName()) + "_index";
+    }
 
-  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+    public static class FailingTestCodec extends BaseIndexCodec {
 
-  private String getIndexTableName() {
-    return Bytes.toString(table.getTableName()) + "_index";
-  }
+        @Override
+        public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
+            throw new RuntimeException("Intentionally failing deletes for " + FailWithoutRetriesIT.class.getName());
+        }
 
-  public static class FailingTestCodec extends BaseIndexCodec {
+        @Override
+        public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
+            throw new RuntimeException("Intentionally failing upserts for " + FailWithoutRetriesIT.class.getName());
+        }
+
+        @Override
+        public void setContext(TableState state, Mutation mutation) throws IOException {}
+
+    }
 
-    @Override
-    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
-      throw new RuntimeException("Intentionally failing deletes for "
-          + FailWithoutRetriesIT.class.getName());
+    @BeforeClass
+    public static void setupCluster() throws Exception {
+        // setup and verify the config
+        Configuration conf = UTIL.getConfiguration();
+        setUpConfigForMiniCluster(conf);
+        IndexTestingUtils.setupConfig(conf);
+        IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
+        // start the cluster
+        UTIL.startMiniCluster();
     }
 
-    @Override
-    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
-      throw new RuntimeException("Intentionally failing upserts for "
-          + FailWithoutRetriesIT.class.getName());
+    @AfterClass
+    public static void teardownCluster() throws Exception {
+        UTIL.shutdownMiniCluster();
     }
 
-  }
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    // setup and verify the config
-    Configuration conf = UTIL.getConfiguration();
-    setUpConfigForMiniCluster(conf);
-    IndexTestingUtils.setupConfig(conf);
-    IndexManagementUtil.ensureMutableIndexingCorrectlyConfigured(conf);
-    // start the cluster
-    UTIL.startMiniCluster();
-  }
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't
-   * rethrowing the exception correctly?
-   * <p>
-   * We use a custom codec to enforce the thrown exception.
-   * @throws Exception
-   */
-  @Test(timeout = 300000)
-  public void testQuickFailure() throws Exception {
-    // incorrectly setup indexing for the primary table - target index table doesn't exist, which
-    // should quickly return to the client
-    byte[] family = Bytes.toBytes("family");
-    ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
-    // values are [col1]
-    fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
-    CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
-    // add the index family
-    builder.addIndexGroup(fam1);
-    // usually, we would create the index table here, but we don't for the sake of the test.
-
-    // setup the primary table
-    String primaryTable = Bytes.toString(table.getTableName());
-    @SuppressWarnings("deprecation")
-    HTableDescriptor pTable = new HTableDescriptor(primaryTable);
-    pTable.addFamily(new HColumnDescriptor(family));
-    // override the codec so we can use our test one
-    builder.build(pTable, FailingTestCodec.class);
-
-    // create the primary table
-    HBaseAdmin admin = UTIL.getHBaseAdmin();
-    admin.createTable(pTable);
-    Configuration conf = new Configuration(UTIL.getConfiguration());
-    // up the number of retries/wait time to make it obvious that we are failing with retries here
-    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
-    conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
-    HTable primary = new HTable(conf, primaryTable);
-    primary.setAutoFlush(false, true);
-
-    // do a simple put that should be indexed
-    Put p = new Put(Bytes.toBytes("row"));
-    p.add(family, null, Bytes.toBytes("value"));
-    primary.put(p);
-    try {
-      primary.flushCommits();
-      fail("Shouldn't have gotten a successful write to the primary table");
-    } catch (RetriesExhaustedWithDetailsException e) {
-      LOG.info("Correclty got a failure of the put!");
+    /**
+     * If this test times out, then we didn't fail quickly enough. {@link Indexer} maybe isn't rethrowing the exception
+     * correctly?
+     * <p>
+     * We use a custom codec to enforce the thrown exception.
+     * 
+     * @throws Exception
+     */
+    @Test(timeout = 300000)
+    public void testQuickFailure() throws Exception {
+        // incorrectly setup indexing for the primary table - target index table doesn't exist, which
+        // should quickly return to the client
+        byte[] family = Bytes.toBytes("family");
+        ColumnGroup fam1 = new ColumnGroup(getIndexTableName());
+        // values are [col1]
+        fam1.add(new CoveredColumn(family, CoveredColumn.ALL_QUALIFIERS));
+        CoveredColumnIndexSpecifierBuilder builder = new CoveredColumnIndexSpecifierBuilder();
+        // add the index family
+        builder.addIndexGroup(fam1);
+        // usually, we would create the index table here, but we don't for the sake of the test.
+
+        // setup the primary table
+        String primaryTable = Bytes.toString(table.getTableName());
+        @SuppressWarnings("deprecation")
+        HTableDescriptor pTable = new HTableDescriptor(primaryTable);
+        pTable.addFamily(new HColumnDescriptor(family));
+        // override the codec so we can use our test one
+        builder.build(pTable, FailingTestCodec.class);
+
+        // create the primary table
+        HBaseAdmin admin = UTIL.getHBaseAdmin();
+        admin.createTable(pTable);
+        Configuration conf = new Configuration(UTIL.getConfiguration());
+        // up the number of retries/wait time to make it obvious that we are failing with retries here
+        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 20);
+        conf.setLong(HConstants.HBASE_CLIENT_PAUSE, 1000);
+        HTable primary = new HTable(conf, primaryTable);
+        primary.setAutoFlush(false, true);
+
+        // do a simple put that should be indexed
+        Put p = new Put(Bytes.toBytes("row"));
+        p.add(family, null, Bytes.toBytes("value"));
+        primary.put(p);
+        try {
+            primary.flushCommits();
+            fail("Shouldn't have gotten a successful write to the primary table");
+        } catch (RetriesExhaustedWithDetailsException e) {
+            LOG.info("Correclty got a failure of the put!");
+        }
+        primary.close();
     }
-    primary.close();
-  }
 }
\ No newline at end of file
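
As the class javadoc above explains, the fail-fast behavior depends on the exception being a DoNotRetryIOException subclass with the String constructor, so the client can rehydrate it instead of seeing null and retrying. A minimal, hypothetical example of a conforming subclass (the class name is illustrative):

    import org.apache.hadoop.hbase.DoNotRetryIOException;

    public class IndexWriteFailedException extends DoNotRetryIOException {
        // The String constructor is what the RPC layer needs to rebuild
        // the exception on the client so the write fails immediately.
        public IndexWriteFailedException(String message) {
            super(message);
        }
    }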

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
index a6e36cb..bcadc2b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/ValueGetter.java
@@ -19,8 +19,8 @@ package org.apache.phoenix.hbase.index;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
-import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 
 public interface ValueGetter {
 
@@ -32,7 +32,7 @@ public interface ValueGetter {
    *         present.
    * @throws IOException if there is an error accessing the underlying data storage
    */
-  public ImmutableBytesPtr getLatestValue(ColumnReference ref) throws IOException;
+  public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException;
   
   public byte[] getRowKey();
 }
\ No newline at end of file
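
With getLatestValue(ColumnReference) now returning ImmutableBytesWritable, a minimal hypothetical implementation of the interface might look like this (a map-backed getter; the class name and backing store are illustrative):

    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.phoenix.hbase.index.ValueGetter;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class MapValueGetter implements ValueGetter {
        private final byte[] rowKey;
        private final Map<ColumnReference, byte[]> values;

        public MapValueGetter(byte[] rowKey, Map<ColumnReference, byte[]> values) {
            this.rowKey = rowKey;
            this.values = values;
        }

        @Override
        public ImmutableBytesWritable getLatestValue(ColumnReference ref) throws IOException {
            byte[] value = values.get(ref);
            // null signals that the column has no value for this row
            return value == null ? null : new ImmutableBytesWritable(value);
        }

        @Override
        public byte[] getRowKey() {
            return rowKey;
        }
    }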

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
index f9df296..dfb9ad4 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/BaseIndexBuilder.java
@@ -1,95 +1,127 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.builder;
 
 import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
-import org.apache.phoenix.hbase.index.covered.CoveredColumnsIndexBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.phoenix.hbase.index.covered.IndexCodec;
+import org.apache.phoenix.hbase.index.covered.NonTxIndexBuilder;
 
 /**
  * Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing.
  * <p>
- * You should extend this class, rather than implementing IndexBuilder directly to maintain
- * compatability going forward.
+ * You should extend this class, rather than implementing IndexBuilder directly, to maintain compatibility going forward.
  * <p>
- * Generally, you should consider using one of the implemented IndexBuilders (e.g
- * {@link CoveredColumnsIndexBuilder}) as there is a lot of work required to keep an index table
- * up-to-date.
+ * Generally, you should consider using one of the implemented IndexBuilders (e.g. {@link NonTxIndexBuilder}), as there is
+ * a lot of work required to keep an index table up-to-date.
  */
 public abstract class BaseIndexBuilder implements IndexBuilder {
+    public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
+    private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
 
-  private static final Log LOG = LogFactory.getLog(BaseIndexBuilder.class);
-  protected boolean stopped;
+    protected boolean stopped;
+    protected RegionCoprocessorEnvironment env;
+    protected IndexCodec codec;
 
-  @Override
-  public void extendBaseIndexBuilderInstead() { }
-  
-  @Override
-  public void setup(RegionCoprocessorEnvironment conf) throws IOException {
-    // noop
-  }
+    abstract protected boolean useRawScanToPrimeBlockCache();
+    
+    @Override
+    public void extendBaseIndexBuilderInstead() {}
 
-  @Override
-  public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
-    // noop
-  }
+    @Override
+    public void setup(RegionCoprocessorEnvironment env) throws IOException {
+        this.env = env;
+        // setup the phoenix codec. Generally, this will just be the standard one, but we abstract it here
+        // so we can use it later when generalizing covered indexes
+        Configuration conf = env.getConfiguration();
+        Class<? extends IndexCodec> codecClass = conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
+        try {
+            Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
+            meth.setAccessible(true);
+            this.codec = meth.newInstance();
+            this.codec.initialize(env);
+        } catch (IOException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new IOException(e);
+        }
+    }
 
-  @Override
-  public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
-    // noop
-  }
-  
-  /**
-   * By default, we always attempt to index the mutation. Commonly this can be slow (because the
-   * framework spends the time to do the indexing, only to realize that you don't need it) or not
-   * ideal (if you want to turn on/off indexing on a table without completely reloading it).
- * @throws IOException 
-   */
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    return true; 
-  }
+    @Override
+    public void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
+        // noop
+    }
 
-  /**
-   * {@inheritDoc}
-   * <p>
-   * By default, assumes that all mutations should <b>not be batched</b>. That is to say, each
-   * mutation always applies to different rows, even if they are in the same batch, or are
-   * independent updates.
-   */
-  @Override
-  public byte[] getBatchId(Mutation m) {
-    return null;
-  }
+    @Override
+    public void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp) {
+        // noop
+    }
 
-  @Override
-  public void stop(String why) {
-    LOG.debug("Stopping because: " + why);
-    this.stopped = true;
-  }
+    /**
+     * By default, we defer to the codec to decide whether the mutation should be indexed at all. Unconditionally
+     * attempting to index can be slow (the framework spends the time to do the indexing, only to realize that you
+     * don't need it) or not ideal (if you want to turn indexing on/off on a table without completely reloading it).
+     * 
+     * @throws IOException
+     */
+    @Override
+    public boolean isEnabled(Mutation m) throws IOException {
+        // ask the codec to see if we should even attempt indexing
+        return this.codec.isEnabled(m);
+    }
 
-  @Override
-  public boolean isStopped() {
-    return this.stopped;
-  }
+    /**
+     * Exposed for testing!
+     * 
+     * @param codec
+     *            codec to use for this instance of the builder
+     */
+    public void setIndexCodecForTesting(IndexCodec codec) {
+        this.codec = codec;
+    }
+
+    @Override
+    public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(Collection<KeyValue> filtered)
+            throws IOException {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * By default, defers to the codec for the batch id. A <tt>null</tt> batch id means the mutations are <b>not
+     * batched</b>: each mutation always applies to a different row, even if they are in the same batch or are
+     * independent updates.
+     */
+    @Override
+    public byte[] getBatchId(Mutation m) {
+        return this.codec.getBatchId(m);
+    }
+
+    @Override
+    public void stop(String why) {
+        LOG.debug("Stopping because: " + why);
+        this.stopped = true;
+    }
+
+    @Override
+    public boolean isStopped() {
+        return this.stopped;
+    }
 }
\ No newline at end of file
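
The setup() method above loads the codec reflectively, so the codec class travels to the region server as a plain Configuration entry. A small sketch of the producing side, under the assumption that PhoenixIndexCodec (named in the file list) is the codec being configured:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
    import org.apache.phoenix.index.PhoenixIndexCodec;

    public class CodecConfigSketch {
        public static Configuration withCodec() {
            Configuration conf = HBaseConfiguration.create();
            // setup() reads this key, finds the no-arg constructor, makes it
            // accessible, instantiates the codec, and calls initialize(env)
            conf.set(BaseIndexBuilder.CODEC_CLASS_NAME_KEY,
                PhoenixIndexCodec.class.getName());
            return conf;
        }
    }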

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
index ba9534c..d5fd34d 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuildManager.java
@@ -29,7 +29,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -156,19 +155,6 @@ public class IndexBuildManager implements Stoppable {
     return results;
   }
 
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException {
-    // all we get is a single update, so it would probably just go slower if we needed to queue it
-    // up. It will increase underlying resource contention a little bit, but the mutation case is
-    // far more common, so let's not worry about it for now.
-    // short circuit so we don't waste time.
-    if (!this.delegate.isEnabled(delete)) {
-      return null;
-    }
-
-    return delegate.getIndexUpdate(delete);
-
-  }
-
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
       Collection<KeyValue> filtered) throws IOException {
     // this is run async, so we can take our time here

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
index b91a52a..194fdcc 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/builder/IndexBuilder.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
@@ -79,32 +78,26 @@ public interface IndexBuilder extends Stoppable {
   
   public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException;
 
-  /**
-   * The counter-part to {@link #getIndexUpdate(Mutation)} - your opportunity to update any/all
-   * index tables based on the delete of the primary table row. This is only called for cases where
-   * the client sends a single delete ({@link HTable#delete}). We separate this method from
-   * {@link #getIndexUpdate(Mutation)} only for the ease of implementation as the delete path has
-   * subtly different semantics for updating the families/timestamps from the generic batch path.
-   * <p>
-   * Its up to your implementation to ensure that timestamps match between the primary and index
-   * tables.
-   * <p>
-   * Implementers must ensure that this method is thread-safe - it could (and probably will) be
-   * called concurrently for different mutations, which may or may not be part of the same batch.
-   * @param delete {@link Delete} to the primary table that may be indexed
-   * @return a {@link Map} of the mutations to make -> target index table name
-   * @throws IOException on failure
-   */
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete) throws IOException;
-
-  /**
-   * Build an index update to cleanup the index when we remove {@link KeyValue}s via the normal
-   * flush or compaction mechanisms.
-   * @param filtered {@link KeyValue}s that previously existed, but won't be included in further
-   *          output from HBase.
-   * @return a {@link Map} of the mutations to make -> target index table name
-   * @throws IOException on failure
-   */
+    /**
+     * Build an index update to clean up the index when we remove {@link KeyValue}s via the normal flush or compaction
+     * mechanisms. Currently neither implemented nor called by any implementors, but left here to be implemented if we
+     * ever need it. In Jesse's words:
+     * 
+     * Arguably, this is a correctness piece that should be used, but isn't. Basically, it *could* be that
+     * if a compaction/flush were to remove a key (too old, too many versions) you might want to clean up the index
+     * table as well, if it were to get out of sync with the primary table. For instance, you might get multiple
+     * versions of the same row, which should eventually age off the oldest version. However, in the index table there
+     * would only ever be two entries for that row - the first one, matching the original row, and the delete marker
+     * for the index update, set when we got a newer version of the primary row. So, a basic HBase scan wouldn't show
+     * the index update because it's covered by the delete marker, but an older timestamp-based read would actually
+     * show the index row, even after the primary table row is gone due to the MAX_VERSIONS limit.
+     *  
+     * @param filtered {@link KeyValue}s that previously existed, but won't be included
+     * in further output from HBase.
+     * 
+     * @return a {@link Map} of the mutations to make -> target index table name
+     * @throws IOException on failure
+     */
   public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
       Collection<KeyValue> filtered)
       throws IOException;

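The stale-read window Jesse describes can be made concrete with a hypothetical time-bounded scan of the index table (the timestamp values are illustrative):

    import java.io.IOException;

    import org.apache.hadoop.hbase.client.Scan;

    public class StaleIndexReadSketch {
        // A current-time scan never sees the stale index row (a delete marker
        // covers it), but a read "as of" the older timestamp still does, even
        // after MAX_VERSIONS pruning removed the matching primary-table row.
        public static Scan asOf(long olderTs) throws IOException {
            Scan scan = new Scan();
            scan.setTimeRange(0, olderTs + 1); // max timestamp is exclusive
            scan.setMaxVersions();
            return scan;
        }
    }
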
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
index e707ea2..5e0da3c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/Batch.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.hbase.index.covered;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 
 /**
@@ -27,9 +28,9 @@ import org.apache.hadoop.hbase.KeyValue;
  */
 public class Batch {
 
-  private static final long pointDeleteCode = KeyValue.Type.Delete.getCode();
+  private static final byte pointDeleteCode = KeyValue.Type.Delete.getCode();
   private final long timestamp;
-  private List<KeyValue> batch = new ArrayList<KeyValue>();
+  private List<Cell> batch = new ArrayList<Cell>();
   private boolean allPointDeletes = true;
 
   /**
@@ -39,8 +40,8 @@ public class Batch {
     this.timestamp = ts;
   }
 
-  public void add(KeyValue kv){
-    if (pointDeleteCode != kv.getType()) {
+  public void add(Cell kv){
+    if (pointDeleteCode != kv.getTypeByte()) {
       allPointDeletes = false;
     }
     batch.add(kv);
@@ -54,7 +55,7 @@ public class Batch {
     return this.timestamp;
   }
 
-  public List<KeyValue> getKvs() {
+  public List<Cell> getKvs() {
     return this.batch;
   }
 }
\ No newline at end of file
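
A minimal usage sketch of the Cell-based API above, assuming only the constructor and add(Cell) shown in this diff (grouping by timestamp mirrors how the builders consume this class):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.phoenix.hbase.index.covered.Batch;

    public class BatchingSketch {
        // Group cells into one Batch per timestamp.
        public static Map<Long, Batch> byTimestamp(List<Cell> cells) {
            Map<Long, Batch> batches = new HashMap<Long, Batch>();
            for (Cell cell : cells) {
                long ts = cell.getTimestamp();
                Batch batch = batches.get(ts);
                if (batch == null) {
                    batch = new Batch(ts);
                    batches.put(ts, batch);
                }
                batch.add(cell);
            }
            return batches;
        }
    }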

http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
deleted file mode 100644
index 6524fd4..0000000
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/CoveredColumnsIndexBuilder.java
+++ /dev/null
@@ -1,491 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.hbase.index.covered;
-
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
-
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Longs;
-import org.apache.phoenix.hbase.index.builder.BaseIndexBuilder;
-import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
-import org.apache.phoenix.hbase.index.covered.data.LocalTable;
-import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
-import org.apache.phoenix.hbase.index.covered.update.IndexUpdateManager;
-import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
-
-/**
- * Build covered indexes for phoenix updates.
- * <p>
- * Before any call to prePut/preDelete, the row has already been locked. This ensures that we don't
- * need to do any extra synchronization in the IndexBuilder.
- * <p>
- * NOTE: This implementation doesn't cleanup the index when we remove a key-value on compaction or
- * flush, leading to a bloated index that needs to be cleaned up by a background process.
- */
-public class CoveredColumnsIndexBuilder extends BaseIndexBuilder {
-
-  private static final Log LOG = LogFactory.getLog(CoveredColumnsIndexBuilder.class);
-  public static final String CODEC_CLASS_NAME_KEY = "org.apache.hadoop.hbase.index.codec.class";
-
-  protected RegionCoprocessorEnvironment env;
-  protected IndexCodec codec;
-  protected LocalHBaseState localTable;
-
-  @Override
-  public void setup(RegionCoprocessorEnvironment env) throws IOException {
-    this.env = env;
-    // setup the phoenix codec. Generally, this will just be in standard one, but abstracting here
-    // so we can use it later when generalizing covered indexes
-    Configuration conf = env.getConfiguration();
-    Class<? extends IndexCodec> codecClass =
-        conf.getClass(CODEC_CLASS_NAME_KEY, null, IndexCodec.class);
-    try {
-      Constructor<? extends IndexCodec> meth = codecClass.getDeclaredConstructor(new Class[0]);
-      meth.setAccessible(true);
-      this.codec = meth.newInstance();
-      this.codec.initialize(env);
-    } catch (IOException e) {
-      throw e;
-    } catch (Exception e) {
-      throw new IOException(e);
-    }
-    
-    this.localTable = new LocalTable(env);
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Mutation mutation) throws IOException {
-    // build the index updates for each group
-    IndexUpdateManager updateMap = new IndexUpdateManager();
-
-    batchMutationAndAddUpdates(updateMap, mutation);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found index updates for Mutation: " + mutation + "\n" + updateMap);
-    }
-
-    return updateMap.toMap();
-  }
-
-  /**
-   * Split the mutation into batches based on the timestamps of each keyvalue. We need to check each
-   * key-value in the update to see if it matches the others. Generally, this will be the case, but
-   * you can add kvs to a mutation that don't all have the timestamp, so we need to manage
-   * everything in batches based on timestamp.
-   * <p>
-   * Adds all the updates in the {@link Mutation} to the state, as a side-effect.
-   * @param updateMap index updates into which to add new updates. Modified as a side-effect.
-   * @param state current state of the row for the mutation.
-   * @param m mutation to batch
- * @throws IOException 
-   */
-  private void batchMutationAndAddUpdates(IndexUpdateManager manager, Mutation m) throws IOException {
-    // split the mutation into timestamp-based batches
-    Collection<Batch> batches = createTimestampBatchesFromMutation(m);
-
-    // create a state manager, so we can manage each batch
-    LocalTableState state = new LocalTableState(env, localTable, m);
-
-    // go through each batch of keyvalues and build separate index entries for each
-    boolean cleanupCurrentState = true;
-    for (Batch batch : batches) {
-      /*
-       * We have to split the work between the cleanup and the update for each group because when we
-       * update the current state of the row for the current batch (appending the mutations for the
-       * current batch) the next group will see that as the current state, which will can cause the
-       * a delete and a put to be created for the next group.
-       */
-      if (addMutationsForBatch(manager, batch, state, cleanupCurrentState)) {
-        cleanupCurrentState = false;
-      }
-    }
-  }
-
-  /**
-   * Batch all the {@link KeyValue}s in a {@link Mutation} by timestamp. Updates any
-   * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
-   * the time the method is called.
-   * @param m {@link Mutation} from which to extract the {@link KeyValue}s
-   * @return the mutation, broken into batches and sorted in ascending order (smallest first)
-   */
-  protected Collection<Batch> createTimestampBatchesFromMutation(Mutation m) {
-    Map<Long, Batch> batches = new HashMap<Long, Batch>();
-    for (List<Cell> family : m.getFamilyCellMap().values()) {
-      List<KeyValue> familyKVs = KeyValueUtil.ensureKeyValues(family);
-      createTimestampBatchesFromKeyValues(familyKVs, batches);
-    }
-    // sort the batches
-    List<Batch> sorted = new ArrayList<Batch>(batches.values());
-    Collections.sort(sorted, new Comparator<Batch>() {
-      @Override
-      public int compare(Batch o1, Batch o2) {
-        return Longs.compare(o1.getTimestamp(), o2.getTimestamp());
-      }
-    });
-    return sorted;
-  }
-
-  /**
-   * Batch all the {@link KeyValue}s in a collection of kvs by timestamp. Updates any
-   * {@link KeyValue} with a timestamp == {@link HConstants#LATEST_TIMESTAMP} to the timestamp at
-   * the time the method is called.
-   * @param kvs {@link KeyValue}s to break into batches
-   * @param batches to update with the given kvs
-   */
-  protected void createTimestampBatchesFromKeyValues(Collection<KeyValue> kvs,
-      Map<Long, Batch> batches) {
-    long now = EnvironmentEdgeManager.currentTimeMillis();
-    byte[] nowBytes = Bytes.toBytes(now);
-
-    // batch kvs by timestamp
-    for (KeyValue kv : kvs) {
-      long ts = kv.getTimestamp();
-      // override the timestamp to the current time, so the index and primary tables match
-      // all the keys with LATEST_TIMESTAMP will then be put into the same batch
-      if (kv.updateLatestStamp(nowBytes)) {
-        ts = now;
-      }
-      Batch batch = batches.get(ts);
-      if (batch == null) {
-        batch = new Batch(ts);
-        batches.put(ts, batch);
-      }
-      batch.add(kv);
-    }
-  }
-
-  /**
-   * For a single batch, get all the index updates and add them to the updateMap
-   * <p>
-   * This method manages cleaning up the entire history of the row from the given timestamp forward
-   * for out-of-order (e.g. 'back in time') updates.
-   * <p>
-   * If things arrive out of order (client is using custom timestamps) we should still see the index
-   * in the correct order (assuming we scan after the out-of-order update in finished). Therefore,
-   * we when we aren't the most recent update to the index, we need to delete the state at the
-   * current timestamp (similar to above), but also issue a delete for the added index updates at
-   * the next newest timestamp of any of the columns in the update; we need to cleanup the insert so
-   * it looks like it was also deleted at that next newest timestamp. However, its not enough to
-   * just update the one in front of us - that column will likely be applied to index entries up the
-   * entire history in front of us, which also needs to be fixed up.
-   * <p>
-   * However, the current update usually will be the most recent thing to be added. In that case,
-   * all we need to is issue a delete for the previous index row (the state of the row, without the
-   * update applied) at the current timestamp. This gets rid of anything currently in the index for
-   * the current state of the row (at the timestamp). Then we can just follow that by applying the
-   * pending update and building the index update based on the new row state.
-   * @param updateMap map to update with new index elements
-   * @param batch timestamp-based batch of edits
-   * @param state local state to update and pass to the codec
-   * @param requireCurrentStateCleanup <tt>true</tt> if we should should attempt to cleanup the
-   *          current state of the table, in the event of a 'back in time' batch. <tt>false</tt>
-   *          indicates we should not attempt the cleanup, e.g. an earlier batch already did the
-   *          cleanup.
-   * @return <tt>true</tt> if we cleaned up the current state forward (had a back-in-time put),
-   *         <tt>false</tt> otherwise
- * @throws IOException 
-   */
-  private boolean addMutationsForBatch(IndexUpdateManager updateMap, Batch batch,
-      LocalTableState state, boolean requireCurrentStateCleanup) throws IOException {
-
-    // need a temporary manager for the current batch. It should resolve any conflicts for the
-    // current batch. Essentially, we can get the case where a batch doesn't change the current
-    // state of the index (all Puts are covered by deletes), in which case we don't want to add
-    // anything
-    // A. Get the correct values for the pending state in the batch
-    // A.1 start by cleaning up the current state - as long as there are key-values in the batch
-    // that are indexed, we need to change the current state of the index. Its up to the codec to
-    // determine if we need to make any cleanup given the pending update.
-    long batchTs = batch.getTimestamp();
-    state.setPendingUpdates(batch.getKvs());
-    addCleanupForCurrentBatch(updateMap, batchTs, state);
-
-    // A.2 do a single pass first for the updates to the current state
-    state.applyPendingUpdates();
-    long minTs = addUpdateForGivenTimestamp(batchTs, state, updateMap);
-    // if all the updates are the latest thing in the index, we are done - don't go and fix history
-    if (ColumnTracker.isNewestTime(minTs)) {
-      return false;
-    }
-
-    // A.3 otherwise, we need to roll up through the current state and get the 'correct' view of the
-    // index. after this, we have the correct view of the index, from the batch up to the index
-    while(!ColumnTracker.isNewestTime(minTs) ){
-      minTs = addUpdateForGivenTimestamp(minTs, state, updateMap);
-    }
-
-    // B. only cleanup the current state if we need to - its a huge waste of effort otherwise.
-   if (requireCurrentStateCleanup) {
-      // roll back the pending update. This is needed so we can remove all the 'old' index entries.
-      // We don't need to do the puts here, but just the deletes at the given timestamps since we
-      // just want to completely hide the incorrect entries.
-      state.rollback(batch.getKvs());
-      // setup state
-      state.setPendingUpdates(batch.getKvs());
-
-      // cleanup the pending batch. If anything in the correct history is covered by Deletes used to
-      // 'fix' history (same row key and ts), we just drop the delete (we don't want to drop both
-      // because the update may have a different set of columns or value based on the update).
-      cleanupIndexStateFromBatchOnward(updateMap, batchTs, state);
-
-      // have to roll the state forward again, so the current state is correct
-      state.applyPendingUpdates();
-      return true;
-    }
-    return false;
-  }
-
-  private long addUpdateForGivenTimestamp(long ts, LocalTableState state,
-      IndexUpdateManager updateMap) throws IOException {
-    state.setCurrentTimestamp(ts);
-    ts = addCurrentStateMutationsForBatch(updateMap, state);
-    return ts;
-  }
-
-  private void addCleanupForCurrentBatch(IndexUpdateManager updateMap, long batchTs,
-      LocalTableState state) throws IOException {
-    // get the cleanup for the current state
-    state.setCurrentTimestamp(batchTs);
-    addDeleteUpdatesToMap(updateMap, state, batchTs);
-    // ignore any index tracking from the delete
-    state.resetTrackedColumns();
-  }
-  
-  /**
-   * Add the necessary mutations for the pending batch on the local state. Handles rolling up
-   * through history to determine the index changes after applying the batch (for the case where the
-   * batch is back in time).
-   * @param updateMap to update with index mutations
-   * @param batch to apply to the current state
-   * @param state current state of the table
-   * @return the minimum timestamp across all index columns requested. If
-   *         {@link ColumnTracker#isNewestTime(long)} returns <tt>true</tt> on the returned
-   *         timestamp, we know that this <i>was not a back-in-time update</i>.
- * @throws IOException 
-   */
-  private long
-      addCurrentStateMutationsForBatch(IndexUpdateManager updateMap, LocalTableState state) throws IOException {
-
-    // get the index updates for this current batch
-    Iterable<IndexUpdate> upserts = codec.getIndexUpserts(state);
-    state.resetTrackedColumns();
-
-    /*
-     * go through all the pending updates. If we are sure that all the entries are the latest
-     * timestamp, we can just add the index updates and move on. However, if there are columns that
-     * we skip past (based on the timestamp of the batch), we need to roll back up the history.
-     * Regardless of whether or not they are the latest timestamp, the entries here are going to be
-     * correct for the current batch timestamp, so we add them to the updates. The only thing we
-     * really care about it if we need to roll up the history and fix it as we go.
-     */
-    // timestamp of the next update we need to track
-    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
-    List<IndexedColumnGroup> columnHints = new ArrayList<IndexedColumnGroup>();
-    for (IndexUpdate update : upserts) {
-      // this is the one bit where we check the timestamps
-      final ColumnTracker tracker = update.getIndexedColumns();
-      long trackerTs = tracker.getTS();
-      // update the next min TS we need to track
-      if (trackerTs < minTs) {
-        minTs = tracker.getTS();
-      }
-      // track index hints for the next round. Hint if we need an update for that column for the
-      // next timestamp. These columns clearly won't need to update as we go through time as they
-      // already match the most recent possible thing.
-      boolean needsCleanup = false;
-      if (tracker.hasNewerTimestamps()) {
-        columnHints.add(tracker);
-        // this update also needs to be cleaned up at the next timestamp because it not the latest.
-        needsCleanup = true;
-      }
-
-
-      // only make the put if the index update has been setup
-      if (update.isValid()) {
-        byte[] table = update.getTableName();
-        Mutation mutation = update.getUpdate();
-        updateMap.addIndexUpdate(table, mutation);
-
-        // only make the cleanup if we made a put and need cleanup
-        if (needsCleanup) {
-          // there is a TS for the interested columns that is greater than the columns in the
-          // put. Therefore, we need to issue a delete at the same timestamp
-          Delete d = new Delete(mutation.getRow());
-          d.setTimestamp(tracker.getTS());
-          updateMap.addIndexUpdate(table, d);
-        }
-      }
-    }
-    return minTs;
-  }
-
-  /**
-   * Cleanup the index based on the current state from the given batch. Iterates over each timestamp
-   * (for the indexed rows) for the current state of the table and cleans up all the existing
-   * entries generated by the codec.
-   * <p>
-   * Adds all pending updates to the updateMap
-   * @param updateMap updated with the pending index updates from the codec
-   * @param batchTs timestamp from which we should cleanup
-   * @param state current state of the primary table. Should already by setup to the correct state
-   *          from which we want to cleanup.
- * @throws IOException 
-   */
-  private void cleanupIndexStateFromBatchOnward(IndexUpdateManager updateMap,
-      long batchTs, LocalTableState state) throws IOException {
-    // get the cleanup for the current state
-    state.setCurrentTimestamp(batchTs);
-    addDeleteUpdatesToMap(updateMap, state, batchTs);
-    Set<ColumnTracker> trackers = state.getTrackedColumns();
-    long minTs = ColumnTracker.NO_NEWER_PRIMARY_TABLE_ENTRY_TIMESTAMP;
-    for (ColumnTracker tracker : trackers) {
-      if (tracker.getTS() < minTs) {
-        minTs = tracker.getTS();
-      }
-    }
-    state.resetTrackedColumns();
-    if (!ColumnTracker.isNewestTime(minTs)) {
-      state.setHints(Lists.newArrayList(trackers));
-      cleanupIndexStateFromBatchOnward(updateMap, minTs, state);
-    }
-  }
-
-
-  /**
-   * Get the index deletes from the codec {@link IndexCodec#getIndexDeletes(TableState)} and then
-   * add them to the update map.
-   * <p>
-   * Expects the {@link LocalTableState} to already be correctly setup (correct timestamp, updates
-   * applied, etc).
- * @throws IOException 
-   */
-  protected void
-      addDeleteUpdatesToMap(IndexUpdateManager updateMap,
-      LocalTableState state, long ts) throws IOException {
-    Iterable<IndexUpdate> cleanup = codec.getIndexDeletes(state);
-    if (cleanup != null) {
-      for (IndexUpdate d : cleanup) {
-        if (!d.isValid()) {
-          continue;
-        }
-        // override the timestamps in the delete to match the current batch.
-        Delete remove = (Delete)d.getUpdate();
-        remove.setTimestamp(ts);
-        updateMap.addIndexUpdate(d.getTableName(), remove);
-      }
-    }
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete d) throws IOException {
-    // stores all the return values
-    IndexUpdateManager updateMap = new IndexUpdateManager();
-
-    // We have to figure out which kind of delete it is, since we need to do different things if its
-    // a general (row) delete, versus a delete of just a single column or family
-    Map<byte[], List<Cell>> families = d.getFamilyCellMap();
-
-    /*
-     * Option 1: its a row delete marker, so we just need to delete the most recent state for each
-     * group, as of the specified timestamp in the delete. This can happen if we have a single row
-     * update and it is part of a batch mutation (prepare doesn't happen until later... maybe a
-     * bug?). In a single delete, this delete gets all the column families appended, so the family
-     * map won't be empty by the time it gets here.
-     */
-    if (families.size() == 0) {
-      LocalTableState state = new LocalTableState(env, localTable, d);
-      // get a consistent view of name
-      long now = d.getTimeStamp();
-      if (now == HConstants.LATEST_TIMESTAMP) {
-        now = EnvironmentEdgeManager.currentTimeMillis();
-        // update the delete's idea of 'now' to be consistent with the index
-        d.setTimestamp(now);
-      }
-      // get deletes from the codec
-      // we only need to get deletes and not add puts because this delete covers all columns
-      addDeleteUpdatesToMap(updateMap, state, now);
-
-      /*
-       * Update the current state for all the kvs in the delete. Generally, we would just iterate
-       * the family map, but since we go here, the family map is empty! Therefore, we need to fake a
-       * bunch of family deletes (just like hos HRegion#prepareDelete works). This is just needed
-       * for current version of HBase that has an issue where the batch update doesn't update the
-       * deletes before calling the hook.
-       */
-      byte[] deleteRow = d.getRow();
-      for (byte[] family : this.env.getRegion().getTableDesc().getFamiliesKeys()) {
-        state.addPendingUpdates(new KeyValue(deleteRow, family, null, now,
-            KeyValue.Type.DeleteFamily));
-      }
-    } else {
-      // Option 2: It's actually a bunch of single updates, which can have different timestamps.
-      // Therefore, we need to do something similar to the put case and batch by timestamp
-      batchMutationAndAddUpdates(updateMap, d);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Found index updates for Delete: " + d + "\n" + updateMap);
-    }
-
-    return updateMap.toMap();
-  }
-
-  @Override
-  public Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
-      Collection<KeyValue> filtered) throws IOException {
-    // TODO Implement IndexBuilder.getIndexUpdateForFilteredRows
-    return null;
-  }
-
-  /**
-   * Exposed for testing!
-   * @param codec codec to use for this instance of the builder
-   */
-  public void setIndexCodecForTesting(IndexCodec codec) {
-    this.codec = codec;
-  }
-
-  @Override
-  public boolean isEnabled(Mutation m) throws IOException {
-    // ask the codec to see if we should even attempt indexing
-    return this.codec.isEnabled(m);
-  }
-}
\ No newline at end of file

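For context on the two branches in getIndexUpdate(Delete) above: a row-wide Delete arrives with an empty family map, while targeted column/family deletes carry per-cell timestamps and must be batched like puts. A minimal sketch of the two shapes, assuming the HBase 0.98-era client API used on this branch (the class name is illustrative, not part of this commit):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteShapes {
        public static void main(String[] args) {
            // Row-wide delete: no columns are added, so the family map is
            // empty when it reaches the builder and a single cleanup pass at
            // the delete's timestamp suffices.
            Delete rowDelete = new Delete(Bytes.toBytes("row1"));

            // Targeted deletes: each cell can carry its own timestamp, so the
            // builder batches the cells by timestamp, as in the put case.
            Delete columnDelete = new Delete(Bytes.toBytes("row1"));
            columnDelete.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 10L);
            columnDelete.deleteFamily(Bytes.toBytes("cf"), 20L);

            System.out.println(rowDelete.getFamilyCellMap().size());    // 0
            System.out.println(columnDelete.getFamilyCellMap().size()); // 1
        }
    }
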
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
index daa631b..e3ef831 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/IndexCodec.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered;
 
@@ -22,89 +14,93 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-
 import org.apache.phoenix.index.BaseIndexCodec;
 
-
 /**
  * Codec for creating index updates from the current state of a table.
  * <p>
- * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as
- * features need to be added to the codec, as well as potentially not having to implement some
- * methods.
+ * Generally, you should extend {@link BaseIndexCodec} instead, to help maintain compatibility as features need to be
+ * added to the codec, as well as potentially not having to implement some methods.
  */
 public interface IndexCodec {
 
-  /**
-   * Do any codec initialization necessary
-   * @param env environment in which the codec is operating
-   * @throws IOException if the codec cannot be initialized correctly
-   */
-  public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+    /**
+     * Do any codec initialization necessary
+     * 
+     * @param env
+     *            environment in which the codec is operating
+     * @throws IOException
+     *             if the codec cannot be initialized correctly
+     */
+    public void initialize(RegionCoprocessorEnvironment env) throws IOException;
+
+    /**
+     * Get the index cleanup entries. Currently, this must return just single row deletes (where just the row-key is
+     * specified and no columns are returned) mapped to the table name. For instance, if you have an index 'myIndex'
+     * with row:
+     * 
+     * <pre>
+     * v1,v2,v3 | CF:CQ0  | rowkey
+     *          | CF:CQ1  | rowkey
+     * </pre>
+     * 
+     * To then clean up this entry, you would just return 'v1,v2,v3', 'myIndex'.
+     * 
+     * @param state
+     *            the current state of the table that needs to be cleaned up. Generally, you only care about the latest
+     *            column values, for each column you are indexing for each index table.
+     * @return the pairs of (deletes, index table name) that should be applied.
+     * @throws IOException
+     */
+    public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
 
-  /**
-   * Get the index cleanup entries. Currently, this must return just single row deletes (where just
-   * the row-key is specified and no columns are returned) mapped to the table name. For instance,
-   * if you have an index 'myIndex' with row:
-   * 
-   * <pre>
-   * v1,v2,v3 | CF:CQ0  | rowkey
-   *          | CF:CQ1  | rowkey
-   * </pre>
-   * 
-   * To then clean up this entry, you would just return 'v1,v2,v3', 'myIndex'.
-   * @param state the current state of the table that needs to be cleaned up. Generally, you only
-   *          care about the latest column values, for each column you are indexing for each index
-   *          table.
-   * @return the pairs of (deletes, index table name) that should be applied.
- * @throws IOException 
-   */
-  public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException;
+    // the table state has the pending update already applied before this is
+    // called to get the new index entries
+    /**
+     * Get the index updates for the primary table state, for each index table. The returned {@link Put}s need to be
+     * fully specified (including timestamp) to minimize passes over the same key-values multiple times.
+     * <p>
+     * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so the index entries
+     * match the primary table row. This could be managed at a higher level, but would require iterating all the kvs in
+     * the Put again - very inefficient when compared to the current interface where you must provide a timestamp
+     * anyway (so you might as well provide the right one).
+     * 
+     * @param state
+     *            the current state of the table that needs an index update. Generally, you only care about the latest
+     *            column values, for each column you are indexing for each index table.
+     * @return the pairs of (updates,index table name) that should be applied.
+     * @throws IOException
+     */
+    public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
 
-  // the table state has the pending update already applied before this is
-  // called to get the new index entries
-  /**
-   * Get the index updates for the primary table state, for each index table. The returned
-   * {@link Put}s need to be fully specified (including timestamp) to minimize passes over the same
-   * key-values multiple times.
-   * <p>
-   * You must specify the same timestamps on the Put as {@link TableState#getCurrentTimestamp()} so
-   * the index entries match the primary table row. This could be managed at a higher level, but
-   * would require iterating all the kvs in the Put again - very inefficient when compared to the
-   * current interface where you must provide a timestamp anyways (so you might as well provide the
-   * right one).
-   * @param state the current state of the table that needs an index update. Generally, you only
-   *          care about the latest column values, for each column you are indexing for each index
-   *          table.
-   * @return the pairs of (updates,index table name) that should be applied.
- * @throws IOException 
-   */
-  public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException;
+    /**
+     * This allows the codec to dynamically change whether or not indexing should take place for a table. If it doesn't
+     * take place, we can save a lot of time on the regular Put path. By making it dynamic, we can save offlining and
+     * then onlining a table just to turn indexing on.
+     * <p>
+     * We can also be smart about whether to index a given update at all - if the update doesn't contain any columns
+     * that we care about indexing, we can save the effort of analyzing the put any further.
+     * 
+     * @param m
+     *            mutation that should be indexed.
+     * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table basis, as each
+     *         codec is instantiated per-region.
+     * @throws IOException
+     */
+    public boolean isEnabled(Mutation m) throws IOException;
 
-  /**
-   * This allows the codec to dynamically change whether or not indexing should take place for a
-   * table. If it doesn't take place, we can save a lot of time on the regular Put path. By making
-   * it dynamic, we can save offlining and then onlining a table just to turn indexing on.
-   * <p>
-   * We can also be smart about whether to index a given update at all - if the update doesn't
-   * contain any columns that we care about indexing, we can save the effort of analyzing the put
-   * any further.
-   * @param m mutation that should be indexed.
-   * @return <tt>true</tt> if indexing is enabled for the given table. This should be on a per-table
-   *         basis, as each codec is instantiated per-region.
- * @throws IOException 
-   */
-  public boolean isEnabled(Mutation m) throws IOException;
+    /**
+     * Get the batch identifier of the given mutation. Generally, updates to the table will take place in a batch of
+     * updates; if we know that the mutation is part of a batch, we can build the state much more intelligently.
+     * <p>
+     * <b>If you have batches that have multiple updates to the same row state, you must specify a batch id for each
+     * batch. Otherwise, we cannot guarantee index correctness</b>
+     * 
+     * @param m
+     *            mutation that may or may not be part of the batch
+     * @return the id of the batch, or <tt>null</tt> if the mutation is not part of a batch.
+     */
+    public byte[] getBatchId(Mutation m);
 
-  /**
-   * Get the batch identifier of the given mutation. Generally, updates to the table will take place
-   * in a batch of updates; if we know that the mutation is part of a batch, we can build the state
-   * much more intelligently.
-   * <p>
-   * <b>If you have batches that have multiple updates to the same row state, you must specify a
-   * batch id for each batch. Otherwise, we cannot guarantee index correctness</b>
-   * @param m mutation that may or may not be part of the batch
-   * @return <tt>null</tt> if the mutation is not part of a batch or an id for the batch.
-   */
-  public byte[] getBatchId(Mutation m);
+    public void setContext(TableState state, Mutation mutation) throws IOException;
 }
\ No newline at end of file

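To make the revised IndexCodec contract concrete, including the new setContext() hook, here is a minimal no-op implementation sketch. The class name is illustrative and not part of this commit; real codecs such as PhoenixIndexCodec do substantially more work per mutation:

    import java.io.IOException;
    import java.util.Collections;

    import org.apache.hadoop.hbase.client.Mutation;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.phoenix.hbase.index.covered.IndexCodec;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;

    /** No-op codec: indexing stays enabled, but no index updates are produced. */
    public class NoOpIndexCodec implements IndexCodec {

        @Override
        public void initialize(RegionCoprocessorEnvironment env) throws IOException {
            // no per-region setup needed
        }

        @Override
        public Iterable<IndexUpdate> getIndexDeletes(TableState state) throws IOException {
            return Collections.<IndexUpdate> emptyList(); // nothing to clean up
        }

        @Override
        public Iterable<IndexUpdate> getIndexUpserts(TableState state) throws IOException {
            return Collections.<IndexUpdate> emptyList(); // nothing to write
        }

        @Override
        public boolean isEnabled(Mutation m) throws IOException {
            return true; // attempt indexing for every mutation handed to us
        }

        @Override
        public byte[] getBatchId(Mutation m) {
            return null; // mutations are not grouped into batches
        }

        @Override
        public void setContext(TableState state, Mutation mutation) throws IOException {
            // no per-mutation context to stash in state.getContext()
        }
    }
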
http://git-wip-us.apache.org/repos/asf/phoenix/blob/5a558e16/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
index e4bc193..f47a71a 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/LocalTableState.java
@@ -1,19 +1,11 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+ * governing permissions and limitations under the License.
  */
 package org.apache.phoenix.hbase.index.covered;
 
@@ -34,7 +26,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.util.Pair;
-
+import org.apache.phoenix.hbase.index.ValueGetter;
 import org.apache.phoenix.hbase.index.covered.data.IndexMemStore;
 import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
@@ -42,203 +34,250 @@ import org.apache.phoenix.hbase.index.covered.update.ColumnTracker;
 import org.apache.phoenix.hbase.index.covered.update.IndexedColumnGroup;
 import org.apache.phoenix.hbase.index.scanner.Scanner;
 import org.apache.phoenix.hbase.index.scanner.ScannerBuilder;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
+
+import com.google.common.collect.Maps;
 
 /**
  * Manage the state of the HRegion's view of the table, for the single row.
  * <p>
- * Currently, this is a single-use object - you need to create a new one for each row that you need
- * to manage. In the future, we could make this object reusable, but for the moment it's easier to
- * manage as a throw-away object.
+ * Currently, this is a single-use object - you need to create a new one for each row that you need to manage. In the
+ * future, we could make this object reusable, but for the moment it's easier to manage as a throw-away object.
  * <p>
- * This class is <b>not</b> thread-safe - it requires external synchronization if accessed
- * concurrently.
+ * This class is <b>not</b> thread-safe - it requires external synchronization if accessed concurrently.
  */
 public class LocalTableState implements TableState {
 
-  private long ts;
-  private RegionCoprocessorEnvironment env;
-  private KeyValueStore memstore;
-  private LocalHBaseState table;
-  private Mutation update;
-  private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
-  private ScannerBuilder scannerBuilder;
-  private List<KeyValue> kvs = new ArrayList<KeyValue>();
-  private List<? extends IndexedColumnGroup> hints;
-  private CoveredColumns columnSet;
-
-  public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
-    this.env = environment;
-    this.table = table;
-    this.update = update;
-    this.memstore = new IndexMemStore();
-    this.scannerBuilder = new ScannerBuilder(memstore, update);
-    this.columnSet = new CoveredColumns();
-  }
-
-  public void addPendingUpdates(KeyValue... kvs) {
-    if (kvs == null) return;
-    addPendingUpdates(Arrays.asList(kvs));
-  }
-
-  public void addPendingUpdates(List<KeyValue> kvs) {
-    if(kvs == null) return;
-    setPendingUpdates(kvs);
-    addUpdate(kvs);
-  }
-
-  private void addUpdate(List<KeyValue> list) {
-    addUpdate(list, true);
-  }
-
-  private void addUpdate(List<KeyValue> list, boolean overwrite) {
-    if (list == null) return;
-    for (KeyValue kv : list) {
-      this.memstore.add(kv, overwrite);
-    }
-  }
-
-  @Override
-  public RegionCoprocessorEnvironment getEnvironment() {
-    return this.env;
-  }
-
-  @Override
-  public long getCurrentTimestamp() {
-    return this.ts;
-  }
-
-  @Override
-  public void setCurrentTimestamp(long timestamp) {
-    this.ts = timestamp;
-  }
-
-  public void resetTrackedColumns() {
-    this.trackedColumns.clear();
-  }
-
-  public Set<ColumnTracker> getTrackedColumns() {
-    return this.trackedColumns;
-  }
-
-  @Override
-  public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
-      Collection<? extends ColumnReference> indexedColumns) throws IOException {
-    ensureLocalStateInitialized(indexedColumns);
-    // filter out things with a newer timestamp and track the column references to which it applies
-    ColumnTracker tracker = new ColumnTracker(indexedColumns);
-    synchronized (this.trackedColumns) {
-      // we haven't seen this set of columns before, so we need to create a new tracker
-      if (!this.trackedColumns.contains(tracker)) {
-        this.trackedColumns.add(tracker);
-      }
-    }
-
-    Scanner scanner =
-        this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
-
-    return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
-  }
-
-  /**
-   * Initialize the managed local state. Generally, this will only be called by
-   * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside.
-   * Even then, there is still fairly low contention as each new Put/Delete will have its own table
-   * state.
-   */
-  private synchronized void ensureLocalStateInitialized(
-      Collection<? extends ColumnReference> columns) throws IOException {
-    // check to see if we haven't initialized any columns yet
-    Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
-    // we have all the columns loaded, so we are good to go.
-    if (toCover.isEmpty()) {
-      return;
-    }
-
-    // add the current state of the row
-    this.addUpdate(this.table.getCurrentRowState(update, toCover).list(), false);
-
-    // add the covered columns to the set
-    for (ColumnReference ref : toCover) {
-      this.columnSet.addColumn(ref);
-    }
-  }
-
-  @Override
-  public Map<String, byte[]> getUpdateAttributes() {
-    return this.update.getAttributesMap();
-  }
-
-  @Override
-  public byte[] getCurrentRowKey() {
-    return this.update.getRow();
-  }
-
-  public Result getCurrentRowState() {
-    KeyValueScanner scanner = this.memstore.getScanner();
-    List<Cell> kvs = new ArrayList<Cell>();
-    while (scanner.peek() != null) {
-      try {
-        kvs.add(scanner.next());
-      } catch (IOException e) {
-        // this should never happen - something has gone terribly awry if it has
-        throw new RuntimeException("Local MemStore threw IOException!");
-      }
-    }
-    return Result.create(kvs);
-  }
-
-  /**
-   * Helper to add a {@link Mutation} to the values stored for the current row
-   * @param pendingUpdate update to apply
-   */
-  public void addUpdateForTesting(Mutation pendingUpdate) {
-    for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
-      List<KeyValue> edits = KeyValueUtil.ensureKeyValues(e.getValue());
-      addUpdate(edits);
-    }
-  }
-
-  /**
-   * @param hints
-   */
-  public void setHints(List<? extends IndexedColumnGroup> hints) {
-    this.hints = hints;
-  }
-
-  @Override
-  public List<? extends IndexedColumnGroup> getIndexColumnHints() {
-    return this.hints;
-  }
-
-  @Override
-  public Collection<KeyValue> getPendingUpdate() {
-    return this.kvs;
-  }
-
-  /**
-   * Set the {@link KeyValue}s in the update for which we are currently building an index update,
-   * but don't actually apply them.
-   * @param update pending {@link KeyValue}s
-   */
-  public void setPendingUpdates(Collection<KeyValue> update) {
-    this.kvs.clear();
-    this.kvs.addAll(update);
-  }
-
-  /**
-   * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
-   */
-  public void applyPendingUpdates() {
-    this.addUpdate(kvs);
-  }
-
-  /**
-   * Rollback all the given values from the underlying state.
-   * @param values
-   */
-  public void rollback(Collection<KeyValue> values) {
-    for (KeyValue kv : values) {
-      this.memstore.rollback(kv);
-    }
-  }
+    private long ts;
+    private RegionCoprocessorEnvironment env;
+    private KeyValueStore memstore;
+    private LocalHBaseState table;
+    private Mutation update;
+    private Set<ColumnTracker> trackedColumns = new HashSet<ColumnTracker>();
+    private ScannerBuilder scannerBuilder;
+    private List<Cell> kvs = new ArrayList<Cell>();
+    private List<? extends IndexedColumnGroup> hints;
+    private CoveredColumns columnSet;
+    private final Map<String,Object> context = Maps.newHashMap();
+
+    public LocalTableState(RegionCoprocessorEnvironment environment, LocalHBaseState table, Mutation update) {
+        this.env = environment;
+        this.table = table;
+        this.update = update;
+        this.memstore = new IndexMemStore();
+        this.scannerBuilder = new ScannerBuilder(memstore, update);
+        this.columnSet = new CoveredColumns();
+    }
+
+    public void addPendingUpdates(Cell... kvs) {
+        if (kvs == null) return;
+        addPendingUpdates(Arrays.asList(kvs));
+    }
+
+    public void addPendingUpdates(List<Cell> kvs) {
+        if (kvs == null) return;
+        setPendingUpdates(kvs);
+        addUpdate(kvs);
+    }
+
+    private void addUpdate(List<Cell> list) {
+        addUpdate(list, true);
+    }
+
+    private void addUpdate(List<Cell> list, boolean overwrite) {
+        if (list == null) return;
+        for (Cell kv : list) {
+            this.memstore.add(KeyValueUtil.ensureKeyValue(kv), overwrite);
+        }
+    }
+
+    @Override
+    public RegionCoprocessorEnvironment getEnvironment() {
+        return this.env;
+    }
+
+    @Override
+    public long getCurrentTimestamp() {
+        return this.ts;
+    }
+
+    /**
+     * Set the current timestamp up to which this state should allow access to the underlying table.
+     * This overrides the timestamp view provided by the indexer - use with care!
+     * @param timestamp timestamp up to which access to the underlying table is allowed.
+     */
+    public void setCurrentTimestamp(long timestamp) {
+        this.ts = timestamp;
+    }
+
+    public void resetTrackedColumns() {
+        this.trackedColumns.clear();
+    }
+
+    public Set<ColumnTracker> getTrackedColumns() {
+        return this.trackedColumns;
+    }
+
+    /**
+     * Get a scanner on the columns that are needed by the index.
+     * <p>
+     * The returned scanner is already pre-seeked to the first {@link KeyValue} that matches the given
+     * columns with a timestamp earlier than the timestamp to which the table is currently set (the
+     * current state of the table for which we need to build an update).
+     * <p>
+     * If none of the passed columns matches any of the columns in the pending update (as determined
+     * by {@link ColumnReference#matchesFamily(byte[])} and
+     * {@link ColumnReference#matchesQualifier(byte[])}), then an empty scanner will be returned. This
+     * is because it doesn't make sense to build index updates when there is no change in the table
+     * state for any of the columns you are indexing.
+     * <p>
+     * <i>NOTE:</i> This method should <b>not</b> be used during
+     * {@link IndexCodec#getIndexDeletes(TableState)} as the pending update will not yet have been
+     * applied - you are merely attempting to clean up the current state and therefore do <i>not</i>
+     * need to track the indexed columns.
+     * <p>
+     * As a side-effect, we track the next-most-recent timestamp for the columns you request - you
+     * will never see a column with the timestamp we are tracking, but rather the next oldest
+     * timestamp for that column.
+     * @param indexedColumns the columns that will be indexed
+     * @return an iterator over the columns and the {@link IndexUpdate} that should be passed back to
+     *         the builder. Even if no update is necessary for the requested columns, you still need
+     *         to return the {@link IndexUpdate}, just don't set the update for the
+     *         {@link IndexUpdate}.
+     * @throws IOException
+     */
+    public Pair<Scanner, IndexUpdate> getIndexedColumnsTableState(
+        Collection<? extends ColumnReference> indexedColumns) throws IOException {
+        ensureLocalStateInitialized(indexedColumns);
+        // filter out things with a newer timestamp and track the column references to which it applies
+        ColumnTracker tracker = new ColumnTracker(indexedColumns);
+        synchronized (this.trackedColumns) {
+            // we haven't seen this set of columns before, so we need to create a new tracker
+            if (!this.trackedColumns.contains(tracker)) {
+                this.trackedColumns.add(tracker);
+            }
+        }
+
+        Scanner scanner = this.scannerBuilder.buildIndexedColumnScanner(indexedColumns, tracker, ts);
+
+        return new Pair<Scanner, IndexUpdate>(scanner, new IndexUpdate(tracker));
+    }
+
+    /**
+     * Initialize the managed local state. Generally, this will only be called by
+     * {@link #getNonIndexedColumnsTableState(List)}, which is unlikely to be called concurrently from the outside. Even
+     * then, there is still fairly low contention as each new Put/Delete will have its own table state.
+     */
+    private synchronized void ensureLocalStateInitialized(Collection<? extends ColumnReference> columns)
+            throws IOException {
+        // check to see if we haven't initialized any columns yet
+        Collection<? extends ColumnReference> toCover = this.columnSet.findNonCoveredColumns(columns);
+        // we have all the columns loaded, so we are good to go.
+        if (toCover.isEmpty()) { return; }
+
+        // add the current state of the row
+        this.addUpdate(this.table.getCurrentRowState(update, toCover).listCells(), false);
+
+        // add the covered columns to the set
+        for (ColumnReference ref : toCover) {
+            this.columnSet.addColumn(ref);
+        }
+    }
+
+    @Override
+    public Map<String, byte[]> getUpdateAttributes() {
+        return this.update.getAttributesMap();
+    }
+
+    @Override
+    public byte[] getCurrentRowKey() {
+        return this.update.getRow();
+    }
+
+    public Result getCurrentRowState() {
+        KeyValueScanner scanner = this.memstore.getScanner();
+        List<Cell> kvs = new ArrayList<Cell>();
+        while (scanner.peek() != null) {
+            try {
+                kvs.add(scanner.next());
+            } catch (IOException e) {
+                // this should never happen - something has gone terribly awry if it has
+                throw new RuntimeException("Local MemStore threw IOException!");
+            }
+        }
+        return Result.create(kvs);
+    }
+
+    /**
+     * Helper to add a {@link Mutation} to the values stored for the current row
+     * 
+     * @param pendingUpdate
+     *            update to apply
+     */
+    public void addUpdateForTesting(Mutation pendingUpdate) {
+        for (Map.Entry<byte[], List<Cell>> e : pendingUpdate.getFamilyCellMap().entrySet()) {
+            List<Cell> edits = e.getValue();
+            addUpdate(edits);
+        }
+    }
+
+    /**
+     * @param hints index column hints to apply when building subsequent index updates
+     */
+    public void setHints(List<? extends IndexedColumnGroup> hints) {
+        this.hints = hints;
+    }
+
+    @Override
+    public List<? extends IndexedColumnGroup> getIndexColumnHints() {
+        return this.hints;
+    }
+
+    @Override
+    public Collection<Cell> getPendingUpdate() {
+        return this.kvs;
+    }
+
+    /**
+     * Set the {@link KeyValue}s in the update for which we are currently building an index update, but don't actually
+     * apply them.
+     * 
+     * @param update
+     *            pending {@link KeyValue}s
+     */
+    public void setPendingUpdates(Collection<Cell> update) {
+        this.kvs.clear();
+        this.kvs.addAll(update);
+    }
+
+    /**
+     * Apply the {@link KeyValue}s set in {@link #setPendingUpdates(Collection)}.
+     */
+    public void applyPendingUpdates() {
+        this.addUpdate(kvs);
+    }
+
+    /**
+     * Roll back all the given values from the underlying state.
+     * 
+     * @param values the values to remove from the state
+     */
+    public void rollback(Collection<Cell> values) {
+        for (Cell kv : values) {
+            this.memstore.rollback(KeyValueUtil.ensureKeyValue(kv));
+        }
+    }
+
+    @Override
+    public Pair<ValueGetter, IndexUpdate> getIndexUpdateState(Collection<? extends ColumnReference> indexedColumns)
+            throws IOException {
+        Pair<Scanner, IndexUpdate> pair = getIndexedColumnsTableState(indexedColumns);
+        ValueGetter valueGetter = IndexManagementUtil.createGetterFromScanner(pair.getFirst(), getCurrentRowKey());
+        return new Pair<ValueGetter, IndexUpdate>(valueGetter, pair.getSecond());
+    }
+
+    @Override
+    public Map<String, Object> getContext() {
+        return context;
+    }
 }
\ No newline at end of file
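
The new TableState#getIndexUpdateState(...) shown above pairs a ValueGetter over the indexed columns with the IndexUpdate that must be returned to the builder. A sketch of the codec-side flow under that API (the helper class and method are hypothetical):

    import java.io.IOException;
    import java.util.Collection;

    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.phoenix.hbase.index.ValueGetter;
    import org.apache.phoenix.hbase.index.covered.IndexUpdate;
    import org.apache.phoenix.hbase.index.covered.TableState;
    import org.apache.phoenix.hbase.index.covered.update.ColumnReference;

    public class IndexStateExample {
        // Fetch the latest values for the indexed columns, as of
        // state.getCurrentTimestamp(), and the IndexUpdate that must be
        // handed back to the builder even when no index row changes.
        static IndexUpdate buildUpdate(TableState state,
                Collection<? extends ColumnReference> indexedColumns) throws IOException {
            Pair<ValueGetter, IndexUpdate> pair = state.getIndexUpdateState(indexedColumns);
            ValueGetter getter = pair.getFirst();   // view of the current row state
            IndexUpdate update = pair.getSecond();  // carries the ColumnTracker hint
            // ... derive the index row key from 'getter' and attach a Put or
            // Delete to 'update' before returning it to the builder
            return update;
        }
    }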